Merger в YTsaurus: безболезненное объединение данных в статических таблицах
Статические таблицы нужны, чтобы хранить в них неизменяемые данные. Это означает, что данные, записанные в статическую таблицу, нельзя модифицировать в произвольном месте — их можно либо перезаписать полностью, либо дополнить, сделав запись в конец таблицы.
Основная боль при работе с такой таблицей возникает при росте количества хранимых в ней данных. Время чтения или записи возрастает пропорционально её объёму. Из‑за этого может настать момент, когда взаимодействовать с таблицей становится просто невозможно, и пользователям приходится придумывать ухищрения, чтобы справиться с этим.
В этой статье мы разберём механизм мёрджа чанков с помощью мастер‑серверов, который мы реализовали для статических таблиц YTsaurus — нашей платформы распределённого хранения и обработки больших данных с открытым исходным кодом.
Итак, в статических таблицах удобно хранить большие объёмы данных. Это выбор тех, кому не нужна запись по ключу и низкая латентность, а мап‑редьюсы запускать хочется. В таких таблицах можно полностью перезаписывать данные или дописывать их в конец. Об этих модификациях мы поговорим чуть позже, а пока рассмотрим внутреннее представление таблиц.
Чанки
Данные в таблицах хранятся в виде небольших кусочков — чанков. Чанки иммутабельны: после формирования их содержимое нельзя изменить. Это очень удобно для репликации и чтения, но не очень — при дозаписи в конец таблицы, потому что каждая такая дозапись порождает новый чанк. Типичная статическая таблица выглядит так:
На самом деле структура таблиц чуть сложнее: чанки хранятся не плоским списком, а в виде дерева. Листья этого дерева — чанки, промежуточные узлы — чанклисты (списки чанков), а корень — корневой чанклист. В простейшем случае дерево имеет вид:
При дозаписи данных в таблицу с такой структурой создаётся новый корневой чанклист, к которому подвешивают бывший корневой чанклист этой таблицы и новый чанклист с новым чанком с новыми данными.
При копировании таблицы с такой структурой не придётся копировать полный список чанков: достаточно создать в новой таблице новый корневой чанклист и подцепить к нему поддерево с чанками.
Заметим, что эта схема не мешает дозаписывать данные в таблицу, что не может не радовать!
Но при дозаписи данных могут появляться мелкие чанки. Они создают проблемы как для кластера (каждый чанк нужно создать и удалить, отслеживать, реплицировать, помнить метаданные), так и для самого пользователя (вместе с количеством чанков линейно растёт количество round‑trip, из‑за чего значительно падает скорость чтения или записи статических таблиц, состоящих из мелких чанков).
Чтобы избежать этих проблем, мы стараемся мёрджить (укрупнять) чанки, и сейчас расскажу как.
Мёрдж чанков с помощью мастер-серверов
До появления механизма укрупнения чанков с помощью мастер‑серверов мы боролись с мелкими чанками двумя способами:
Запускали специальную операцию (наподобие map), которая проходила по всем чанкам и объединяла смежные мелкие чанки.
Включали автоматическое слияние чанков на выходе операции (если мелкие чанки генерировались джобами).
Мёрдж на стороне мастера удобен своей прозрачностью: таблицу не приходится блокировать, она не пересоздается и не меняет атрибуты. Вся работа происходит слоем ниже.
Чтобы включить автоматический мёрдж, достаточно выставить у таблицы атрибут @chunk_merger_mode
с желаемым режимом:
yt set //home/dir_with_tables/table_i_want_to_merge/@chunk_merger_mode auto
По умолчанию лучше использовать режим auto
. Подробнее о режимах мёрджа расскажу чуть ниже.
Чтобы смёрджить все таблицы в определённом поддереве, можно выставить атрибут на всю директорию:
yt set //home/dir_with_tables/@chunk_merger_mode auto
chunk_merger_mode
— наследуемый атрибут. После установки его значение будут наследовать все новые таблицы в директории. Для старых таблиц ничего не поменяется — в момент включения придётся пройтись по всем таблицам.
Как работает мёрджер
Таблицы попадают в очередь на мёрдж в двух случаях:
При выставлении атрибута
chunk_merger_mode
на таблицу.При записи в таблицу, для которой такой атрибут уже выставлен.
Если одно из этих условий сработало — начинается объединение на стороне мастера.
Этап первый. После того как мы проверим, что таблицу действительно можно мёрджить (например, она не динамическая), персистентно запомним её в специальной очереди на мёрдж. Так мы не забудем перемёрджить таблицу в случае рестарта мастер‑серверов.
Первая очередь — поаккаунтная. Здесь таблицы могут задержаться подольше, если для какого‑то аккаунта уже запущено слишком много мёрдж‑джобов.
Этап второй. На этом этапе нужно определить, какие последовательности чанков таблицы мы хотим объединить. Для этого запускаем специальный traverser, который обходит дерево чанков и ищет последовательности, удовлетворяющие нашим критериям (мёрджер следит, чтобы получившийся чанк был не слишком крупным в разных смыслах).
Отметим, что мёрджер умеет объединять только чанки из одного чанклиста. Это позволяет начинать замену чанков в тот момент, когда добежали все джобы конкретного чанклиста, а не всей таблицы.
Этап третий. После нарезки на такие последовательности чанков нужно создать новые чанки как объекты на мастере. Чтобы объяснить, почему это действие достойно отдельного пункта, сделаю небольшое отступление.
Мастер‑серверы — это набор машин с развёрнутым алгоритмом консенсуса (Raft‑like) между ними. Другими словами, всю активность мастер‑серверов можно поделить на персистентную и транзиентную.
Персистентное состояние мы надёжно сохраняем на диск, и оно одинаково на всех участниках консенсуса. Все мутации (изменения) персистентного состояния фиксируются в ченджлогах.
Транзиентное состояние само по себе не оставляет персистентных следов, не фиксируется надёжно в наших снапшотах и ченджлогах и зачастую отличается на разных пирах. Случается, что транзиентная активность выполняется только на лидере, а по её завершении лидер заказывает мутацию, которая уже надёжно фиксирует результат.
Бо́льшая часть активности мёрджера транзиентна и выполняется только на лидере: например, traverse и нарезка на последовательность чанков для джобов. Однако часть вещей мы хотим зафиксировать персистентно: например, попадание таблицы в очередь мёрджа. Факт создания чанка мы, конечно, тоже хотим запомнить персистентно, поэтому после нарезки на последовательности чанков заказываем мутацию на создание новых чанков и ждём её выполнения.
Этап четвертый. После того как чанки созданы, нужно запланировать и запустить сами мёрдж-джобы. Мастер-джобы выполняются на data-нодах — тех же машинах, где хранятся сами чанки, поэтому на данном этапе мы ждём, когда нода придёт к мастеру с периодическим хартбитом. Когда она это сделает, мы сообщим ей о необходимости выполнить новый джоб, а дальше будем смиренно ждать, когда она рапортует нам о результате.
Но этот этап не последний.
Режимы мёрджа
Пришло время выполнить обещание и рассказать о мёдрже чанков на самой ноде и его режимах.
У автоматического мёрджа есть несколько режимов:
shallow
. Объединение чанков на уровне метаданных без распаковки. Такой подход работает, только если чанки достаточно похожи (в плане сортированности, колонок, compression‑кодека и других параметров).deep
. Объединение чанков с полным пережатием (путём считывания всех входных чанков и записи в новый). Этот режим значительно дороже, чем предыдущий, но позволяет объединять чанки с разными характеристиками, которые оказались в одной таблице. Режим помогает избавиться от мелких блоков или чанков в старых форматах.auto
. Объединение чанков в shallow‑режиме с откатом вdeep
в случае неудачи. Как видно из названия, режим устанавливается по умолчанию.
Этап пятый. После того как завершатся все джобы для конкретного чанклиста, мастер может приступить к реальной замене чанков. Здесь мы в первую очередь проверяем, что и чанклист, и заменяемые чанки всё ещё существуют в таблице. Если это не так, то, увы, мёрдж отработает впустую, а таблицу придется смёрджить ещё раз. Неприятно, но такую цену мы платим за отсутствие блокировок и незаметность мёрджа для пользователя.
Если всё в порядке и чанки на месте, заменяем их и перестраиваем дерево:
Поскольку чанклисты тоже иммутабельны и могут быть общими для разных таблиц, а мы не хотим мёрджить те таблицы, о которых нас не просили, то при замене пути от корневого чанклиста до заменяемых чанков будут созданы новые чанклисты. К ним будут корректно подвешены существующие чанклисты и чанки. В случае нетривиального дерева чанков это может выглядеть так:
Если мы заменим в исходной таблице чанки С2, С3, С4 на NewC1, то новое дерево чанков будет выглядеть так:
После того как джобы добегут, а замены для всех чанклистов таблицы выполнятся, мёрдж таблицы завершится.
Иногда после завершения мёрджа таблицу нужно снова прогнать через весь пайплайн. На это может быть несколько причин:
Пока мы мёрджили таблицу, кто‑то дописал данные в её конец. Старые данные никуда не делись, с заменой мы справились, но могли появиться новые чанки, которые следует объединить.
Часть джобов завершилась
retriable
‑ошибкой.Чанки таблицы настолько мелкие, что даже после замены их нужно объединить.
Итак, мы рассмотрели механизм автоматического укрупнения чанков статической таблицы, который помогает пользователям YTsaurus решить проблему длительного доступа к данным с увеличением объёма таблицы.
Попробовать мёрджер можно, воспользовавшись опенсорс‑версией YTsaurus. А если у вас возникнут вопросы, то смело задавайте их в нашем community‑чате.