Эффективное обновление состояний в БД из сервисов потоковой обработки событий

Как хранить сложные сущности в БД? Что нужно сделать, чтобы не перезаписывать весь рекламный баннер для обновления одного лишь заголовка? Рассмотрим как минимум 5 независимых и взаимно совместимых идей для многократного снижения нагрузки на чтение и запись подобных объектов.

Меня зовут Юрий Печатнов, я уже 6 лет работаю в Яндексе и занимаюсь сервисами потоковой обработки. Это большие распределенные системы, читающие сообщения из большой распределенной очереди и создающие полезный выход. Полезным выходом могут быть сообщения в другой распределенной очереди и/или обновление состояний в БД. Поговорим о том, как их эффективно обновлять.

3bdfd6668c477c4c67eb67c3d460bdb4.jpeg

Типичный сервис потоковой обработки событий

Типичный сервис потоковой обработки читает события из упорядоченной очереди и поддерживает актуальные состояния в БД. В нашем случае это состояния рекламных сущностей, например, стейтов баннера. Чтобы примерно понимать масштабы: у нас 100 ТБ стейтов — это 100 миллиардов штук, в секунду происходит 10 миллионов обновлений, и средний размер часто меняющегося состояния — 10 kb. Если бы мы наивно реализовывали бы работу со стейтами в БД, то читали и писали в БД по 100 ГБ/сек. Вместо этого у нас получилось добиться — 3 ГБ/сек. для записи и 8 ГБ/сек. для чтения.

Стейт

Сначала мы рассмотрим упрощённое представление стейта баннера. Это важно, так как мы делали оптимизации, исходя из структуры нашего состояния, и саму структуру мы развиваем так, чтобы лучше работали уже сделанные оптимизации.

Итак, упрощенное json-представление стейта баннера:

{

«BannerID»: «table_top», // Key

«Resources»: {

«Title»: «Столешницы из натурального камня»,

«Href»: «https://stoleshnitsyiskamnya.ru/…»,

/* … */

},

«Flags»: {/* … */},

«Counters»: {/* … > 10 kb … */},

«Embeddings»: {/* … > 10 kb … */},

/* … */

}

Есть ключ (BannerID) и несколько верхнеуровневых полей:

  • Поле Resources — то, что задаёт рекламодатель (заголовок баннера, ссылка на сайт и прочее);

  • Поле Flags — информация о том, включен ли баннер, заархивирован ли он и т. д.;

  • Поле Counters. Это различные счетчики, их довольно много, среди них, например мог бы быть счётчик числа кликов по баннеру, сделанных пользователями с 12 до 13 часов дня.

  • Поле Embeddings. В нём хранятся эмбеддинги, рассчитанные по баннеру различными нейросетевыми моделями.

Поля достаточно разнородные, поэтому можно предположить, что обновляются они не совместно (событие о том, что по баннеру был сделан клик, не будет приводить к изменению заголовка баннера).
В реальности мы храним стейты баннеров не в виде JSON-строк, а в протобуфах, так как это более компактный формат, у которого всё хорошо с обратной совместимостью.

Размеры

Стейт

Стейт

На этом флеймграфе показаны размеры полей состояния баннера в байтах. Вертикальная ось соответствует глубине вложенности полей, а ширина полос соответствует размеру полей. Нижняя полоса, занимающая всю ширину, соответствует размеру всего стейта. Полосы уровнем выше — это размеры верхнеуровневых полей (Resources, Counters, Embeddings, …), полосы еще уровнем выше — размеры полей, вложенных в верхнеуровневые. И так далее.

Получается, что по размерам полей структура довольно разнородная, но при этом без явных перекосов, то есть одно поле не занимает весь стейт.

Базовое решение

Самый простой способ работать с состояниями из сервиса потоковой обработки — это сделать в БД таблицу с двумя колонками: ключ и значение. В ключе хранить наш BannerID, в значении — сериализованное состояние протобуфа.

6b5e811cd0ef8fa8a337fe718bdd9bbf.jpg

Для обновления состояния, сначала нужно скачать его из БД, затем распаковать протобуф, обновить его в соответствии с продуктовой логикой и записать обратно в БД. То есть на любое изменение состояния мы пишем в БД столько байт, сколько будет в новом сериализованном представлении протобуфа.

Сжатие

Первый и очевидный шаг для оптимизации работы с БД — использовать сжатие. Мы используем сжатие на стороне сервиса потоковой обработки, поскольку так мы контролируем кодек сжатия и экономим поток чтения из БД, поток записи в БД и место в БД.

Добавим сжатие на нашем бэкенде

Добавим сжатие на нашем бэкенде

Так у нас получилось сэкономить 15% потока чтения и записи, а также места в БД на наших данных.

Сжатие со словарем

Второй и менее очевидный шаг — сделать сжатие мощнее за счет использования словаря. Кодек сжатия zstd позволяет обучаться на выборке данных, которые будут сжиматься, и формировать словарь. Он содержит часто встречающиеся фрагменты данных, то есть хранит в себе значительную часть состояний (это не совсем правда, но помогает понять суть). Это позволяет сжимать ещё лучше. У нас получилось сэкономить ещё 40% потока чтения и записи, а также места в БД.

Так как структура стейтов меняется с течением времени, то обученный словарь постепенно теряет свой эффект и данные начинают сжиматься хуже. Поэтому периодически надо обучать новый кодек сжатия и мигрировать на новый словарь.

Хранение состояния в нескольких колонках

Мы можем добавить колонки в БД и распределить состояния верхнеуровневых полей баннера по ним. То есть мы можем хранить поля Resources, Flags и Embeddings в разных колонках.

Хранение состояния в нескольких колонках

Хранение состояния в нескольких колонках

За счёт этого можно обновлять только часть колонок за раз. Поэтому если событие меняет один лишь счётчик в поле Counters, то мы можем обновить только колонку Counters. Так мы в несколько раз экономим поток на запись. А бонусом экономим поток на чтение, потому что этим стейтом баннера ещё пользуются другие компоненты рекламы, которым могут быть нужны разные наборы колонок.

Патчи: чтение

На данный момент на маленькое событие обновления одного счётчика мы перезаписали всю колонку счётчиков, которая может иметь размер порядка десяти килобайт. Хотелось бы отправлять в БД только само произведённое изменение.

На помощь приходят бинарные дельта-кодеки, в частности, xdelta. Он позволяет вычислить diff между любой парой бинарных строк, а затем наложить его на оригинальную строку и получить новое значение. Это как git diff и git apply, только не с текстовыми файлами в репозитории, а с любыми бинарными строками.

Патчи: можно обновлять меньше

Патчи: можно обновлять меньше

Для упрощения все примеры будут в JSON.

Чтобы эти бинарные кодеки применить в работе с БД можно для каждой основной колонки со значениями добавить парные колонки с патчами.

Патчи: новые колонки

Патчи: новые колонки

Теперь в основных колонках со значениями мы будем хранить не актуальные состояния верхнеуровневых полей, а их устаревшие (базовые) состояния. А в патчах — разницу между устаревшими состояниями и самыми актуальными. Таким образом, если мы захотим считать стейт баннера, то прочитаем всю строку из БД, произведём наложение патчей на базовые версии верхнеуровневых полей и получим их актуальные значения, которые в совокупности дают актуальный стейт баннера.

Патчи: запись

А запись будет вероятностная: при каждом обновлении стейта обновляются только колонки полей, которые реально изменились. Но теперь по каждой паре колонок (основной и патчевой) мы будем подкидывать монетку, и с вероятностью размер_патча/размер_базового_состояния — занулять патч и полностью обновлять базовую колонку. В противном случае мы не будем трогать базовую колонку, а только обновим патч. То есть в базовой колонке останется та же самая устаревшая версия, которая была до этого.

Патчи: когда сбрасывать

Патчи: когда сбрасывать

Давайте для одного конкретного обновления посчитаем матожидание количества байт, записанных в БД:

Патчи: немного математики

Патчи: немного математики

Получим, что на одно обновление в среднем пишется не больше двух размеров текущего патча.
Если у нас такой профиль нагрузки, что в стейте постоянно меняются одни и те же байты, то размер патча примерно равен количеству измененных байт. Получается, что в БД мы будем писать всего лишь в 2 раза больше байт, чем реально меняется в состояниях. Это идеальный случай.

Но, к сожалению, это не всегда так. Обычно в состояниях всё-таки меняются разные кусочки. Давайте оценим утрированный худший случай. Предположим, что в нашем стейте каждый раз меняются новые байты, и из-за этого размер патча (при условии, что мы не сбрасываем патч) должен каждый раз увеличиваться на ΔP (прирост патча).  Тогда в среднем при каждой записи мы будем писать примерно среднее геометрическое между размером базовой колонки и приростом патча.

Патчи: ещё немного математики

Патчи: ещё немного математики

Если в колонке 10000 байт и каждый раз мы меняем по байту, то в среднем каждый раз мы будем писать по 100 байт. Это сильно хуже идеала, но лучше, чем было до этого.
В реальности мы имеем что-то между идеальным и худшим случаем. Конкретно на наших данных механизм патчей сэкономил нам 50% потока на запись в БД.

Кэши в потоковых обработчиках: консистентность

Теперь поговорим об оптимизации потока на чтение из БД. Мы будем использовать идею, лежащую максимально на поверхности — кэши.

Кэши в потоковых обработчиках: консистентность

Кэши в потоковых обработчиках: консистентность

Но самый простой кэш не подойдет для сервиса потоковой обработки. Важно, чтобы кэш стейтов был консистентен с БД.

Допустим, мы обработчик из сервиса потоковой обработки и у нас вдруг образовался устаревший стейт в кэше. Мы прочитаем какие-то события, обновим стейт с их учётом и запишем в БД. Тогда, в лучшем случае, мы потеряем какие-то изменения, которые уже были записаны в БД, а в худшем — посчитаем новый патч относительно устаревшей базовой версии и запишем его в БД. Тогда там окажутся значения базовой и патчевой колонок, несовместимые друг с другом, что приведёт к порче данных. Это в лучшем случае остановит процессинг, в худшем мы будем жить с увеличивающимся количеством мусора в наших состояниях.

Чтобы бороться с этим наш сервис следит за консистентностью кэша стейтов. Это гарантирует следующие манипуляции:

→ Шардирование данных

Поясню по шагам. Сначала определим функцию, которая по ключу стейта возвращает число от 0 до количества шардов не включительно:

GetShard («table_top») = 1

Функция использует под собой хэш-функцию, никакой особой логики нет. Дальше все входящие события предварительно разложим по партициям промежуточной очереди в соответствии с номерами шардов ключей.

• Шардирование данных

• Номер шарда — функция от ключа стейта

• Входящие события предварительно раскладываем по партициям
в соответствии с номерами шардов ключей

Кэши в потоковых обработчиках: консистентность

Кэши в потоковых обработчиках: консистентность

Партицию можно упрощенно считать обычной очередью из курса алгоритмов. В её конец попадают свежезаписанные сообщения. Но важно, что у каждого сообщения есть номер (оффсет), который возрастает от более старых записанных событий к более свежим. Оффсет сообщения — его неизменяемый атрибут.

Теперь каждый шард (каждую промежуточную партицию) обрабатывает свой обработчик. У каждого обработчика свой локальный кэш стейтов (важно, что локальный). А стейты в целом обрабатывает только единственный сервис потоковой обработки.

→ Транзакционное обновление стейтов и прогресса обработки событий

Рассмотрим работу одного обработчика одного промежуточного шарда. Прогресс обработки событий хранится в таблице с оффсетами в той же БД, что и стейты.

В таблице с оффсетами по каждому шарду (по каждой промежуточной партиции) хранится номер сообщения, до которого включительно все сообщения уже обработаны, и эффект от обработки этих сообщений уже закоммичен в БД.

• Шардирование данных
• Транзакционное обновление стейтов и прогресса обработки событий
• Прогресс обработки событий храним в той же БД, что и стейты

Кэши в потоковых обработчиках: консистентность

Кэши в потоковых обработчиках: консистентность

Поэтому, когда обработчик хочет обработать новые сообщения, то он вычитывает значение оффсета из таблицы, например 541, и с 542 сообщения начинает вычитывать сообщения из партиции (промежуточной очереди). Вычитывает какое-то множество последовательных событий, подгружает или берёт из кэша по ним стейты, обновляет их в соответствии с продуктовой логикой и пытается записать обратно.

→ Обновление

Обработчик:

  1. Открывает транзакцию;

  2. В транзакции проверяет, что оффсет-прогресс ожидаемый (что там по-прежнему 541);

  3. Записывает обновлённые стейты и обновлённый оффсет (например, 610);

  4. Коммитит транзакцию.

Достаточность алгоритма

Из-за того, что в нашей БД транзакции имеют snapshot-изоляцию, такого алгоритма достаточно, чтобы всё было хорошо.

Предположим обратное. Пусть один обработчик работал, обновлял свой баннер table_top, но в какой-то момент его table_top оказался устаревшим. Он прочитал новые события, обновил этот table_top и попытался что-то записать в БД:

• Шардирование данных

• Транзакционное обновление стейтов и прогресса обработки событий

Кэши в потоковых обработчиках: консистентность

Кэши в потоковых обработчиках: консистентность

Такое могло произойти только если обработчик 2 того же сервиса тоже решил пообновлять table_top. В принципе, два обработчика могут одновременно пытаться обновлять один и тот же ключ, поскольку у нас не транзакционная балансировка и в процессе перебалансировки два хоста могут думать, что они обрабатывают один и тот же шард. Это довольно рядовая ситуация, но длится обычно недолго.

Обработчики обновляли один и тот же баннер, но у этого баннера один и тот же номер шарда, а значит, оба обработчика не могли пройти мимо обновления одного и того же значения оффсета в таблице. Но оффсеты мы обновляем исключительно транзакционно, при этом коммитим только после проверки открывшейся транзакции на ожидаемое значение. Этот механизм гарантирует, что хотя бы у одного обработчика либо упадёт проверка того, что оффсет ожидаемый, либо упадёт коммит транзакции с ошибкой конфликта записи. А когда у обработчика происходит ошибка, он полностью перезапускается, при этом сбрасывает весь свой кэш стейтов.

Получается, что кэш стейтов в обработчике либо консистентен с БД, либо теряет смысл, поскольку обработчик ничего не сможет записать в БД из-за того, что упадут проверки.

Холодный кэш

Для того, чтобы кэш хит у нас был выше мы используем холодный кэш — часть стейтов храним в сжатом виде. За счёт этого в тот же самый объём оперативной памяти можно уместить больше стейтов, а значит больше кэш хит и меньше поток на чтение из БД.

Холодный кэш

Холодный кэш

Таким образом с кэшом и холодным кэшом мы на наших данных сэкономили 50% потока на чтение.

Агрегирующие колонки в БД

До текущего момента мы уменьшали поток на запись в БД с помощью использования дельта-кодеков и записью патчей в БД. Но делали это на стороне сервиса потоковой обработки, что, казалось бы не очень оптимально, как минимум можно было бы всегда отправлять в БД патчи и давать ей самой их применять, используя агрегирующие колонки.

Агрегирующая колонка — это колонка в которую можно последовательно записать несколько значений, а при чтении будет возвращаться агрегат над ними. Например, в некую агрегирующую колонку можно было бы писать единички, а при чтении видеть сумму всех записанных единичек.

И у нас есть специальная агрегирующая колонка, в которую можно писать xdelta-патчи, а читать из нее результат смердживания всех патчей, то есть актуальное значение (вместо использования колонок с базовой версией и патчем, мы используем одну агрегирующую колонку, и БД сама накладывает патч на базовую версию).

Агрегирующие колонки в БД

Агрегирующие колонки в БД

Важная деталь про реализацию агрегирующей колонки: базе данных нужно либо на каждое обновление применять патч, что приведет к полному перезаписыванию стейта в БД и потере оптимизации потока на запись на диски в БД, либо хранить неприменёнными некоторое количество последних патчей, а в этом случае мы можем сильно проиграть по CPU в базе данных на чтении стейтов.

Поэтому мы используем такую оптимизацию только в случаях, когда можем обеспечить очень высокий кеш-хит. Это возможно для стейтов, которых в принципе мало и они практически никогда не вымываются из кешей на наших обработчиках. И в этом случае мы хорошо выигрываем на записи, так как пишем в БД только реально производимые изменения, а проигрыш на стоимости чтения нас не сильно волнует, так как мы очень редко читаем эти стейты.

Итоги

Карта сокровищ обновлений состояний в БД из сервисов потоковой обработки событий

Карта сокровищ обновлений состояний в БД из сервисов потоковой обработки событий

Вертикальная ось — оптимизация по чтению, горизонтальная ось — оптимизация по записи.

Мы начали с базового решения, где в двух колонках хранится ключ и значение. Затем применили сжатие zstd и сэкономили 15% потока чтения/записей и места в БД. Потом улучшили сжатие, начав сжимать со словарём, и сэкономили ещё 40% от оставшегося. Далее, мы пошли оптимизировать запись в БД. Поделили стейт на много колонок и в несколько раз уменьшили поток на запись. Также для внешних потребителей сэкономили поток на чтение. Затем мы применили дельта-кодеки (сделав базовые и патчевые колонки) дополнительно к многоколоночности. Это вдвое соптимизировало поток на запись. Мы применили кэши и холодные кэши и вдвое сэкономили поток на чтение из БД. Наконец, мы сделали агрегирующую колонку в БД и получили очень мощное уменьшение потока на запись в БД.

И по итогу очень хорошо соптимизировали нагрузку на нашу базу данных.

Но оптимизировать можно бесконечно, как и обсуждать соптимизированное, поэтому напоследок подкину тем для размышления:

  • Патчи: на стороне бэкенда VS на стороне БД, что лучше?

  • Консистентные кэши: почему все-таки работает и нельзя ли иначе?

  • Сочетание сжатия и патчей: в каком порядке делать и сжимать ли патч?

  • Иные способы справляться с большими потоками и объёмами данных — тоже можно обсудить!

Буду рад обсудить их вместе с вами по почте: yuri.pechatnov@gmail.com или тг.

© Habrahabr.ru