Про сортировку чисел и SIMD или как я обогнал STL в 16 раз

image

Cитуация, когда недостаток производительности пытаются покрыть новым железом, не редка. Важно понимать, однако, что железо, которое мы использовали и используем сегодня, содержит в себе множество механизмов, способных актуализировать наш код на годы вперед. В моем понимании программист, умеющий грамотно оперировать этими механизмами (в частности в терминах бизнес процессов, требующих 'Здесь и Сейчас', терминах поиска золотой середины между Скоростью и Дизайном) — профессионал. В этой статье речь пойдет про довольно изъезженную и, казалось бы, понятную тему — тему сортировок, но с одним небольшим дополнением — SIMD. Эту тему я выбрал не случайно: в процессе решения довольно важной для индустрии задачи возникла следующая подзадача: есть входное множество целых чисел. Каждому множеству сопоставлено свое уникальное значение. При этом множества элементов, которые отличаются между собой только порядком следования элементов, а не их значениями, считаются одинаковыми и должны возвращать одно и тоже значение. Одно из решений — посортировать множества, а затем использовать результат как ключ в Хеш Таблице. Одно из важных условий — количество элементов в множестве не превышает 128 элементов. Под катом рассказываю о том, как сортировать такие множества быстро.

Современные процессоры реализуют так называемый SIMD: принцип компьютерных вычислений, позволяющий обеспечить параллелизм на уровне данных. Например, в случае Intel это SSE, AVX расширения, реализуемые набором специальных 128, 256, 512-битных регистров, которые де-факто представляют некоторый 'Массив' значений меньшей размерности (например 64, 32, 16 или 8 бит) и инструкции работы с ними (сложить два таких массива в столбик одной командой, например). Таким образом, одной инструкцией мы получаем результат сразу для нескольких входных значений, что при умелом использовании положительно сказывается на производительности алгоритма. Хорошим кандидатом на использование SIMD выглядит какая-нибудь параллельная сортировка. Одна из самых известных и нам вполне подходящая — Битонная сортировка. И хотя свою долю славы она давно сыскала на различных многоблочных вычислительных устройствах (например GPU), сегодня мы будем рассматривать ее в контексте векторных SIMD инструкций CPU.

Алгоритм Битонной сортировки таков:

На вход подается последовательность чисел, размерность которой — степень двойки.

Первый шаг алгоритма заключается в получении Битонной последовательности из исходной. Битонная последовательность это такая последовательность, которая сначала монотонно не убывает, а затем монотонно не возрастает, либо приводится к такому виду в результате циклического cдвига. Например, последовательность чисел 1, 2, 4, 3 — битонная. Легко заметить, что из определения, любая последовательность, входящая в битонную, любая последовательность состоящая из одного или двух элементов, а также любая монотонная последовательность также является битонной. Каждое множество неотсортированных элементов можно считать множеством битонных последовательностей, состоящих из двух элементов. Сам алгоритм Битонной сортировки предполагает работу с каждой из половин входящего множества элементов.

Чтобы превратить произвольную последовательность в битонную, нужно:

  1. Разделить последовательность пополам.

  2. Первую часть превратить в битонную последовательность и отсортировать по возрастанию.

  3. Вторую часть превратить в битонную последовательность и отсортировать по убыванию.

На втором шаге алгоритма производится слияние полученных половин Битонной последовательности.

Чтобы превратить любую битонную последовательность в отсортированную, нужно:

  1. Разделить последовательность пополам, попарно сравнить элементы x[i] и x[n / 2 + i], меняя их местами, если элемент из первой половины больше (или меньше).

  2. Рекурсивно выполнить сортировку над каждой из получившихся половин.

В коде описанный алгоритм можно представить как:

#include 
#include 

template
    
void compare_and_swap(uint32_t *input_array, uint32_t i_a, uint32_t i_b)
{
    if (dir ==   (input_array[i_a] > input_array[i_b]))
        std::swap(input_array[i_a],  input_array[i_b]);
}

template
    
void bitonicMerge(uint32_t *input_array, uint32_t input_array_size, uint32_t start_ix)
{
    if (input_array_size < 2)
        return;

    const uint32_t half_size = input_array_size >> 1;

    for (uint32_t it_b = start_ix,
                  it_e = start_ix + half_size; it_b < it_e; ++it_b)
        compare_and_swap
            (input_array, it_b, it_b + half_size);

    bitonicMerge
        (input_array, half_size, start_ix);
    bitonicMerge
        (input_array, half_size, start_ix + half_size);
}

template
    
void bitonicSort(uint32_t *input_array, uint32_t input_array_size, uint32_t start_ix)
{
    if (input_array_size < 2)
        return;

    const uint32_t half_size = input_array_size >> 1;
            
    bitonicSort< dir> 
        (input_array, half_size, start_ix);
    bitonicSort
        (input_array, half_size, start_ix + half_size);

    bitonicMerge
        (input_array, input_array_size, start_ix);
}

Ассимптотическая сложность Битонной сортировки для худшего, медианного и лучшего случаев — O (N * log (N) * log (N)).

Таким образом, алгоритм начинает свою основную работу с последовательностей длины два, сортируя и наращивая битонные последовательности большей размерности.

Сам алгоритм можно изобразить графически с помощью сортировочных сетей. Ниже изображена сортировочная сеть для входного набора данных из восьми элементов:

5n7_9aej81xglo90btvv7yfhxam.png

Тем не менее у алгоритма битонной сортировки, описанного выше, есть один существенный для этой статьи недостаток, а именно — компараторы разной направленности (максимум и минимум двух чисел меняются местами в разные стороны). Этот недостаток легко исправляется изменением индексов сравниваемых элементов и добавлением компаратора, меняющего минимум и максимум только в одном направлении. Таким образом, изображение выше можно представить как:

p6unbqanz0nxuppoejkrevabf2a.png

Данное изображение и будет нашей отправной точкой в описании реализации алгоритма Битонной сортировки с использованием SIMD инструкций. Использовать будем — Intel SSE (4.2). Ниже — подробное, пошаговое описание реализации SSE алгоритма Битонной сортировки для последовательностей из 8 элементов, краткое описание масштабирования алгоритма для обработки массивов входных данных большей размерности, код, а также некоторые бенчмарки.

Intel SSE может оперировать 128-ью битными регистрами. Команда _mm_loadu_si128 загружает из памяти 4-е 32-битных целочисленных значения и хранит их в регистре в том же порядке, в котором они идут в памяти. Нам потребуется выполнить две таких команды для загрузки 8-ми 32-ух битных целочисленных значений из памяти прямиком в регистры. Однако порядок загруженных в регистр данных мы будем интерпретировать иначе исходному (линейному). Для понимания причин давайте взглянем на первый шаг алгоритма:

wq_gndghgdeoytag9k1gifc7hlo.png

Видим, что для выполнения первой итерации мы должны выполнить 4-е операции сравнения парно идущих двоек элементов исходной последовательности, перемещая минимумы и максимумы на соответствующие шагу 1 индексы. Используя SIMD, мы можем отдельно найти 4-е минимума и 4-е максимума, используя команды _mm_min_epi32 и _mm_max_epi32 соответсвенно. Другими словами, имея два регистра A {a_1, a_2, a_3, a_4} и B {b_1, b_2, b_3, b_4} со значениями (для наглядности) {1, 6, 7, 4} и {8, 3, 5, 2} мы интепретируем данные в них так, как изображено ниже:

rgzm0y3feee9bppirn7gaa6_r0a.png

Этот важный процесс я также продемонстрировал в анимации ниже, под спойлером:

Иллюстрация процесса инициализации [3МБ]

jtcuukmdqr6rxqvq8nxme9y3crm.gif

Таким образом, шаг 1 можно изобразить как:

a9aopiu9-rxjzotcbrmlnfpxowe.png

Здесь, я также выполнил нумерацию Минимальных и Максимальных элементов на выходе сети — это пригодится для понимания запутанного процесса перестановок данных внутри регистров. Таким образом, мы можем двумя SSE командами найти нужные нам минимумы и максимумы и перейти к следующему шагу алгоритма. Вообще говоря, поиск минимумов и максимумов — ключевая и неизменная операция, которая понадобится нам на протяжении работы всех шагов алгоритма.

Второй шаг сортировочной сети выглядит следующим образом (слева — выходные миниммумы и максимумы прошлого шага):

23ybxena_3cgksmkbifsi8y7p34.png

Важно понимать как выглядит текущее состояние регистров и как выглядит состояние необходимое для выполнения текущего шага. Для шага 2 это:

zwzamuqiq-c6jicknkzz1zob-e0.png

Можно заметить, что требуется некоторая модификация одного из регистров, полученного на прошлом шаге, а именно перестановка местами частей регистра хранящего максимумы: max2 с max1 и max4 с max3, после чего мы снова можем выполнить уже привычный нам compare_and_swap (поиск минимумов и максимумов среди двух пар регистров). Для перестановки данных внутри регистра SSE предоставляет команду _mm_shuffle_epi32, на вход которой мы подаем регистр, внутри которого выполняем перестановку, а также маску, по которой перестановка происходит. Результат описанных действий можно увидеть ниже:

cm67notf4usd0rnbui2yzw2ca5y.png

Переходим к шагу 3:

aukhypgosmjy0zg4_vlxj5-uxkc.png

В целом это повторение шага 1, однако перед его выполнением нам снова нужно перемешать данные в регистрах. И если данные внутри одного регистра перемешивались командой _mm_shuffle_epi32, то перемешать и закинуть данные из двух регистров в один можно с помощью команды _mm_shuffle_ps. Особенность этой команды в том, что итоговый регистр смешивания должен в первой половине содержать данные из первого регистра, а во второй половине — второго. Другими словами, во второй половине результата не может быть данных первого регистра, а в первой — данных второго. Здесь и во всех последующих шагах я опущу визуализацию состояния регистров на прошлом шаге и перед текущим compare_and_swap шага: по изображению текущего шага можно легко понять, что именно мы хотим получить. А вот визуализацию шагов связанных с перемешиванием и получением результата я опускать не стану:

wa3saw9gn5h6ukhhjgxuibidqmi.png

Шаг 4 выглядит так:

6jc82ce7f5qfw83biqmhdspvt-8.png

Опять же, для его выполнения, нам потребуется аналогично шагу 2 проделать перемешивание элементов внутри регистра с использованием _mm_shuffle_epi32:

jy8eakyc6_yalxfnpzdlommx2k8.png

Шаг 5:

mstwddoa0mhvghjd53an4zwoxsc.png

Необходимые операции перемешивания внутри регистров на шаге 5:

dnsbb9uyzewqpcbql9hxqyqca4c.png

Шаг 6:

irg7lak-dh_u-gnyxtzpxq8vfh4.png

Необходимые операции перемешивания внутри регистров на шаге 6:

prmnzw6u5migrf5c0clburba06o.png

Все шаги проделаны, но взять и сохранить данные регистров в память мы пока что не можем — нужно преобразовать наши регистры так, чтобы при сохранении данных в память они располагались в ней как {min1, max1, min2, max2, min3, max3, min4, max4}. Если бы мы выгружали данные из регистров сразу по окончании шага шесть, то только часть элементов находилась бы на нужном месте в памяти:

j-mxmcmx9komt4njyeeem_0of_y.png

Выходит, нужно проделать некоторые дополнительные действия, чтобы получить нужный результат. Во-первых, понадобятся временные регистры, в которые мы сохраним результат перемешивания данных внутри исходных регистров таким образом, чтобы вторая группа данных с изображения выше шла на своих местах. Во-вторых, нам потребуется команда блендинга регистров _mm_blend_epi16. Принцип ее работы таков: по 8-ми битам входной маски создается выходной регистр, в котором на i-ой позиции стоят соответствующие данные из первого регистра, если бит маски установлен в единицу, либо из второго, если соответствующий бит равен нулю. Иллюстрация этого процесса изображена ниже:

tr403mntx1q5jtwbp3mjuxicumu.png

Результатом смешивания является искомая последовательность {min1, max1, min2, max2, min3, max3, min4, max4}.

В коде описанный алгоритм можно представить как:

#include 

inline void compare_and_swap(__m128i &a, __m128i &b)
{
    __m128i c = a;

    a = _mm_min_epi32(a, b);
    b = _mm_max_epi32(c, b);
}

#define shuffle_two_vecs(a, b, mask) \
    reinterpret_cast<__m128i>(_mm_shuffle_ps( \
        reinterpret_cast<__m128>(a), reinterpret_cast<__m128>(b), mask));

inline void sort_8(int *arr)
{
    __m128i a = _mm_loadu_si128(reinterpret_cast<__m128i *>(arr + 0));
    __m128i b = _mm_loadu_si128(reinterpret_cast<__m128i *>(arr + 4));

    /* шаг 1 */
    compare_and_swap(a, b);

    /* шаг 2 */
    b = _mm_shuffle_epi32(b, _MM_SHUFFLE(2, 3, 0, 1));
    compare_and_swap(a, b);

    /* шаг 3 */
    __m128i t = a;
    a = shuffle_two_vecs(a, b, 0b10001000);
    b = shuffle_two_vecs(t, b, 0b11011101);
 
    compare_and_swap(a, b);

    /* шаг 4 */
    b = _mm_shuffle_epi32(b, _MM_SHUFFLE(0, 1, 2, 3));

    compare_and_swap(a, b);

    /* шаг 5 */
    t = a;
    a = shuffle_two_vecs(a, b, 0b01000100);
    b = shuffle_two_vecs(t, b, 0b11101110);

    compare_and_swap(a, b);

    /* шаг 6 */
    t = a;
    a = shuffle_two_vecs(a, b, 0b10001000);
    b = shuffle_two_vecs(t, b, 0b11011101);

    compare_and_swap(a, b);

    /* перед сохранением в память - меняем порядок */
    auto t_1 = _mm_shuffle_epi32(a, _MM_SHUFFLE(2, 3, 0, 1));
    auto t_2 = _mm_shuffle_epi32(b, _MM_SHUFFLE(2, 3, 0, 1));

    b = _mm_blend_epi16(t_1, b, 0b11001100);
    a = _mm_blend_epi16(a, t_2, 0b11001100);

    /* сохраняем данные регистров в память */
    _mm_storeu_si128(reinterpret_cast<__m128i *>(arr + 0), a);
    _mm_storeu_si128(reinterpret_cast<__m128i *>(arr + 4), b);
}

Итак, выше мы получили SSE версию алгоритма Битонной Сортировки для 8-ми элементов. Полученный алгоритм — возрастающая inplace сортировка (не требует дополнительной оперативной памяти) реализованная на регистрах. Чтобы сделать сортировку убывающей, достаточно в функции compare_and_swap в переменной a хранить максимум, а в переменной b — минимум.

Масштабирование алгоритма для большего количества входных элементов:

Существует ряд закономерностей упрощающих масштабировать описанный выше алгоритм на массивы большего размера. На рисунке ниже — сортировочная сеть для 16-ти элементов:

rbaix0deqehouugn6fjebcl_tss.png

До шага 7 мы применяем к каждой из входных половин алгоритм, описанный выше. Шаг 7 привносит кое-что «новое»: нам необходимо попарно сравнивать элементы, стоящие на одних и тех же индексах, с начала и с конца. Чтобы выполнить этот шаг, я сделал следующее: первую половину входных данных отсортировал по возрастанию, а вторую — по убыванию. При этом сортировка тут — это процесс алгоритма сортировки 8-ми элементов до момента перестановки данных внутри регистров для результирующего сохранения в память. Затем выполнил compare_and_swap над полученными половинами (каждая из которых хранится в двух XMM регистрах). Последующие шаги, как можно видеть, не требуют знания обо всех элементах сортируемого множества, и задачу можно свести к обработке массивов меньшей размерности. Шаг 8, шаг 9 и шаг 10 похожи между собой: мы сравниваем пары элементов расстояние между индексами которых 4, 2 и 1 соответственно. Однако основной похожестью этих шагов является то, что их выполнение это повторение одной и той же операции, но с разными входными регистрами: мы выполняем уже известный нам _mm_shuffle_ps и, более того, с одной и той же маской на каждом шаге. После выполнения всех шагов мы должны поменять порядок следования элементов внутри регистров аналогично тому, как мы делали это при сортировке 8-ми элементов. Только теперь подобная операция распространяется на каждую пару соседних регистров. Аналогичным образом выполняется масштабирование алгоритма для сортировки массивов большей длины: сначала сортируем по возрастанию первую половину, по убыванию — вторую, выполняем compare_and_swap. Следящий шаг, требующий попарной сортировки чисел с расстоянием между индексами большей или равной 8 — отличается от того, что мы делали ранее. Здесь нам не требуется выполнять каких-либо перестановок элементов внутри регистров, и мы можем просто выполнить compare_and_swap над соответствующими регистрами. Все последующие шаги, за исключением последних трех — повторяют данный шаг (не забываем только про разные входные регистры). Последние три шага — это всегда перестановка _mm_shuffle_ps с одной и той же маской.

Код и бенчмарки:

Исходный код описанной SSE сортировки я выложил на github. Здесь же находится код бенчмарка. В сравненнии я использовал написанные мной SSE версии сортировок для 8, 16, 32, 64 и 128 элементов (bench_sort_bit_xxx), а само сравнение производил с STL sort (bench_sort_stl_xxx), а также с некоторыми из наилучших сортировок из этого бенчмарка и репозитория с сортировками.

Результаты на Ryzen 7 5800H, компилятор GCC 13:

bench_sort_bit_8/min_warmup_time:1.000/iterations:1000000         2.81 ns         2.81 ns      1000000
bench_sort_stl_8/min_warmup_time:1.000/iterations:1000000         48.2 ns         48.2 ns      1000000
bench_sort_pdq_8/min_warmup_time:1.000/iterations:1000000         47.5 ns         47.5 ns      1000000
bench_sort_ska_8/min_warmup_time:1.000/iterations:1000000         47.7 ns         47.7 ns      1000000
bench_sort_sn_8/min_warmup_time:1.000/iterations:1000000          39.6 ns         39.6 ns      1000000
bench_sort_bit_16/min_warmup_time:1.000/iterations:1000000        7.28 ns         7.29 ns      1000000
bench_sort_stl_16/min_warmup_time:1.000/iterations:1000000         119 ns          119 ns      1000000
bench_sort_pdq_16/min_warmup_time:1.000/iterations:1000000         121 ns          121 ns      1000000
bench_sort_ska_16/min_warmup_time:1.000/iterations:1000000         121 ns          121 ns      1000000
bench_sort_sn_16/min_warmup_time:1.000/iterations:1000000          125 ns          125 ns      1000000
bench_sort_bit_32/min_warmup_time:1.000/iterations:1000000        18.1 ns         18.1 ns      1000000
bench_sort_stl_32/min_warmup_time:1.000/iterations:1000000         405 ns          405 ns      1000000
bench_sort_pdq_32/min_warmup_time:1.000/iterations:1000000         295 ns          295 ns      1000000
bench_sort_ska_32/min_warmup_time:1.000/iterations:1000000         295 ns          295 ns      1000000
bench_sort_sn_32/min_warmup_time:1.000/iterations:1000000          379 ns          379 ns      1000000
bench_sort_bit_64/min_warmup_time:1.000/iterations:1000000        45.5 ns         45.5 ns      1000000
bench_sort_stl_64/min_warmup_time:1.000/iterations:1000000         994 ns          994 ns      1000000
bench_sort_pdq_64/min_warmup_time:1.000/iterations:1000000         675 ns          675 ns      1000000
bench_sort_ska_64/min_warmup_time:1.000/iterations:1000000         674 ns          674 ns      1000000
bench_sort_sn_64/min_warmup_time:1.000/iterations:1000000         1245 ns         1245 ns      1000000
bench_sort_bit_128/min_warmup_time:1.000/iterations:1000000        116 ns          116 ns      1000000
bench_sort_stl_128/min_warmup_time:1.000/iterations:1000000       2310 ns         2310 ns      1000000
bench_sort_pdq_128/min_warmup_time:1.000/iterations:1000000       1496 ns         1496 ns      1000000
bench_sort_ska_128/min_warmup_time:1.000/iterations:1000000       1897 ns         1897 ns      1000000

Результаты на Ryzen 7 5800H, компилятор Clang 17:

bench_sort_bit_8/min_warmup_time:1.000/iterations:1000000         2.75 ns         2.75 ns      1000000
bench_sort_stl_8/min_warmup_time:1.000/iterations:1000000         52.8 ns         52.8 ns      1000000
bench_sort_pdq_8/min_warmup_time:1.000/iterations:1000000         38.2 ns         38.2 ns      1000000
bench_sort_ska_8/min_warmup_time:1.000/iterations:1000000         38.8 ns         38.8 ns      1000000
bench_sort_sn_8/min_warmup_time:1.000/iterations:1000000          5.62 ns         5.62 ns      1000000
bench_sort_bit_16/min_warmup_time:1.000/iterations:1000000        7.34 ns         7.34 ns      1000000
bench_sort_stl_16/min_warmup_time:1.000/iterations:1000000         126 ns          126 ns      1000000
bench_sort_pdq_16/min_warmup_time:1.000/iterations:1000000        89.3 ns         89.3 ns      1000000
bench_sort_ska_16/min_warmup_time:1.000/iterations:1000000        88.6 ns         88.6 ns      1000000
bench_sort_sn_16/min_warmup_time:1.000/iterations:1000000         17.0 ns         17.0 ns      1000000
bench_sort_bit_32/min_warmup_time:1.000/iterations:1000000        18.7 ns         18.7 ns      1000000
bench_sort_stl_32/min_warmup_time:1.000/iterations:1000000         419 ns          419 ns      1000000
bench_sort_pdq_32/min_warmup_time:1.000/iterations:1000000         297 ns          297 ns      1000000
bench_sort_ska_32/min_warmup_time:1.000/iterations:1000000         296 ns          296 ns      1000000
bench_sort_sn_32/min_warmup_time:1.000/iterations:1000000         56.6 ns         56.6 ns      1000000
bench_sort_bit_64/min_warmup_time:1.000/iterations:1000000        46.1 ns         46.1 ns      1000000
bench_sort_stl_64/min_warmup_time:1.000/iterations:1000000        1020 ns         1020 ns      1000000
bench_sort_pdq_64/min_warmup_time:1.000/iterations:1000000         691 ns          691 ns      1000000
bench_sort_ska_64/min_warmup_time:1.000/iterations:1000000         689 ns          689 ns      1000000
bench_sort_sn_64/min_warmup_time:1.000/iterations:1000000          225 ns          225 ns      1000000
bench_sort_bit_128/min_warmup_time:1.000/iterations:1000000        128 ns          128 ns      1000000
bench_sort_stl_128/min_warmup_time:1.000/iterations:1000000       2400 ns         2400 ns      1000000
bench_sort_pdq_128/min_warmup_time:1.000/iterations:1000000       1533 ns         1532 ns      1000000
bench_sort_ska_128/min_warmup_time:1.000/iterations:1000000       1967 ns         1966 ns      1000000

По результатам видим, что SIMD версия битонной сортировки минимум в 16 раз обгоняет C++ STL сортировку, для разных размерностей входного множества.

Послесловие: Наверное, SIMD одна из тех технологий, используя которую, можно сказать «Зачем платить больше, если можно платить меньше?». Многие начинающие программисты, часто находят интересное в оптимизации простейших алгоритмов по типу memcpy с использлванием SIMD, но эта технология применима и в более комплексных алгоритмах: сортировки, хширование, численные методы, геометрические задачи, работа со строками и т.д

Работу над этой статьей я начинал, еще работая в Яндексе (вдохновленный @juks) и даже выложил «черновой» вариант на telegra.ph, а также на внутрикорпоративный ресурс Яндекса — Этушку. К моему удивлению, этот вариант статьи смог легко взлететь в местный топ, обгоняя статьи о новостях компании, всяких ништяках и т.п

А еще меня можно почитать в телеграмм.

© Habrahabr.ru