[Из песочницы] Внешняя сортировка с O(1) дополнительной памяти
Прочитав эту статью, я вспомнил, как писал внешнюю сортировку, которая использовала O(1) внешней памяти. Функция получала бинарый файл и максимальный размер памяти, которую она могла выделить под массив:
void ext_sort(const std::string filename, const size_t memory)
Я использовал алгоритм из Effective Performance of External Sorting with No Additional Disk Space:
- Разделим файл на блоки, которые помещаются в доступную память. Обозначим эти блоки Block_1, Block_2, …, Block_(S-1), Block_S. Установим P = 1.
- Читаем Block_P в память.
- Отсортируем данные в памяти и запишем назад в Block_P. Установим P = P + 1, и если P ≤ S, то читаем Block_P в память и повторяем этот шаг. Другими словами, отсортируем каждый блок файла.
- Разделим каждый блок на меньшие блоки B_1 и B_2. Каждый из таких блоков занимает половину доступной памяти.
- Читаем блок B_1 блока Block_1 в первую половину доступной памяти. Установим Q = 2.
- Читаем блок B_1 блока Block_Q во вторую половину доступной памяти.
- Объеденим массивы в памяти с помощью in-place слияния, запишем вторую половину памяти в блок B_1 блока Block_Q и установим Q = Q + 1, если Q ≤ S, читаем блок B_1 блока Block_Q во вторую половину доступной памяти и повторяем этот шаг.
- Записываем первую половину доступной памяти в блок B_1 блока Block_1. Так как мы всегда оставляли в памяти меньшую половину элементов и провели слияние со всеми блоками, то в этой части памяти хранятся M минимальных элементы всего файла.
- Читаем блок B_2 блока Block_S во вторую половину доступной памяти. Установим Q = S −1.
- Читаем блок B_2 блока Block_Q в первую половину доступной памяти.
- Объеденим массивы в памяти с помощью in-place слияния, запишем первую половину доступной памяти в блок B_2 блока Block_Q и установим Q = Q −1. Если Q ≥ 1 читаем блок B_2 блока Block_Q в первую половину доступной памяти и повторяем этот шаг.
- Записываем вторую половину доступной памяти в блок B_2 блока Block_S. Аналогично шагу 8, тут хранятся максимальные элементы всего файла.
- Начиная от блока B_2 блока Block_1 и до блока B_1 блока Block_S, определим новые блоки в файле и снова пронумеруем их Block_1 to Block_S. Разделим каждый блок на блоки B_1 и B_2. Установим P = 1.
- Читаем B_1 и B_2 блока Block_P в память. Объеденим массивы в памяти. запишем отсортированный массив назад в Block_P и установим P = P +1. Если P ≤ S, повторяем этот шаг.
- Если S > 1, возвращаемся к шагу 5. Каждый раз мы выделяем M минимальных и максимальных элементов, записываем их в начало и конец файла соответственно, а потом делаем то же самое с оставшимися элементами, пока не дойдем до середины файла.
Преимущество такого алгоритма, кроме отсутствия буфера на диске, это то, что с диска мы читаем данные относительно большими порциями, что ускоряет алгоритм.
Реализуем алгоритм на C++.
Для начала определим количество блоков и размер блока в байтах и в элементах и выделим память:
const size_t type_size = sizeof(T);
const uint64_t filesize = file_size(filename);
std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary);
const uint64_t chunk_number = filesize / memory;
const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size =
chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2;
std::vector<T> *chunk = new std::vector<T>(chunk_size);
Теперь пункты 2-3 — сортируем каждый блок:
for (uint64_t i = 0; i < chunk_number; ++i) {
data.seekg(chunk_byte_size * i);
data.read((char *) chunk->data(), chunk_byte_size);
flat_quick_sort(chunk->begin(), chunk->end());
data.seekp(chunk_byte_size * i);
data.write((char *) chunk->data(), chunk_byte_size);
}
Саму сортировку мы напишем чуть позднее.
Приступим к слияниям. Нижняя половина:
int64_t s = chunk_number, start = 0;
while (s > 0) {
data.seekp(half_chunk_byte_size * start);
data.read((char *) chunk->data(), half_chunk_byte_size);
for (int64_t q = 1; q < s; ++q) {
data.seekg(half_chunk_byte_size * start + chunk_byte_size * q);
data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
data.seekp(half_chunk_byte_size * start + chunk_byte_size * q);
data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
}
data.seekp(half_chunk_byte_size * start);
data.write((char *) chunk->data(), half_chunk_byte_size);
И аналогично верхняя:
data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
for (int64_t q = s - 2; q >= 0; --q) {
data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
data.read((char *) chunk->data(), half_chunk_byte_size);
in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
data.write((char *) chunk->data(), half_chunk_byte_size);
}
data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
Перераспределяем блоки, завершаем цикл и не забываем освободить память:
--s;
++start;
for (int64_t p = 0; p < s; ++p) {
data.seekp(half_chunk_byte_size * start + chunk_byte_size * p);
data.read((char *) chunk->data(), chunk_byte_size);
in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
data.seekg(half_chunk_byte_size * start + chunk_byte_size * p);
data.write((char *) chunk->data(), chunk_byte_size);
}
}
delete chunk;
}
template<typename T>
void ext_sort(const std::string filename, const size_t memory) {
const size_t type_size = sizeof(T);
const uint64_t filesize = file_size(filename);
std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary);
const uint64_t chunk_number = filesize / memory;
const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size =
chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2;
std::vector<T> *chunk = new std::vector<T>(chunk_size);
for (uint64_t i = 0; i < chunk_number; ++i) {
data.seekg(chunk_byte_size * i);
data.read((char *) chunk->data(), chunk_byte_size);
flat_quick_sort(chunk->begin(), chunk->end());
data.seekp(chunk_byte_size * i);
data.write((char *) chunk->data(), chunk_byte_size);
}
int64_t s = chunk_number, start = 0;
while (s > 0) {
data.seekp(half_chunk_byte_size * start);
data.read((char *) chunk->data(), half_chunk_byte_size);
for (int64_t q = 1; q < s; ++q) {
data.seekg(half_chunk_byte_size * start + chunk_byte_size * q);
data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
data.seekp(half_chunk_byte_size * start + chunk_byte_size * q);
data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
}
data.seekp(half_chunk_byte_size * start);
data.write((char *) chunk->data(), half_chunk_byte_size);
data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
for (int64_t q = s - 2; q >= 0; --q) {
data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
data.read((char *) chunk->data(), half_chunk_byte_size);
in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
data.write((char *) chunk->data(), half_chunk_byte_size);
}
data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
--s;
++start;
for (int64_t p = 0; p < s; ++p) {
data.seekp(half_chunk_byte_size * start + chunk_byte_size * p);
data.read((char *) chunk->data(), chunk_byte_size);
in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
data.seekg(half_chunk_byte_size * start + chunk_byte_size * p);
data.write((char *) chunk->data(), chunk_byte_size);
}
}
delete chunk;
}
Осталось реализовать функции flat_quick_sort и in_place_merge. Идею (и большую часть реализации) flat quick sort я взял в статье хабраюзера ripatti. Я только заменил median of medians (посчитал это оверкиллом в среднем случае) на median-of-nine и добавил сортировку вставками для маленьких частей массива.
#ifndef EXTERNAL_SORT_FLAT_QUICKSORT_H
#define EXTERNAL_SORT_FLAT_QUICKSORT_H
template<class ForwIt>
void insertion_sort(ForwIt be, ForwIt en) {
for (ForwIt ii = be + 1; ii != en; ii++) {
ForwIt jj = ii - 1;
auto val = *ii;
while (jj >= be and *jj > val) {
*(jj + 1) = *jj;
--jj;
}
*(jj + 1) = val;
}
}
template<class ForwIt>
ForwIt median_of_3(ForwIt it1, ForwIt it2, ForwIt it3) {
return (*it1 > *it2) ?
(*it3 > *it2) ? (*it1 > *it3) ? it3 : it1 : it2 :
(*it3 > *it1) ? (*it2 > *it3) ? it3 : it2 : it1;
}
template<class ForwIt>
ForwIt choose_pivot(ForwIt be, ForwIt en) {
ptrdiff_t s = (en - be) / 8;
ForwIt mid = be + (en - be) / 2;
ForwIt best1 = median_of_3(be, be + s, be + 2 * s), best2 = median_of_3(mid - s, mid, mid + s), best3 = median_of_3(
en - 2 * s, en - s, en);
return median_of_3(best1, best2, best3);
}
// search for the end of the current block
template<class ForwIt>
ForwIt block_range(ForwIt be, ForwIt en) {
ForwIt it = be;
while (it != en) {
if (*be < *it)
return it;
++it;
}
return it;
}
// warning: after the partition outer pivot may point to random element
template<class ForwIt>
std::pair<ForwIt, ForwIt> partition_range(ForwIt be, ForwIt en, ForwIt pivot) {
std::pair<ForwIt, ForwIt> re;
ForwIt j = be;
for (ForwIt i = be; i != en; ++i)
if (*i < *pivot) {
if (pivot == i) pivot = j; else if (pivot == j) pivot = i;
std::swap(*j, *i);
++j;
}
re.first = j;
for (ForwIt i = j; i != en; ++i)
if (*pivot == *i) {
if (pivot == i) pivot = j; else if (pivot == j) pivot = i;
std::swap(*j, *i);
++j;
}
re.second = j;
return re;
}
// makes largest element the first
template<class ForwIt>
void blockify(ForwIt be, ForwIt en) {
for (ForwIt it = be; it != en; ++it)
if (*be < *it)
std::swap(*be, *it);
}
template<class ForwIt>
void flat_quick_sort(ForwIt be, ForwIt en) {
ForwIt tmp = en; // the current end of block
while (be != en) {
if (std::is_sorted(be, tmp)) {
be = tmp;
tmp = block_range(be, en);
continue;
}
if (tmp - be < 32)
insertion_sort(be, tmp);
else {
ForwIt pivot = choose_pivot(be, tmp);
std::pair<ForwIt, ForwIt> range = partition_range(be, tmp, pivot);
blockify(range.second, tmp);
tmp = range.first;
}
}
}
#endif //EXTERNAL_SORT_FLAT_QUICKSORT_H
Со слиянием было сложнее. Сначала я использовал заглушку, использующую O(n) памяти:
template<typename T>
void merge(std::vector<T> &chunk, size_t s, size_t q, size_t r) {
std::vector<T> *chunk2 = new std::vector<T>(q - s + 1);
auto it2 = chunk2->begin(), it1 = chunk.begin() + q + 1, it = chunk.begin() + s;
std::copy(it, it1, it2);
while (it2 != chunk2->end() && it1 != chunk.begin() + r + 1) {
if (*it1 > *it2) {
*it = *it2;
++it2;
} else {
*it = *it1;
++it1;
}
++it;
}
if (it1 == chunk.begin() + r + 1)
std::copy(it2, chunk2->end(), it);
else
std::copy(it1, chunk.begin() + r + 1, it);
delete chunk2;
}
Когда я захотел заменить заглушку in-place версией, оказалось, что быстрые алгоритмы in-place слияния в большинстве своем достаточно запутанные (посмотрите, например, On optimal and efficient in place merging). Мне надо было что-то попроще, и я выбрал алгоритм из статьи A simple algorithm for in-place merging:
template<typename Iter>
void merge_B_and_Y(Iter z, Iter y, Iter yn) {
for (; z < y && y < yn; ++z) {
Iter j = std::min_element(z, y);
if (*j <= *y)
std::swap(*z, *j);
else {
std::swap(*z, *y);
++y;
}
}
if (z < y)
flat_quick_sort(z, yn);
}
template<typename Iter>
Iter find_next_X_block(Iter x0, Iter z, Iter y, size_t k, size_t f, Iter b1,
Iter b2, auto max) {
auto min1 = max, min2 = max;
Iter m = x0 + (ptrdiff_t) floor((z - x0 - f) / k) * k + f, x = x0;
if (m <= z)
m += k;
for (auto i = m; i + k <= y; i += k) {
if (i != b1 && i != b2) {
Iter j = (i < b1 && b1 < i + k) ? m - 1 : i + k - 1;
if (*i <= min1 && *j <= min2) {
x = i;
min1 = *i;
min2 = *j;
}
}
}
return x;
}
template<typename Iter>
void in_place_merge(Iter x0, Iter y0, Iter yn, int64_t k, bool rec) {
if (k == -1)
k = (int64_t) sqrt(yn - x0);
size_t f = (y0 - x0) % k;
Iter x = (f == 0) ? y0 - 2 * k : y0 - k - f;
auto t = *x, max = *std::max_element(x0, yn);
*x = *x0;
Iter z = x0, y = y0, b1 = x + 1, b2 = y0 - k;
int i = 0;
while (y - z > 2 * k) {
++i;
if (*x <= *y || y >= yn) {
*z = *x;
*x = *b1;
++x;
if ((x - x0) % k == f) if (z < x - k)
b2 = x - k;
x = find_next_X_block(x0, z, y, k, f, b1, b2, max);
} else {
*z = *y;
*y = *b1;
++y;
if ((y - y0) % k == 0)
b2 = y - k;
}
++z;
*b1 = *z;
if (z == x)
x = b1;
if (z == b2)
b2 = yn + 1;
++b1;
if ((b1 - x0) % k == f)
b1 = (b2 == yn + 1) ? b1 - k : b2;
}
*z = t;
if (rec)
merge_B_and_Y(z, y, yn);
else {
flat_quick_sort(z, y);
in_place_merge(z,y,yn,(int64_t)sqrt(k),true);
}
}
Но на моем компьютере замена merge на in-place merge замедляла алгоритм почти на порядок. Возможно я ошибся в реализации или просто выбрал медленный алгоритм в погоне за простотой. Времени разбираться, как всегда, не было, к тому же gprof почему-то падал. И тут меня осенило. Если мы выделям M байт динамической памяти, то не важно, как мы её используем, мы все равно получаем O(1). Тогда просто выделим 2/3 под данные, а треть — под буфер слияния. Замедление будет гораздо меньше. И правда:
Алгоритм | Время (75MB int64 в 7,5MB памяти) | Скорость (75MB int64 в 7,5MB памяти) | Время (7,5MB int64 в 75KB памяти) | Скорость (7,5MB int64 в 75KB памяти) | Время (750MB int64 в 75MB памяти) | Скорость (750MB int64 в 75MB памяти) |
---|---|---|---|---|---|---|
In-place merge | 6.04329 s | 1 241 045 B/s | 24.2993 s | 3 086 508 B/s | - | - |
Merge | 0.932663 s | 8 041 489 B/s | 2.73895 s | 27 382 756 B/s | 47.7946 s | 15 691 689 B/s |
Алгоритм SLY_G | 1.79629 s | 4175272 B/s | 3.84775 s | 19 491 910 B/s | 39.77 s | 18 858 436 B/s |
К сожалению, на больших объемах алгоритм замедляется, что ожидаемо, ведь мы не используем никакого буфера вообще. Тем не менее, скорость алгоритма достаточно адекватная, и, я уверен, может быть улучшена.
Все исходники лежат здесь.