Брокер очередей Capella Queue
Привет!
Я часто видел заголовки подобные «Apache Kafka vs RabbitMQ vs NATS», но что делать если что-то не устраивает в готовых решениях? Можно подстроиться, а можно изобрести что-то своё. Я пошел вторым путём. В этой статье я хотел бы рассказать про свою реализацию брокера сообщений. Если стало интересно, добро пожаловать под кат.
1.1 Сохранение заказов
Некоторое время назад я работал в очень крупном интернет магазине. Была общая задача: не «пропустить» ни один заказ. То есть сделать так, что бы была возможность сохранить заказ в несколько мест, и при доступности хотя бы одного из них сохранить заказ.
Уточнение 1: обрабатывая заказ нужно однозначно избежать ситуации когда один заказ обрабатывается дважды.
Уточнение 2: было несколько ДЦ и допускалось, что они могут иногда не иметь между собой связь, а еще внутри ДЦ может рушиться сеть.
Уточнение 3: разумеется отправлять на сборку нужно сразу (считаем каждый склад расположенным на своих независимых нескольких ДЦ).
Постановка задачи в таком виде увы не имеет явного решения :(Однако при наличии определённых допущений всё же реализуема.
Как эту задачу реализовали мы, я тут рассказывать не буду. Но сама постановка задачи предлагает использование очередей сообщений, и при этом рассматриваемые готовые решения увы не подходили по разным причинам.
И у меня зародилось желание написать мутанта сочетающего в себе плюсы разных подходов.
1.2 Портативный анализатор
Некоторое время назад мне позвонил друг и пожаловался, что у него не получается организовать передачу данных от «портативного анализатора». Как выяснилось, требуется сделать так, что бы потребителей данных было много, причём часть из них желает обращаться непосредственно к анализатору, а для части нужно, что бы анализатор сам послал им данные. Причём такая необходимость разного способа получения данных обусловлена внешними требованиям, повлиять на которые, увы, нет возможности.
У меня всплыла в голове идея мутанта, и окончательно сформировалась.
2. Цель
Для начала я решил выписать требования, поставить задачу и понять, а нет ли готового чего-то что удовлетворяет целям.
Основная цель — иметь брокер сообщений.
Требование 0: брокер должен хранить сообщения и предоставлять доступ к ним в течении какого-то времени (Блокирующее).
Требование 1: очередь должна работать на слабом железе (Блокирующее). Интересно что это требование было и в «крупном интернет магазине» ибо выделение железа было затруднительным, а выделяемые ресурсы не всегда были стабильными.
Требование 2: если узел брокера умеет принимать сообщение, то нужно иметь возможность послать туда сообщение, даже если у него нет связи с другими узлами (Блокирующее).
Требование 3: если сообщение было послано в один узел за сообщения определённого типа, то оно в конечном счёте должно появиться во всех узлах отвечающих за сообщение этого типа доступным для получения (Желательное).
Требование 4: если одинаковое сообщение было послано в два узла брокера, отвечающего за сообщения одного типа, то сообщения не должны задублироваться в конечном счёте (Желательное).
*`В конечном счёте` — по прошествии времени. Если были разрывы соединений, то после их восстановления.
Начиная подбирать готовые решения мы начали упираться в их ограничения.
Kafka — требует понимание как работает и требует железо и это блокирующий минус. Плюсы в Kafka — концепция журналирования, то есть возможность прочитать повторно сообщения начиная с какого-то момента времени.
RabbitMQ — блокирующая проблема — нет простого механизма перепрочитать сообщения, хотя извратиться можно. Плюс — крутой механизм маршрутизации.
NATS Streaming — он простой в настройке и быстрый это плюс. Однако у NATS Streaming кластеров нет возможности у узлов работать независимо при потере связи. Мы попробовали использовать NATS Streaming для сохранения заказов и разворачивали несколько независимых кластеров для обеспечения отказоустойчивости. Увы, при использовании в качестве хранилища PostgreSQL NATS Streaming вёл себя не стабильно на боевых нагрузках. Я конечно допускаю, что мы его не правильно готовили, но опыт других команд показал, что готовили его неправильно не только мы. Между прочим для передачи событий между сервисами, где не требуется стабильная работа, NATS Streaming вполне подошел.
Других готовых решений я не рассматривал.
Я подумал и решил на досуге реализовать свою версию очередей, хотя бы частично реализовывающую требования описанные выше.
3. Реализация (краткое описание)
3.1 Общие понятия
Cluster — сервис в котором хранятся очереди, обработчики и ссылки на другие кластера.
Queue — очередь. Очередь хранится в кластере. Очередь принимает сообщения, сохраняет их в хранилище и выдаёт по требованию. Что бы получить сообщение нужно передать ID последнего прочитанного сообщения.
Handler — обработчик. Они используются для того, что бы передать данные из очереди в очередь, для того что бы выгрузить из памяти данные очередей, и что бы перенести или удалить старые данные очереди.
ExternalCluster — ссылки на другие кластера. Можно например положить сообщения из своего кластера в другой. Или наоборот забрать из другого кластера сообщения и положить в очереди этого кластера.
3.2 Очередь изнутри
Очередь представляет из себя набор блоков. Запись ведётся в последний блок. Блок сохраняется целиком. Каждый блок содержит ограниченное количество сообщений. Ограничения накладываются на суммарный объём, на время прошедшее от создания блока и на количество сообщений в блоке.
Предполагается 4 вида сохранения:
сохранить сразу после добавления сообщения
ничего не делать после добавления сообщения
помечать очередь, что она изменилась после добавления сообщения
дождаться, что сообщение сохранится после добавления сообщения, сохранение происходит периодически
Очереди поддерживают подписку. Однако подписка реализована так, что одна подписка — один читатель и подписки работают независимо.
Идентификатор сообщения в очереди формируется независимо для каждой очереди. В сообщении есть поля для определения источника, и внешнего идентификатора.
3.3 Хранение данных
Для хранения данных сообщения было решено выделить хранение в отдельный слой. То есть хранилище может быть как диком (уже реализовано) так и неким облачным хранилищем (s3) и даже БД (предстоит реализовать).
Блоки от одной очереди могут храниться в разных хранилищах. Для реализации этого механизма у каждого блока есть отметка в каком хранилище хранить. С помощью обработчиков эту отметку можно поменять.
Один из предполагаемых сценариев: сохранять активные записи (свежие) на быстрый диск, а неиспользуемые записи архивировать и сохранять на более простые и дешёвые диски с меньшей пропускной способностью.
Обработчики могут не только перемещать блоки. На данный момент реализованы следующие обработчики:
Выгрузка блоков из памяти
Перемещение блоков между местами хранения
Удаление старых блоков
Копирование сообщений между очередями
Копирование сообщений между очередями
Для копирования используется механизм вставки «уникального сообщения». Уникальность определяется по уникальности пары «источник + внешний ID». Благодаря такому подходу можно настроить копирования очереди в двунаправленном режиме. Особенности реализации таковы, что наибольшая производительность достигается если копирование идёт с нарастающим внешним ID и при этом этого сообщения ещё нет в очереди. То есть в однонаправленном копировании. Так же рекомендуется ограничить количество источников разумным количеством (например 1000) в рамках одной очереди.
FIFO
Сообщения в рамках очереди сохраняются в том порядке в котором они были сохранены в очередь. При копировании порядок вставляемых сообщений (новых сообщений, которых не было в очереди) сохраняется
3.4 Примеры сценариев работы
Сохранение события для сервисов в дата центрах (например сохранение заказа)
Подготовка кластеров:
Разворачивается N кластеров Capella Queue
В каждом кластере создаётся очередь для приёма события
Для каждой очереди определяются обработчики, для удаления, выгрузки из памяти, и перемещения старых сообщений
Для каждой очереди определяется обработчик копирования сообщений в другие очереди (в каждую или по кругу или по любой другой схеме)
Реализация сохранения:
Для сервиса который сохраняет событие определяется К кластеров Capella Queue из подготовленных и определяется надёжность M (M < K)
Далее при необходимости сохранения события Сервис:
Определяет для сообщения глобально уникальную связку источник+внешний ID. Так же рекомендуется указать сегмент
Сохраняет сообщение в M Кластеров
Если какое-то сохранение прошло не успешно, то подбирается другой кластер из К определённых для сервиса и производится попытка сохранить сообщение туда.
Обработчик событий, может подписаться на одну любую очередь и получать оттуда все сообщения. Чтение по сегментам еще не реализовано, но запланировано.
Сохранение события для удалённых устройств (например для анализатора)
Подготовка кластеров:
Разворачивается кластер на устройстве
Разворачиваются кластеры в датацентрах, куда хочется выгружать данные
В каждом кластере создаётся очередь для приёма события
Настраиваются обработчики для удаления, выгрузки из памяти и архивирования
Настраиваются обработчики для копирования сообщений (настраиваться может как на устройстве так и в ДЦ)
Реализация сохранения:
События сохраняются в очередь на локальном кластере. Нужно определить для сообщения глобально уникальную связку источник+внешний ID. Благодаря обработчикам копирования сообщения будут доступны как на устройстве так и в ДЦ
4. Ближайшие планы
Сделать туториал, с описанием основных кейсов.
Прикрутить безопасность.
Прикрутить использование сервисом SSL сертификатов.
Добавить сегментацию — возможность чтения и переливки данных по сегментам.
Обновление параметров очередей, кластеров и обработчиков.
Функционал для контроля того, что сообщение отреплицировалось в другие кластера.
Метрики.
Код
На github