Мой диплом, или Как собрать вещи и переехать на YDB

Меня зовут Арслан, в этом году я делал сервис для построения циклов заказа (например, заказа такси). Возможно, вы видели пост от другого разработчика в команде, Ильи Lol4t0. Всего сервис обрабатывает примерно 5000 RPS с задержкой 100 мс в 99 перцентиле. Раньше для хранения данных использовалась связка PostgreSQL с YT — MapReduce-системой Яндекса.

Обычно информация по заказу нужна в быстром доступе в течение пары часов. На эту парадигму хорошо ложилась архитектура с горячим и холодным хранилищем. Событие создавалось в PostgreSQL, асинхронно реплицировалось в YT, а спустя два часа удалялось из PostgreSQL, никаких проблем. Но со временем начали напрягать несколько вещей: сложность архитектуры, низкая доступность во время проведения работ на PostgreSQL и ограниченная возможность горизонтально масштабировать систему. Мы решили перейти на новую архитектуру с базой данных YDB. Хотели на примере тестового сервиса разобраться, как работать с базой, проверить всё под нагрузкой и реализовать хранение данных исходного сервиса.

relxhm5kpb1mfsbdxl7gqrbcl8k.jpeg

Вообще, изначально я написал про это диплом. Но потом подумал, что читателям здесь тоже будет интересно, и всё переделал под Хабр. Если тоже переезжаете на YDB (после выхода в опенсорс это стало проще) или адаптируете систему с базой — заглядывайте. Поговорим о большинстве возможных трудностей при переезде.

Processing as a Service


Подробнее о цикле заказа можно почитать в прошлом посте, а здесь мы сосредоточимся на технических деталях сервиса, которые влияют на его требования к хранилищу данных.

Processing as a Service (ProcaaS) — микросервис для создания конечных автоматов, обрабатывающих упорядоченные ряды событий. Каждому ряду сопоставляется свой экземпляр конечного автомата, который адресуется по уникальному FSM_ID. Пользователи генерируют события и отправляют их в ProcaaS, а он уже сохраняет событие в базу и асинхронно запускает его обработчик.

Обработчик запускается с помощью постановки задачи в брокер задач, который гарантирует, что она будет выполняться до успешного завершения. Из-за этого есть ограничение — обработчик должен быть идемпотентным. При запуске обработчик вычитывает из базы все произошедшие на текущий момент события и по ним вычисляет состояние конечного автомата. Далее начинают последовательно обрабатываться необработанные события. Для каждого из них на основе текущего состояния и конфигурации автомата выбирается подходящий пайплайн обработки, который состоит из нескольких последовательно выполняемых стадий. Типичное действие внутри стадии — сделать запрос в несколько ресурсов, собрать из их ответов объект и потом отправить его в другой ресурс или на вход следующей стадии. В случае успеха событие в базе помечается обработанным, иначе включается fallback.

Пример автомата и ряда событий по нему:

tips2v6tkanxyvpi3qzo2npt9_a.jpeg

Событие — некоторая сущность, которая генерируется пользователем: допустим, «создать заказ». Для него в базе хранятся разные свойства, основные из них перечислены в таблице ниже.


При работе с базой данных ProcaaS использует следующие операции:

  • Вставить новое событие в базу. Это пишущий запрос.
  • Вычитать все события ряда и упорядочить их. Мы не можем это делать при вставке, так как есть возможность постановки событий на будущее. Кроме того, запрос вставки — основной, которым пользуются клиенты, поэтому хочется выполнять его как можно быстрее. Это одновременно читающий и пишущий запрос.
  • Пометить несколько событий обработанными. Это пишущий запрос.


Можно заметить, что все основные запросы включают в себя запись данных, значит в текущей схеме все они создают нагрузку на мастер PostgreSQL — то есть реплики служат для отказоустойчивости, а не для балансировки нагрузки.

Исходная архитектура


В концепции со старым-добрым PostgreSQL у нас было два хранилища упорядоченных рядов событий — горячее и холодное. Такой выбор обусловлен тем, что большинство событий происходит в короткий срок. Горячее хранилище представляет собой шардированную базу в PostgreSQL. В сервисе довольно классическая схема с тремя шардами в трёх дата-центрах с синхронной master-slave-репликацией, что обеспечивает высокую отказоустойчивость. Каждый шард хранит 20 ГБ данных, события хранятся два часа, после чего уходят в холодное хранилище — фактически в табличку в YT. Данные в нём хранятся вечно, поэтому его размер составляет 30 ТБ. Но тут есть некоторые сложности.

Для обработки ряда нужен полный лог событий по нему. Нам нужно уметь понимать, ушла ли уже часть событий ряда в холодное хранилище, и если да, то нужно предварительно восстановить их перед началом обработки. Пытаться выяснить для каждого ряда, есть ли заархивированные события по нему в холодном хранилище, слишком долго (latency холодного хранилища составляет несколько секунд вместо нескольких десятков миллисекунд у горячего). Эту проблему мы решали тем, что пользователи проставляли флаг is-create стартовому событию в ряде, что было для них не слишком удобно. События архивируются последовательно, поэтому отсутствие стартового события означает, что нужно перед началом обработки восстановить события из холодного хранилища. У такого подхода есть несколько потенциальных проблем, которые могут привести систему в состояние, требующее ручного вмешательства разработчиков, а нам своё время жалко.

Во-первых, если клиент по ошибке отправит несколько событий с флагом is-create (см. рис. 5), endpoint events может вернуть неконсистентный ответ. Пусть в моменты времени t1 и t2 добавились события, а в момент t3 мы запросили их список. При этом событие удаляется из горячего хранилища через tA после добавления:

  • Если t3-t1 < tA, вернётся AB.
  • Если t3-t1 ≥tA ≥t3−t2, вернётся B.
  • Если t3-t2 ≥ tA, то непонятно, что должно вернуться.


На практике это периодически происходит, поэтому приходится называть такие ряды сломанными и обрабатывать их по отдельной логике.

Во-вторых, если клиент переиспользует idempotency-token после того, как событие заархивировано, создастся новое событие, а не вернётся старое, как должно, потому что endpoint create-event никак не взаимодействует с холодным хранилищем.

Следующая характерная проблема этой схемы — её масштабируемость. Сейчас база представлена тремя шардами со следующими характеристиками: 16 ядер, 16 ГБ ОЗУ, 500 ГБ SSD. Yandex Cloud позволяет поднять эти параметры не более, чем в два раза. Значит, дальнейшее масштабирование должно быть горизонтальным, а не вертикальным — при необходимости нужно будет добавить новые шарды. Процесс грамотного решардирования довольно трудный и не представлен в PostgreSQL из коробки, поэтому такая логика должна быть реализована в пользовательском коде, а это его сильно усложняет.

Отказоустойчивость системы в текущем виде достигается тем, что у каждого шарда есть реплики в трёх разных дата-центрах. В PostgreSQL используется master-slave репликация. В случае отказа мастера его роль занимает одна из реплик. Это происходит, например, во время проведения регламентных работ в дата-центрах. Во время смены мастера база данных и вместе с ней весь сервис перестаёт отвечать на несколько десятков секунд, зажигаются мониторинги, и коллеги начинают интересоваться, почему часть запросов выдаёт ошибки. При этом, к сожалению, мы ничего не можем сделать, кроме как дождаться, пока смена мастера закончится. Периодически мастера всех шардов оказываются в одном дата-центре. Если шард выключается, в этот момент база перестаёт отвечать на все запросы, что негативно влияет на доступность сервиса. За время, пока база не отвечает, с бешеной скоростью копятся необработанные задачи. А в момент, когда она поднимается, происходит резкий всплеск запросов к ней.

Новая архитектура


Вместо того чтобы хранить упорядоченные ряды событий в проверенной временем связке PostgreSQL + YT, мы решили попробовать использовать свежую YDB в качестве основного хранилища.

YDB становится единственным хранилищем, поэтому все сложности, связанные с использованием схемы горячего и холодного хранилища пропадают. Флаг is-create становится ненужным, так как больше не надо понимать, есть ли в базе все данные или нужно ли что-то восстановить из холодного хранилища. Вставить разные события по одному и тому же idempotency_token тоже больше не получится, потому что данные в YDB хранятся вечно. Бонусом стабилизируются тайминги, потому что нет больше долгих запросов, которым нужно восстанавливать данные из холодного хранилища.

YDB автоматически шардируется по диапазонам значений первичного ключа, поэтому отпадает необходимость загружать этой логикой код сервиса. Когда шард в YDB достигает предельного размера, он автоматически делится пополам по медиане первичного ключа. Также можно включить шардирование по нагрузке, при котором разделение будет происходить так, чтобы равномерно распределять нагрузку по CPU между новыми шардами. Различные шарды таблицы могут обслуживаться разными серверами распределённой БД (в том числе расположенными в разных локациях), а также они могут независимо друг от друга перемещаться между серверами для перебалансировки или поддержания работоспособности шарда при отказах серверов или сетевого оборудования. Если один дата-центр отключается, то шарды поднимаются в других дата-центрах за десятки миллисекунд, поскольку данные хранятся в distributed storage в трёх дата-центрах. Клиент YDB можно настроить так, чтобы он он использовал подходящие политики балансировки по серверам базы. Это позволяет избежать перегрузки конкретного узла кластера, так что проблемы всплеска соединений в YDB тоже нет.

План перехода


По порядку:

  1. Разобраться, как работать с YDB на примере тестового сервиса и минимальной конфигурации базы, и провести нагрузочное тестирование, чтобы выявить предельное число обрабатываемых запросов и тайминги ответов. На этом этапе нам было важно понять, удовлетворяет ли потенциально YDB нашим требованием по производительности (с отказоустойчивостью проблем не должно было быть из-за её архитектуры). Нас немного пугало, что никто в Яндекс Go до нас не использовал YDB, поэтому мы решили для начала устроить подробные тесты, чтобы понять, подходит ли она под наши задачи.
  2. Включить асинхронную репликацию реальных событий, чтобы проверить, как база справляется с нагрузками. Перед этим заказать достаточное количество ресурсов базы. Удостовериться, что показатели предельного числа обрабатываемых запросов и таймингов ответов для нас приемлемы.
  3. Реализовать полный цикл обработки в YDB. Сравнить производительность с PostgreSQL и убедиться, что дальнейшая миграция существующих потребителей целесообразна. Только после этого можно разрешить новым клиентам использовать YDB в качестве хранилища.


Тестовый сервис


Сначала нам нужно было разобраться с возможностями и ограничениями YDB. Для этого мы завели тестовую базу и тестовый сервис на userver, который загружал и читал из неё данные. Затем нужно было провести нагрузочное тестирование и получить значения таймингов/RPS.

Конфигурация тестового стенда:

  • Виртуальная машина с сервисом: 6 ядер, 24 ГБ ОЗУ, 220 ГБ SSD.
  • База данных из трёх реплик в разных дата-центрах: 10 ядер, 50 ГБ ОЗУ, 690 ГБ SSD.


Мы изучили список ограничений из официальной документации YDB. Наиболее существенными из них оказались:

  • 200 000 таблеток таблицы (каждая из них до 2 ГБ) — это потенциально до 400 ТБ данных.
  • 1000 строк в качестве результата запроса.


Мы уточнили, что эти ограничения можно изменить для конкретной базы, к тому же для нас эти ограничения были не так важны, поэтому дальше мы реализовали тестовый сервис. Прежде всего мы сделали несколько тестовых эндпоинтов:

  • Upsert-row — создавал событие c заданным fsm_id и event_id. Остальные поля первой схемы не заполнялись.
  • Get-order-event-ids — получал список идентификаторов событий по конкретному автомату.


Для начального наполнения таблицы мы использовали реальные архивные данные с вырезанной конфиденциальной информацией. С помощью специального служебного инструмента из YT мы загрузили 10 000 000 строк за шесть часов. По созданным событиям endpoint провели нагрузочное тестирование, в результате которого получилось, что endpoint вставки имеет тайминги на 99 перцентиле примерно 20 мс. Для сравнения, в PostgreSQL тайминги составляют 25–30 мс.

Итак, ограничения YDB оказались не проблемой для разрабатываемого сервиса. Количество RPS и тайминги работы были достаточными с учётом того, что реальный сервис использует десятки машин. Мы решили продолжить наш увлекательный эксперимент — впереди нас ожидало самое интересное.

Репликация событий


Дальше нам предстояло выяснить, как YDB работает под настоящей нагрузкой — 5000 RPS и события размером до нескольких десятков КБ. Теперь заказали более серьёзную конфигурацию:

  • Виртуальную машину, куда входило 50 основных машин процессинга: 4 ядра, 5 ГБ ОЗУ, 200 ГБ SSD.
  • Базу данных на девяти динамических нодах, по три в трёх разных дата-центрах: 10 ядер, 50 ГБ ОЗУ, 920 ГБ SSD.


Перед началом обработки событие стало асинхронно копироваться в YDB, чтобы не замедлять работу основного процессинга. При этом вся сложная логика сериализации лога событий при вставке оставалась на стороне PostgreSQL. Так что результаты этого шага ожидались более оптимистичными, чем при переносе всей логики в YDB. В итоге мы выяснили, что YDB хорошо масштабируется, тайминги увеличились на 10 мс в сравнении с тестовым сервисом, который давал в десятки раз меньшую нагрузку. Но, к нашему удивлению, всё ещё получались примерно те же 30 мс на 99 перцентиле, что и в PostgreSQL, — достойный результат.

Мы решили продолжить эксперимент и наладить полноценную обработку событий на стороне YDB.

Полноценная обработка событий в YDB


Нам оставался последний шаг, самый трудный — реализовать полноценную обработку событий на стороне YDB и сравнить предельное число обрабатываемых запросов и тайминги ответов с PostgreSQL. Первая пришедшая в голову идея — просто перенести схему и запросы из PostgreSQL без изменений. Но у этого решения было несколько проблем:

  • Несовместимость некоторых языковых конструкций: нет оператора NOW (), нет ограничений кроме первичного ключа и нет условных индексов.
  • Отличающаяся архитектура, в которой при неаккуратном обращении можно легко сделать многошардовую транзакцию (а она медленнее на 1.5 RTT).


Для сериализации лога событий важны следующие элементы схемы:

  • fsm_id — идентификатор, который используется для адресации автомата.
  • event_id — уникальный идентификатор события внутри автомата.
  • idempotency_token — средство достижения идемпотентности API.
  • order_key — последовательный счётчик событий в порядке добавления.
  • handling_order_key — последовательный счётчик событий в порядке обработки.


Первичным ключом изначально были поля fsm_id и event_id.

Далее для примера рассмотрим оптимизацию запроса вставки события, который используется в ручку create-event. Изначально запрос был записан аналогично тому, как он был реализован в PostgreSQL. Вот его основные шаги:

xc2eohc179dscpbhjwb3s2rue2c.jpeg


План запроса:

  • Чтобы посчитать максимальный order_key в автомате, нужно прочитать все события в нём.
  • Чтобы проверить, есть ли события с idempotency_token текущего запроса, нужно проверить все события по автомату.
  • Новое событие конструируется с учётом данных, полученных на прошлых шагах, без запросов к базе.
  • Сконструированное событие вставляется в базу.


При таком лобовом подходе 99 перцентиль ручки create-event был 150 мс, что нас категорически не устраивало.

Первой оптимизаций стало удаление второй стадии. Этого удалось достичь, добавив COUNT_IF (idempotency_token) для поиска старого события в первую стадию. Такой несложной оптимизацией мы выиграли порядка 40 мс — было приятно.

На следующий шаг нас вдохновило то, что INSERT — это комбинация чтения и записи, а в YDB есть оператор «слепой записи» UPSERT, который не выполняет чтение и просто меняет значение по первичному ключу, если ключ уже существует. Мы заменили INSERT в конце запроса на UPSERT (это можно сделать, потому что операция идемпотентна) и таким образом выиграли ещё одно чтение. Итоговые тайминги получились порядка 60 мс на 99 перцентиле, что было уже довольно неплохо.

Так, на этом этапе мы смогли переписать запросы по YDB и получили тайминги порядка 60 мс на create-event. В процессе оптимизации мы пришли от двух multilookup событий по автомату и чтения и вставки по первичному ключу к одному multilookup событию по автомату и вставке. Но это всё ещё было примерно в два раза хуже PostgreSQL, поэтому мы стали искать новые идеи для оптимизации.

Для большинства потребителей сервиса мы решили сделать fast_flow обработки. В нём события упорядочиваются по timestamp клиента в момент вставки. Дополнительная сериализация происходит в момент вычитки перед обработкой, где тайминги не столь существенны. Избавление от сложной логики позволило нам получить тайминги, сравнимые с PostgreSQL в ручке create-event. Считаем это достойным результатом.

Для большинства потребителей потеря строгого порядка событий допустима. Поэтому получилось оставить только три шага:

dxthz0fgkeevzvhee9lm8n1i_ss.jpeg


План запроса:

  • Чтобы проверить, есть ли события с idempotency_token текущего запроса, нам нужно проверить все события по автомату.
  • Новое событие конструируется с учётом данных, полученных на прошлых задачах, без запросов к базе.
  • Сконструированное событие вставляется в базу.


В этот момент мы решили заменить первичный ключ с fsm_id, event_id на fsm_id, idempotency_token, чтобы сканирование по автомату на первом шаге заменить на поиск по первичному ключу. В итоге осталось одно чтение и одна запись по первичному ключу. Тайминги при таком подходе составили 40 мс на 99 перцентиле, что уже было сравнимо с 30 мс в PostgreSQL.

rculq1z4x-znnzczylgogk5twz0.png

Ещё одним интересным местом, с точки зрения оптимизации запросов, был поиск событий abandon. Напомним, что при получении события ProcaaS сначала вставляет его в базу, а потом асинхронно запускает его обработчик. Если по какой-то причине, например, недоступность сервера, выполняющего задачи, или падение самой машины процессинга, обработчик не отработал до конца, то в базе остаётся необработанное событие. Abandon — необработанное вовремя (в момент поля due) событие, которое пролежало в базе какое-то константное время, например минуту.

Поле due отвечает за время, после которого должна быть обработка события. Оно заполняется при первой вставке события и дальше не меняется. Если потребитель его не задал изначально, в него проставляется время вставки события в базу. Также некоторые обработчики могут перепланировать обработку события — для этого есть поле reschedule_due. Получается, что abandon — это необработанные (need_handle=True) события, чьё COALESCE (reschedule_due, due) меньше некоторого дедлайна (в этом сервисе он сконфигурирован как текущее время минус 1 минута). Специальный модуль сервиса регулярно перезапускается и ищет такие события.

Для ускорения выполнения таких запросов в PostgreSQL мы использовали условный индекс fsm_id need_handle=true.

К сожалению, YDB не предоставляет механизм условных индексов. Сначала мы попробовали вручную поддерживать такую структуру в виде вспомогательной таблицы. Но пришлось от этого отказаться из-за сильного влияния на тайминги (30 мс), поскольку транзакции, затрагивающие несколько таблиц, дорогие из-за архитектуры базы (на них тратится 1.5 дополнительных RTT).

Прокрутив в голове несколько идей, мы придумали новое решение — использовать асинхронный вторичный индекс. В YDB он может отставать от актуальных данных на доли секунды, но нам не так важно, если мы добьём abandon даже на минуту позже. Зато мы получим выигрыш в производительности, который достигается ослаблением гарантий. С точки зрения базы это означает, что ей не надо коммуницировать с другими узлами, чтобы убедиться в актуальности значения.

При посимвольном переписывании запроса у нас возникли неожиданные проблемы с производительностью. И это несмотря на то, что мы взяли даже чуть более широкий fsm_id, need_start, due, reschedule_due, чтобы вообще не обращаться к основной таблице. После анализа плана запроса стало ясно, что проблема в том, что почему-то возникает multillokup по fsm_id. Дело в том, что вторичные индексы не могут быть использованы при вычислении оператора COALESCE. Это стало для нас неприятным сюрпризом и немного расстроило.

Для решения этой проблемы индекс переделали на (fsm_id, need_start, reschedule_due, due), а запрос разбили на два:

reschedule_due IS NULL AND due <= $due
reschedule_due IS NOT NULL AND reschedule_due <= $due


Благодаря этой оптимизации запрос поиска abandon стал выполняться за 50 мс вместо изначальных 450 мс — уже можно было чуть-чуть порадоваться.

Результаты


После всех оптимизаций в YDB мы наконец-то добились сравнимой с PostgreSQL производительности.

При этом, как и планировалось, мы смогли:

  • Избавиться от разделения на холодное и горячее хранилище и тем самым упростить архитектуру сервиса и API для клиентов.
  • Улучшить масштабируемость, получив автоматическое шардирование.
  • Улучшить отказоустойчивость. Uptime для новых потребителей составил 99.99% вместо 99.95% для старых.


С учётом всех этих преимуществ решили постепенно мигрировать потребителей на YDB.

© Habrahabr.ru