Реализация алгоритма PolyUnpack для распаковки вредоносного ПО

Одним днём я читал отчёты по TI из различных источников. В одном из таких отчётов упоминался некий алгоритм для распаковки вредоносного ПО, под названием PolyUnpack. Мне стало интересно, и я решил изучить данную тему. Оказалось, что в Интернете очень мало информации по данному алгоритму. Из интересного я нашёл статью, в которой впервые был описан алгоритм и чью-то курсовую работу.

Я решил написать свою реализацию данного алгоритма, так как часто приходится иметь дело с запакованным ВПО. В этой статье я расскажу, в чём заключается алгоритм, как я его реализовал, с какими проблемами я столкнулся и какие есть альтернативы для распаковки вредоносов. В конце я оставил ссылку на GitHub.

Описание алгоритма

Используя ту самую статью, мы можем понять, в чём заключается алгоритм. PolyUnpack — это метод динамического анализа, предназначенный для выявления упакованных или зашифрованных вредоносных программ, особенно тех, которые используют полиморфизм для обхода сигнатурных антивирусов. Он был предложен как способ автоматической распаковки вредоносных файлов во время их исполнения.

Процесс распаковки
Процесс распаковки

Идея алгоритма заключается в том, что мы сравниваем два варианта кода бинарного файла: статический и динамический. Для этого выполняется статическое дизассеблирование программы. Далее, во время отладки, для каждой инструкции выполняются следующие действия:

  1. Исполняется инструкция;

  2. Читается память по адресу регистра ip;

  3. Инструкция дизассемблируется и сравнивается с инструкцией по адресу регистра ip из статического представления;

  4. Если инструкции различаются или же мы вышли за пределы секции кода исполняемого файла — мы нашли распакованный код.

После определения факта распаковки, мы можем сдампить интересующий нас сегмент памяти или же сдампить весь процесс целиком, для дальнейшего анализа.

Реализация алогритма

Для реализации алгоритма, я использовал python3 с библиотекой py3dbg, которая является форком pydbg для python2. По факту эта библиотека — небольшая оболочка для WinAPI-вызовов, связанных с отладкой. Для дизассемблирования я использовал библиотеку capstone.

На вход программа будет принимать имя исполняемого файла, а на выходе выдавать нам листинг ассемблера распакованного кода и дампить память процесса.

Итак, для начала нам нужно получить статическое представление программы в виде листинга ассемблера. Для этого я создал класс Disassembler. Его главный метод — getInstruction, который предназначен для получения инструкции по адресу address.

class Disassembler():
    ...
    def getInstruction(self, address: int) -> str:
          physical_address = self.getPhysicalAddress(
              address - self.__offset)
          # Вычисляем адрес в файле, с учётом виртуальных адресов и ASLR 
  
          data = self.__data
          data = data[physical_address:physical_address+16]
          # Берём 16 байт для дизассемблирования
  
          try:
              instruction = next(self.__md.disasm(data, address, 1))
          except StopIteration:
              # Если не можем дизассемблировать - возвращаём пустую строку
              return ("")
  
          return instruction.mnemonic + " " + instruction.op_str

Этот класс успешно справляется с задачами дизассемблирования и позволяет нам получить полный статический листинг ассемблера кода программы.

Теперь нам нужно запустить и исполнить наш файл. Класс Debugger использует библиотеку py3dbg для отладки. Я использовал метод библиотеки debug_event_iteration для того, чтобы исполнять код по одной инструкции. В методе getNextInstruction происходит исполнение инструкции.

class Debugger():
    ...
    def getNextInstruction(self) -> tuple[str, int]:
        if (not self.__dbg.debugger_active):
            raise ChildProcessError("Program has exited!")

        self.__dbg.debug_event_iteration()

        if (self.__next_address > 0x7F0000000000):
            # Мы в системной библиотеке 
            return self.getNextInstruction()

        if (self.__wordsize == 32):
            if (self.__next_address > 0x70000000 and self.__next_address < 0x7FFFFFFF):
                # Мы в системной библиотеке
                return self.getNextInstruction()

        return (self.__next_instruction, self.__next_address)

Вся магия исполнения происходит в методе __handleBreak, который вызывается при исполнении каждой инструкции. В этом же методе меняются значения полей __next_instruction и __next_address. Алгоритм работы метода можно описать так:

  • Читаем регистр ip;

  • Читаем память по адресу ip;

  • Дизассемблируем прочитанную память;

  • Возвращаем инструкцию.

В начале, мы определяем значение регистра ip:

class Debugger():
    ...
    def __handleBreak(self, _: py3dbg.pydbg) -> int:
        self.__next_address = self.__dbg.context.Rip 
        # Eip для 32-разрядных процессов
        ...

После этого мы читаем инструкцию по адресу ip и дизассемблируем её:

class Debugger():
    ...
    def __handleBreak(self, _: py3dbg.pydbg) -> int:
        ...
        data = self.__dbg.read_process_memory(self.__next_address, 16)
        # Читаем 16 байт по адресу ip
    
        instruction = next(self.__md.disasm(data, self.__next_address, 1))
        self.__next_instruction = instruction.mnemonic +  " " + instruction.op_str
        ...

В самом конце мы ставим флаг EFLAGS_TRAP, для исполнения инструкций по одной. В py3dbg есть встроенная функция single_step, но она работает намного медленнее, по сравнению с ручным выставлением флага:

class Debugger():
    ...
    def __handleBreak(self, _: py3dbg.pydbg) -> int:
        ...
        self.__dbg.context.EFlags |= py3dbg.defines.EFLAGS_TRAP
        self.__dbg.set_thread_context(self.__dbg.context)

        return py3dbg.defines.DBG_EXCEPTION_HANDLED

Как уже было сказано, __handleBreak выполняется после каждой инструкции. Такое поведение приводит к очень низкой производительности. Например, распаковка UPX, в такой конфигурации, занимает бесконечное количество времени. Для ускорения работы, мы бы могли кэшировать выполненные инструкции. Поэтому нам нужно оптимизировать наш метод __handleBreak, для того, чтобы программу можно было использовать.

Оптимизации

Оптимизацию я начал с кэширования инструкций, которые уже исполнялись. Для этого я определил поле __cache в классе Debugger, которое является словарём, и куда будут записываться уже выполненные инструкции.

class Debugger():
    ...
    def __handleBreak(self, _: py3dbg.pydbg) -> int:
        ...
        if (self.__next_address in self.__cache):
            if (self.__cache[self.__next_address] == self.__next_instruction):
                return py3dbg.defines.DBG_EXCEPTION_HANDLED

        self.__cache[self.__next_address] = self.__next_instruction
        
        self.__dbg.context.EFlags |= py3dbg.defines.EFLAGS_TRAP
        self.__dbg.set_thread_context(self.__dbg.context)

        return py3dbg.defines.DBG_EXCEPTION_HANDLED

Теперь нам нужно использовать наш кэш, чтобы пропускать уже исполненные инструкции. В этот момент возникает следующая проблема: мы ставим флаг ELFAGS_TRAP в методе __handleBreak. Этот флаг отвечает за последовательное выполнение команд по одной. Если мы попадём на кешированную инструкцию, флаг не будет установлен, так как мы сразу выйдем из метода. Если ставить флаг, до выхода из метода, в кэшировании не будет смысла, так как мы также будем исполнять все инструкции. Для решения этой проблемы я придумал небольшой алгоритм.

Всю программу можно условно поделить на блоки. Разделяют эти блоки инструкции переходов (jmp, jz, jne и т.д) и инструкции вызова функций (call).

Типичная программа, открытая в IDA Pro
Типичная программа, открытая в IDA Pro

При кэшировании инструкций, мы можем рассматривать нашу программу как такую последовательность блоков. Мой алгоритм заключается в следующем:

  1. Исполняем инструкции, пока не наткнёмся на вызов функции или переход;

  2. Ставим точку останова на адрес перехода (операнд инструкции);

  3. Ставим точку останова на адрес, следующий после инструкции перехода;

  4. Продолжаем работу отладчика.

Таким образом мы установим точки останова на оба разветвления условного перехода или, в случае функций, на входе и адресе возврата. Такой метод позволяет нам кэшировать блоки кода целиком (в том числе и функции), и пропускать уже исполненные блоки.

Визуализация работы алгоритма. Зёленные блолки – уже выполненный код, Красные блоки – там где стоят точки останова
Визуализация работы алгоритма. Зёленные блолки — уже выполненный код, Красные блоки — там где стоят точки останова

На видео показана работа алгоритма. За несколько исполнений функции мы проходим все возможные ответвления, и кэшируем их. После того как мы полностью кэшировали функцию, мы больше не будем в неё заходить, ведь мы ставим точку останова после инструкции call. В коде это выглядит вот так:

class Debugger():
    ...
    def __handleBreak(self, _: py3dbg.pydbg) -> int:
        ...
        if (self.__next_instruction[:4] == "call"
            or self.__next_instruction[0] == "j"
            or self.__next_instruction[:3] == "rep" 
           ):
            # Для инстркций rep такая-же логика
            
            self.__dbg.bp_set(self.__next_address + len(instruction.bytes))
            # Точка останова на следующую инструкцию
            
            if (self.__next_instruction[:3] != "rep"):
                try:
                    self.__dbg.bp_set(int(instruction.op_str, 16))
                    # Точка останова на адрес перехода
                    
                    self.__dbg.bp_set(getattr(self.__dbg.context,
                                              instruction.op_str.capitalize()))
                    # Если у нас, например, вызов call rax – читаем значение регистра 
                    
                except Exception:
                    pass
        ...

Ещё одна оптимизация, которую я сделал — оптимизация чтения памяти процесса. Я решил кэшировать блоки по 4096 байт (размер страницы Windows для x86 и x64, согласно статье), чтобы каждый раз не выполнять вызов ReadProcessMemory. Для этого я создал поле __pages (словарь), и немного дополнил код:

class Debugger():
    ...
    def __handleBreak(self, _: py3dbg.pydbg) -> int:
        ...
        padded_address = self.__next_address & 0xFFFFFFFFFFFFF000

        if (padded_address not in self.__pages):
            data = self.__dbg.read_process_memory(padded_address, 4096)
            self.__pages[padded_address] = data

        offset = self.__next_address & 0xFFF
        instruction_address = self.__pages[padded_address][offset:offset + 16]
        ...

Теперь программа может спокойно распаковать простые упаковщики. Можно переходить к тестам.

Тестирование программы

Начнём с простого — UPX. За 0,39 секунд, программа распаковала 2431 инструкцию.

Распаковка UPX
Распаковка UPX

Довольно неплохой результат. Я также добавил флаги --dump и --pd для дампа памяти процесса с использованием вызова ReadProcessMemory и утилиты ProcessDump соответсвтенно.

Теперь попробуем распаковать кастомный упаковщик. Я выбрал один из имеющихся у меня запакованных сэмплов YoungLotus. В этот раз я также использую флаг --pd, для того, чтобы сдампить память процесса. Хоть в конце нам и выдаёт ошибку, программа отрабатывает как надо, и мы получаем распакованный сэмпл.

Распаковка YoungLotus
Распаковка YoungLotus

Последним тестом покажу распаковку шэлкода, который я скомпилировал, используя следующий код на C:

# include 
# include 
# include 

# include 


int main(void) {
    char* shellcode = 
        "\x33\xc9\x64\x8b\x49\x30\x8b\x49\x0c\x8b"
        "\x49\x1c\x8b\x59\x08\x8b\x41\x20\x8b\x09"
        "\x80\x78\x0c\x33\x75\xf2\x8b\xeb\x03\x6d"
        "\x3c\x8b\x6d\x78\x03\xeb\x8b\x45\x20\x03"
        "\xc3\x33\xd2\x8b\x34\x90\x03\xf3\x42\x81"
        "\x3e\x47\x65\x74\x50\x75\xf2\x81\x7e\x04"
        "\x72\x6f\x63\x41\x75\xe9\x8b\x75\x24\x03"
        "\xf3\x66\x8b\x14\x56\x8b\x75\x1c\x03\xf3"
        "\x8b\x74\x96\xfc\x03\xf3\x33\xff\x57\x68"
        "\x61\x72\x79\x41\x68\x4c\x69\x62\x72\x68"
        "\x4c\x6f\x61\x64\x54\x53\xff\xd6\x33\xc9"
        "\x57\x66\xb9\x33\x32\x51\x68\x75\x73\x65"
        "\x72\x54\xff\xd0\x57\x68\x6f\x78\x41\x01"
        "\xfe\x4c\x24\x03\x68\x61\x67\x65\x42\x68"
        "\x4d\x65\x73\x73\x54\x50\xff\xd6\x57\x68"
        "\x72\x6c\x64\x21\x68\x6f\x20\x57\x6f\x68"
        "\x48\x65\x6c\x6c\x8b\xcc\x57\x57\x51\x57"
        "\xff\xd0\x57\x68\x65\x73\x73\x01\xfe\x4c"
        "\x24\x03\x68\x50\x72\x6f\x63\x68\x45\x78"
        "\x69\x74\x54\x53\xff\xd6\x57\xff\xd0";

    char* buf = VirtualAlloc(NULL, strlen(shellcode),
		MEM_COMMIT, PAGE_EXECUTE_READWRITE);

    memcpy(buf, shellcode, strlen(shellcode));

    DWORD tmp;
    VirtualProtect(buf, strlen(shellcode), PAGE_EXECUTE_READWRITE, &tmp);

    ((void (*)(void))buf)();

    return EXIT_SUCCESS;
}

Этот шелкод предназначен для архитектуры x86 и просто создаёт диалоговое окно с текстом «Hello World!». Теперь я покажу работу флага --dump. Он просто читает и записывает в файл всю память, в которой исполняется распакованный код. Выравнивается память по адресу страницы.

Распаковка шэлкода
Распаковка шэлкода

Теперь, когда я описал логику работы и показал тесты программы, мы можем поговорить о существенных недостатках как алгоритма PolyUnpack, так и моей реализации.

Недостатки программы

Рассмотрим недостатки программы, коих существует не мало:

  • Нужно запускать в изолированной среде (VM) — одни из главных недостатков, так как нам нужно использовать VM под ОС Windows. Это ограничивает встраивание данной реализации в TI-workflow;

  • Запакованное ВПО может загружать дополнительную полезную нагрузку с сервера — в данном случае, вредонос может, например, распаковывать скрипт на VisualBasic, который дополнительно скачивает нагрузку с сервера и запускает её в отдельном процессе. Наша реализация не способна отработать данную ситуацию;

  • Реализация не работает для модифицирующих свой код вредоносов — во время тестирование на семействе SmokeLoader стала очевидна проблема самомодифицирующихся файлов. Этот вредонос имеет предварительно зашифрованный код, который в процессе исполнения программы расшифровывается. Для нас это является проблемой, так как мы ставим точки останова после всех инструкций call. В SmokeLoader большое количество функций зашифрованы и расшифровка происходит прямо перед их исполнением. Так как мы ставим программную точку останова (меняем инструкцию на int3), вредонос во время расшифровки применяет XOR, для нашей инструкции int3, а не для оригинальной инструкции. Это в свою очередь нарушает работу программы и процесс распаковки становится не возможным;

  • Существует множество альтернатив — есть множество более простых и удобных вариантов для распаковки вредоносного ПО.

Альтернативы для распаковки ВПО

В качестве альтернатив я могу предложить довольно простой алгоритм:

  1. Ставим точки останова на VirtualAlloc, VirtualProtect и подобные функции;

  2. Смотрим, какой адрес передавался в функцию (или возвращается ей), и ставим точку останова на исполнение на всю страницу;

  3. При попадании на такую точку останова дампим память процесса.

Довольно простой алгоритм, который позволяет распаковывать самые простые упаковщики.

Если же мы говорим о готовых решениях, можно использовать такие платформы, как Any.Run, Joe Sandbox, Hybird Analysis, UnpackMe и им подобные. Но если мы имеем дело с тысячами или даже десятками тысяч экземпляров ВПО ежедневно, использовать данные инструменты становиться проблематично. Поэтому мы должны смотреть в сторону собственных решений и разработок в области распаковки ВПО, для включения их в свой TI-workflow.

Заключение

В этой статье мы рассмотрели, реализовали и протестировали алгоритм PolyUnpack. Во время разработки и реализации алгоритма, мы столкнулись с проблемами оптимизации, которые мы успешно решили, использовав простой алгоритм кэширования. Тестирование программы показало, что она способна распаковывать простенькие образцы ВПО. Мы выделили недостатки как самого алгоритма, так и конкретной реализации и предложили альтернативы для распаковки вредоносов.

Для интерисующихся я оставлю ссылку на проект: https://github.com/Reedus0/Upack.

© Habrahabr.ru