[Из песочницы] Внешняя сортировка с O(1) дополнительной памяти

Прочитав эту статью, я вспомнил, как писал внешнюю сортировку, которая использовала O(1) внешней памяти. Функция получала бинарый файл и максимальный размер памяти, которую она могла выделить под массив:

void ext_sort(const std::string filename, const size_t memory)


Я использовал алгоритм из Effective Performance of External Sorting with No Additional Disk Space:

  1. Разделим файл на блоки, которые помещаются в доступную память. Обозначим эти блоки Block_1, Block_2, …, Block_(S-1), Block_S. Установим P = 1.
  2. Читаем Block_P в память.
  3. Отсортируем данные в памяти и запишем назад в Block_P. Установим P = P + 1, и если P ≤ S, то читаем Block_P в память и повторяем этот шаг. Другими словами, отсортируем каждый блок файла.
  4. Разделим каждый блок на меньшие блоки B_1 и B_2. Каждый из таких блоков занимает половину доступной памяти.
  5. Читаем блок B_1 блока Block_1 в первую половину доступной памяти. Установим Q = 2.
  6. Читаем блок B_1 блока Block_Q во вторую половину доступной памяти.
  7. Объеденим массивы в памяти с помощью in-place слияния, запишем вторую половину памяти в блок B_1 блока Block_Q и установим Q = Q + 1, если Q ≤ S, читаем блок B_1 блока Block_Q во вторую половину доступной памяти и повторяем этот шаг.
  8. Записываем первую половину доступной памяти в блок B_1 блока Block_1. Так как мы всегда оставляли в памяти меньшую половину элементов и провели слияние со всеми блоками, то в этой части памяти хранятся M минимальных элементы всего файла.
  9. Читаем блок B_2 блока Block_S во вторую половину доступной памяти. Установим Q = S −1.
  10. Читаем блок B_2 блока Block_Q в первую половину доступной памяти.
  11. Объеденим массивы в памяти с помощью in-place слияния, запишем первую половину доступной памяти в блок B_2 блока Block_Q и установим Q = Q −1. Если Q ≥ 1 читаем блок B_2 блока Block_Q в первую половину доступной памяти и повторяем этот шаг.
  12. Записываем вторую половину доступной памяти в блок B_2 блока Block_S. Аналогично шагу 8, тут хранятся максимальные элементы всего файла.
  13. Начиная от блока B_2 блока Block_1 и до блока B_1 блока Block_S, определим новые блоки в файле и снова пронумеруем их Block_1 to Block_S. Разделим каждый блок на блоки B_1 и B_2. Установим P = 1.
  14. Читаем B_1 и B_2 блока Block_P в память. Объеденим массивы в памяти. запишем отсортированный массив назад в Block_P и установим P = P +1. Если P ≤ S, повторяем этот шаг.
  15. Если S > 1, возвращаемся к шагу 5. Каждый раз мы выделяем M минимальных и максимальных элементов, записываем их в начало и конец файла соответственно, а потом делаем то же самое с оставшимися элементами, пока не дойдем до середины файла.


Преимущество такого алгоритма, кроме отсутствия буфера на диске, это то, что с диска мы читаем данные относительно большими порциями, что ускоряет алгоритм.

Реализуем алгоритм на C++.
Для начала определим количество блоков и размер блока в байтах и в элементах и выделим память:

 const size_t type_size = sizeof(T);
        const uint64_t filesize = file_size(filename);
        std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary);
        const uint64_t chunk_number = filesize / memory;
        const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size =
                chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2;

        std::vector<T> *chunk = new std::vector<T>(chunk_size);


Теперь пункты 2-3 — сортируем каждый блок:

 for (uint64_t i = 0; i < chunk_number; ++i) {
                data.seekg(chunk_byte_size * i);
                data.read((char *) chunk->data(), chunk_byte_size);
                flat_quick_sort(chunk->begin(), chunk->end());
                data.seekp(chunk_byte_size * i);
                data.write((char *) chunk->data(), chunk_byte_size);
        }


Саму сортировку мы напишем чуть позднее.

Приступим к слияниям. Нижняя половина:

 int64_t s = chunk_number, start = 0;
        while (s > 0) {
                data.seekp(half_chunk_byte_size * start);
                data.read((char *) chunk->data(), half_chunk_byte_size);
                for (int64_t q = 1; q < s; ++q) {
                        data.seekg(half_chunk_byte_size * start + chunk_byte_size * q);
                        data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
                        in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
                        data.seekp(half_chunk_byte_size * start + chunk_byte_size * q);
                        data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
                }
                data.seekp(half_chunk_byte_size * start);
                data.write((char *) chunk->data(), half_chunk_byte_size);


И аналогично верхняя:

         data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
                data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
                for (int64_t q = s - 2; q >= 0; --q) {
                        data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
                        data.read((char *) chunk->data(), half_chunk_byte_size);
                        in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
                        data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
                        data.write((char *) chunk->data(), half_chunk_byte_size);
                }
                data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
                data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);


Перераспределяем блоки, завершаем цикл и не забываем освободить память:

         --s;
                ++start;
                for (int64_t p = 0; p < s; ++p) {
                        data.seekp(half_chunk_byte_size * start + chunk_byte_size * p);
                        data.read((char *) chunk->data(), chunk_byte_size);
                        in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
                        data.seekg(half_chunk_byte_size * start + chunk_byte_size * p);
                        data.write((char *) chunk->data(), chunk_byte_size);
                }
        }

        delete chunk;
}


Функция полностью
template<typename T>
void ext_sort(const std::string filename, const size_t memory) {

        const size_t type_size = sizeof(T);
        const uint64_t filesize = file_size(filename);
        std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary);
        const uint64_t chunk_number = filesize / memory;
        const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size =
                chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2;

        std::vector<T> *chunk = new std::vector<T>(chunk_size);
        for (uint64_t i = 0; i < chunk_number; ++i) {
                data.seekg(chunk_byte_size * i);
                data.read((char *) chunk->data(), chunk_byte_size);
                flat_quick_sort(chunk->begin(), chunk->end());
                data.seekp(chunk_byte_size * i);
                data.write((char *) chunk->data(), chunk_byte_size);
        }

        int64_t s = chunk_number, start = 0;
        while (s > 0) {
                data.seekp(half_chunk_byte_size * start);
                data.read((char *) chunk->data(), half_chunk_byte_size);
                for (int64_t q = 1; q < s; ++q) {
                        data.seekg(half_chunk_byte_size * start + chunk_byte_size * q);
                        data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
                        in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
                        data.seekp(half_chunk_byte_size * start + chunk_byte_size * q);
                        data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
                }
                data.seekp(half_chunk_byte_size * start);
                data.write((char *) chunk->data(), half_chunk_byte_size);

                data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
                data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
                for (int64_t q = s - 2; q >= 0; --q) {
                        data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
                        data.read((char *) chunk->data(), half_chunk_byte_size);
                        in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
                        data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
                        data.write((char *) chunk->data(), half_chunk_byte_size);
                }
                data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
                data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
                --s;
                ++start;
                for (int64_t p = 0; p < s; ++p) {
                        data.seekp(half_chunk_byte_size * start + chunk_byte_size * p);
                        data.read((char *) chunk->data(), chunk_byte_size);
                        in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
                        data.seekg(half_chunk_byte_size * start + chunk_byte_size * p);
                        data.write((char *) chunk->data(), chunk_byte_size);
                }
        }

        delete chunk;
}



Осталось реализовать функции flat_quick_sort и in_place_merge. Идею (и большую часть реализации) flat quick sort я взял в статье хабраюзера ripatti. Я только заменил median of medians (посчитал это оверкиллом в среднем случае) на median-of-nine и добавил сортировку вставками для маленьких частей массива.

flat_quicksort.h
#ifndef EXTERNAL_SORT_FLAT_QUICKSORT_H
#define EXTERNAL_SORT_FLAT_QUICKSORT_H

template<class ForwIt>
void insertion_sort(ForwIt be, ForwIt en) {
        for (ForwIt ii = be + 1; ii != en; ii++) {
                ForwIt jj = ii - 1;
                auto val = *ii;
                while (jj >= be and *jj > val) {
                        *(jj + 1) = *jj;
                        --jj;
                }
                *(jj + 1) = val;
        }
}

template<class ForwIt>
ForwIt median_of_3(ForwIt it1, ForwIt it2, ForwIt it3) {
        return (*it1 > *it2) ?
               (*it3 > *it2) ? (*it1 > *it3) ? it3 : it1 : it2 :
               (*it3 > *it1) ? (*it2 > *it3) ? it3 : it2 : it1;
}

template<class ForwIt>
ForwIt choose_pivot(ForwIt be, ForwIt en) {
        ptrdiff_t s = (en - be) / 8;
        ForwIt mid = be + (en - be) / 2;
        ForwIt best1 = median_of_3(be, be + s, be + 2 * s), best2 = median_of_3(mid - s, mid, mid + s), best3 = median_of_3(
                en - 2 * s, en - s, en);
        return median_of_3(best1, best2, best3);
}

// search for the end of the current block
template<class ForwIt>
ForwIt block_range(ForwIt be, ForwIt en) {
        ForwIt it = be;
        while (it != en) {
                if (*be < *it)
                        return it;
                ++it;
        }
        return it;
}

// warning: after the partition outer pivot may point to random element
template<class ForwIt>
std::pair<ForwIt, ForwIt> partition_range(ForwIt be, ForwIt en, ForwIt pivot) {
        std::pair<ForwIt, ForwIt> re;
        ForwIt j = be;
        for (ForwIt i = be; i != en; ++i)
                if (*i < *pivot) {
                        if (pivot == i) pivot = j; else if (pivot == j) pivot = i;
                        std::swap(*j, *i);
                        ++j;
                }
        re.first = j;
        for (ForwIt i = j; i != en; ++i)
                if (*pivot == *i) {
                        if (pivot == i) pivot = j; else if (pivot == j) pivot = i;
                        std::swap(*j, *i);
                        ++j;
                }
        re.second = j;
        return re;
}
// makes largest element the first
template<class ForwIt>
void blockify(ForwIt be, ForwIt en) {
        for (ForwIt it = be; it != en; ++it)
                if (*be < *it)
                        std::swap(*be, *it);
}

template<class ForwIt>
void flat_quick_sort(ForwIt be, ForwIt en) {
        ForwIt tmp = en; // the current end of block
        while (be != en) {
                if (std::is_sorted(be, tmp)) {
                        be = tmp;
                        tmp = block_range(be, en);
                        continue;
                }
                if (tmp - be < 32)
                        insertion_sort(be, tmp);
                else {
                        ForwIt pivot = choose_pivot(be, tmp);
                        std::pair<ForwIt, ForwIt> range = partition_range(be, tmp, pivot);
                        blockify(range.second, tmp);
                        tmp = range.first;
                }
        }
}

#endif //EXTERNAL_SORT_FLAT_QUICKSORT_H



Со слиянием было сложнее. Сначала я использовал заглушку, использующую O(n) памяти:

template<typename T>
void merge(std::vector<T> &chunk, size_t s, size_t q, size_t r) {
        std::vector<T> *chunk2 = new std::vector<T>(q - s + 1);
        auto it2 = chunk2->begin(), it1 = chunk.begin() + q + 1, it = chunk.begin() + s;
        std::copy(it, it1, it2);
        while (it2 != chunk2->end() && it1 != chunk.begin() + r + 1) {
                if (*it1 > *it2) {
                        *it = *it2;
                        ++it2;
                } else {
                        *it = *it1;
                        ++it1;
                }
                ++it;
        }
        if (it1 == chunk.begin() + r + 1)
                std::copy(it2, chunk2->end(), it);
        else
                std::copy(it1, chunk.begin() + r + 1, it);
        delete chunk2;
}


Когда я захотел заменить заглушку in-place версией, оказалось, что быстрые алгоритмы in-place слияния в большинстве своем достаточно запутанные (посмотрите, например, On optimal and efficient in place merging). Мне надо было что-то попроще, и я выбрал алгоритм из статьи A simple algorithm for in-place merging:

in-place merge
template<typename Iter>
void merge_B_and_Y(Iter z, Iter y, Iter yn) {
        for (; z < y && y < yn; ++z) {
                Iter j = std::min_element(z, y);
                if (*j <= *y)
                        std::swap(*z, *j);
                else {
                        std::swap(*z, *y);
                        ++y;
                }
        }
        if (z < y)
                flat_quick_sort(z, yn);
}

template<typename Iter>
Iter find_next_X_block(Iter x0, Iter z, Iter y, size_t k, size_t f, Iter b1,
                       Iter b2, auto max) {
        auto min1 = max, min2 = max;
        Iter m = x0 + (ptrdiff_t) floor((z - x0 - f) / k) * k + f, x = x0;
        if (m <= z)
                m += k;

        for (auto i = m; i + k <= y; i += k) {
                if (i != b1 && i != b2) {
                        Iter j = (i < b1 && b1 < i + k) ? m - 1 : i + k - 1;
                        if (*i <= min1 && *j <= min2) {
                                x = i;
                                min1 = *i;
                                min2 = *j;
                        }
                }
        }
        return x;
}

template<typename Iter>
void in_place_merge(Iter x0, Iter y0, Iter yn, int64_t k, bool rec) {
        if (k == -1)
                k = (int64_t) sqrt(yn - x0);
        size_t f = (y0 - x0) % k;
        Iter x = (f == 0) ? y0 - 2 * k : y0 - k - f;
        auto t = *x, max = *std::max_element(x0, yn);
        *x = *x0;
        Iter z = x0, y = y0, b1 = x + 1, b2 = y0 - k;
        int i = 0;
        while (y - z > 2 * k) {
                ++i;
                if (*x <= *y || y >= yn) {
                        *z = *x;
                        *x = *b1;
                        ++x;
                        if ((x - x0) % k == f) if (z < x - k)
                                b2 = x - k;
                        x = find_next_X_block(x0, z, y, k, f, b1, b2, max);
                } else {
                        *z = *y;
                        *y = *b1;
                        ++y;
                        if ((y - y0) % k == 0)
                                b2 = y - k;
                }
                ++z;
                *b1 = *z;
                if (z == x)
                        x = b1;
                if (z == b2)
                        b2 = yn + 1;
                ++b1;
                if ((b1 - x0) % k == f)
                        b1 = (b2 == yn + 1) ? b1 - k : b2;
        }
        *z = t;
        if (rec)
                merge_B_and_Y(z, y, yn);
        else  {
                flat_quick_sort(z, y);
                in_place_merge(z,y,yn,(int64_t)sqrt(k),true);
        }
}



Но на моем компьютере замена merge на in-place merge замедляла алгоритм почти на порядок. Возможно я ошибся в реализации или просто выбрал медленный алгоритм в погоне за простотой. Времени разбираться, как всегда, не было, к тому же gprof почему-то падал. И тут меня осенило. Если мы выделям M байт динамической памяти, то не важно, как мы её используем, мы все равно получаем O(1). Тогда просто выделим 2/3 под данные, а треть — под буфер слияния. Замедление будет гораздо меньше. И правда:

Алгоритм Время (75MB int64 в 7,5MB памяти) Скорость (75MB int64 в 7,5MB памяти) Время (7,5MB int64 в 75KB памяти) Скорость (7,5MB int64 в 75KB памяти) Время (750MB int64 в 75MB памяти) Скорость (750MB int64 в 75MB памяти)
In-place merge 6.04329 s 1 241 045 B/s 24.2993 s 3 086 508 B/s - -
Merge 0.932663 s 8 041 489 B/s 2.73895 s 27 382 756 B/s 47.7946 s 15 691 689 B/s
Алгоритм SLY_G 1.79629 s 4175272 B/s 3.84775 s 19 491 910 B/s 39.77 s 18 858 436 B/s

К сожалению, на больших объемах алгоритм замедляется, что ожидаемо, ведь мы не используем никакого буфера вообще. Тем не менее, скорость алгоритма достаточно адекватная, и, я уверен, может быть улучшена.

Все исходники лежат здесь.

© Habrahabr.ru