Реализация итоговой согласованности. Разбор библиотеки event-outbox
Здравствуйте. Меня зовут Юрий Кехтер, я backend-разработчик на Python.
В этой статье я хотел бы рассказать об архитектурных шаблонах Transactional Outbox и Idempotent Consumer. Кроме того, я хотел бы показать собственную реализацию, содержащую интересное сочетание технологий, выходящее за рамки этих шаблонов, значительно упрощающее реализацию и эксплуатацию:
Альтернатива поллингу (polling) при публикации событий.
Автоматическое разделение работы по публикации событий.
Автоматическое удаление устаревших данных.
Возможность запуска в одном процессе с HTTP-сервером.
Я предпочитаю не смешивать разные языки (languages), поэтому постараюсь указывать в скобках название термина на английском, чтобы избежать разночтений.
Библиотека event-outbox написана на Python в порыве энтузиазма, во время «вынужденной паузы» на моей текущей работе. По состоянию на июнь 2024 года, она еще не добралась до версии 1.0.0
и ни разу не использовалась по настоящему. К тому же, ей не хватает полноценной документации.
Код в статье предоставлен исключительно для демонстрации и отличается от исходного кода библиотеки. Я использую синтаксис ...
(ellispis), чтобы опустить некоторые несущественные детали, но предоставить достаточно контекста для понимания кода.
Если Вы по какой-то причине решите использовать эту библиотеку, прошу вас связаться со мной, ведь я заинтересован в её развитии. Если Вы чувствуете непреодолимое желание помочь проекту за идею, то приглашаю вас объединить усилия в open-source.
Проблема
Требуется гарантировать итоговую согласованность данных (eventual consistency) при выполнении двух упорядоченных действий на двух разных сервисах в распределенной системе. Между действиями допустима задержка. Общение между сервисами происходит по нестабильной сети. Сервисы могут падать.
Если пренебречь условием «на разных сервисах», то речь идет о монолите (monolith), а решение очевидно. Действия происходят один за другим, изменения данных накапливаются в транзакции. После фиксации транзакции (commit), согласованные изменения записываются в базу данных. Допустимость задержки между действиями игнорируется. Проблема нестабильной сети обходится стороной. Транзакция спасает целостность данных от отказа сервиса в процессе обработки запроса.
Если все же речь идет о двух разных сервисах, то для гарантии целостности (consistency) данных, проблемы нестабильной сети и падения сервисов игнорировать не получится.
Обозначим порядок обработки:
Первый сервис получает запрос (request) от клиента.
Первый сервис выполняет первое действие и публикует событие, инициирующее выполнение второго действия на втором сервисе.
Первый сервис отправляет ответ (response) клиенту.
Второй сервис получает событие и выполняет второе действие.
Коммуникация между клиентом и первым сервисом выходит за рамки решаемой проблемы. Считаем, что запрос (request) все-таки пришел на первый сервис, а ответ (response) все-таки будет доставлен клиенту.
Когда первый сервис выполняет действие и публикует событие, возможны несколько точек отказа:
Событие не опубликовано. Если изменения в базе данных зафиксированы (commit) до публикации события, то отсутствует гарантия публикации. Второй сервис или даже система обмена сообщениями могут быть недоступны. Как итог, второе действие может остаться невыполненным.
Опубликовано лишнее событие. Если событие опубликовано до фиксирования (commit) изменений в базе данных, то не гарантируется изменение данных. База данных может оказаться недоступной. Как итог, фиксация результатов первого действия может не произойти.
Когда второй сервис получает событие и выполняет второе действие, возможны несколько точек отказа:
Событие не доставлено. Второй сервис не получил событие. Если отсутствует гарантия доставки события, то второе действие может остаться невыполненным.
Событие обработано неоднократно. Есть система доставки событий с несколькими попытками (retries), требующая от сервиса подтверждения (ack). Если второй сервис выполнил второе действие, но не смог оповестить такую систему об успешной обработке, то может быть предпринята еще одна попытка. Как итог, событие может быть обработано несколько раз.
В любом из этих случаев согласованность данных не гарантируется.
Решение
Примечание:
У проблемы существует несколько решений. Например Two-Phase Commit Protocol. В рамках этой статьи будет рассмотрено альтернативное решение — шаблоны Transactional Outbox и Idempotent Consumer.
В идеальном мире, от механизма доставки и обработки событий требуется:
Гарантия публикации события ровно один раз (exactly once).
Гарантия доставки события ровно один раз (exactly once).
Гарантия обработки события ровно один раз (exactly once).
Мне не известен способ реализации этих гарантий в настоящем мире. Однако, можно использовать другую систему гарантий:
Гарантия публикации события хотя бы один раз (at least once)
Гарантия доставки события хотя бы один раз (at least once)
Гарантия обработки события хотя бы один раз (at least once)
Гарантия идемпотентной обработки события (idempotency)
Даже если событие будет опубликовано 100 раз, доставлено 50 раз, а обработано 25 раз, то идемпотентность обработки события (idempotency) позволит гарантировать итоговую согласованность (eventual consistency).
За гарантию итоговой согласованности придется заплатить:
В случае отказов сети, сервисов или транзакций, вычислительные ресурсы могут быть потрачены впустую, ведь возможно выполнение работы, вообще не влияющей на состояние системы.
Гарантии «хотя бы один раз» (at least once) требуют механизма повторов (retries).
Иногда требуется реализовать компенсирующую транзакцию, которая отменит (rollback) первое действие на первом сервисе. Во время допустимой задержки между действиями, второе действие может стать невыполнимым в принципе.
Далее будет рассмотрен механизм доставки, оформленный в виде библиотеки event-outbox.
Технологии
Библиотека имеет ряд конкретных зависимостей и пока не позиционируется как универсальное решение для любого стека. Я попытался взять от используемых технологий максимум, чтобы обеспечить наибольшую полезность при наименьших затратах собственного времени.
MongoDB
MongoDB — это основная СУБД, с которой я работаю последние несколько лет. Некоторые её особенности напрямую повлияли на конечное решение:
Transactions — гарантирует атомарную согласованность (transactional consistency) при изменении документов в разных коллекциях.
Change Streams — позволяет подписаться на изменения в коллекции и снизить нагрузку на базу данных.
Partial TTL indexes — позволяет автоматически удалять опубликованные и обработанные события из базы данных.
Для подключения к базе данных используется асинхронный драйвер motor.
Apache Kafka
Apache Kafka — платформа потоковой передачи событий, с которой я не работал (по состоянию на июнь 2024), но изучал ради интереса. Некоторые её особенности также повлияли на конечное решение:
Consumer Groups — позволяет организовать независимую обработку одних и тех же событий в разных группах.
Consumer Rebalance Protocol — выдает консюмеру (consumer) эксклюзивные права на обработку событий из партишнов топика (topic partitions) в рамках одной группы консюмеров (consumer group) и автоматическое перераспределение при их подключении / отключении.
Manual Offset Management — позволяет гарантировать обработку события хотя бы один раз (at least once) за счет фиксирования смещения (offset commit) непосредственно после обработки события.
Custom Partitioner — позволяет использовать собственный алгоритм выбора партишна (partition), в который публикуется событие.
Для публикации и потребления (consume) событий используется асинхронный клиент aiokafka.
Pydantic
Библиотека pydantic используется для описания модели данных (data model) публикуемых событий.
Публикация событий
Transactional Outbox
Transactional Outbox — шаблон, гарантирующий публикацию событий хотя бы один раз (at least once). Суть: отделить намерение (intent) от публикации события.
При выполнении действия, в одной транзакции оказываются:
Изменения данных, т.е. результат выполнения действия.
Намерения опубликовать события.
После успешной фиксации (commit) такой транзакции, в отдельной коллекции базы данных надежно хранятся намерения опубликовать события. Если произойдет отказ транзакции (abort), то такие намерения просто не будут зафиксированы.
Для публикации событий запускается специальный цикл, читающий документы из коллекции с намерениями и публикующий их в систему обмена сообщениями. После подтверждения публикации, событие помечается опубликованными и больше не публикуется.
В случае отказа системы обмена сообщениями (Kafka), событие останется неопубликованным. Попытки опубликовать событие будут предприниматься до тех пор, пока система обмена сообщениями не станет доступна и не подтвердит публикацию события.
В случае отказа сети, система обмена сообщениями (Kafka) не сможет подтвердить публикацию. При следующей попытке публикации, в системе обмена сообщениями может появиться дубликат события.
При остановке цикла, публикующего события, публикация останавливается до тех пор, пока цикл не будет перезапущен. Так как после публикации, события помечаются в базе данных (MongoDB) опубликованными, цикл может восстановить работу с первого неопубликованного события.
За гарантии доставки и целостность данных придется заплатить увеличением нагрузки на базу данных. Её можно немного уменьшить, задействовав специальные механизмы слежения за изменениями.
Сохранение намерений в базу данных
Начнем с кода:
from contextlib import AbstractAsyncContextManager
from motor.motor_asyncio import AsyncIOMotorClientSession
from pydantic import BaseModel
class Event(BaseModel):
...
class EventListener:
def event_occurred(self, event: Event) -> None:
...
class EventOutbox:
def event_listener(
self, mongo_session: AsyncIOMotorClientSession
) -> AbstractAsyncContextManager[EventListener]:
...
Интерфейс EventListener
— это синхронный слушатель событий. Он объявляет единственный метод event_occurred
, который принимает event
— произошедшее событие.
Класс EventOutbox
используется как менеджер контекста (context manager) таких слушателей. С помощью метода event_listener
можно асинхронно открыть контекст, передав сессию (session) базы данных:
from motor.motor_asyncio import AsyncIOMotorClient
from event_outbox import Event, EventOutbox
async def handler(
mongo_client: AsyncIOMotorClient,
outbox: EventOutbox,
) -> None:
db = mongo_client.get_default_database()
async with await mongo_client.start_session() as session:
async with outbox.event_listener(session) as listener:
await db["collection"].insert_one({}, session=session)
listener.event_occurred(
Event(
topic="bounded_context",
content_schema="EventOccurred",
)
)
Сессия базы данных нужна для того, чтобы намерения опубликовать события попали в одну транзакцию с результатом выполнения действия. Таким образом, контекстный менеджер (context manager) EventOutbox.event_listener
сам открывает и фиксирует (commit) транзакцию.
Класс Event
— это модель данных pydantic. Она используется для представления (serialize/dump) события в json
и передачи по сети. Новый класс событий предполагается объявлять наследником класса Event
и описывать в нем все данные события:
from typing import Literal
from event_outbox import Event
class EventOccurred(Event):
topic: Literal["bounded_context"] = "bounded_context"
content_schema: Literal["EventOccurred"] = "EventOccurred"
extra_data: int
Изменения вставляются в коллекцию пачкой (insert many) при выходе из контекста:
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Mapping
from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection
outbox: AsyncIOMotorCollection = ...
class EventListener:
...
class EventOutbox:
mongo_outbox: AsyncIOMotorCollection
@asynccontextmanager
async def event_listener(
self, mongo_session: AsyncIOMotorClientSession
) -> AsyncIterator[EventListener]:
documents: list[Mapping[str, Any]] = ...
listener: EventListener = ...
async with self.mongo_session.start_transaction():
yield listener
await self.mongo_outbox.insert_many(
documents,
session=mongo_session,
)
Чтение намерений из базы данных
При запуске или перезапуске цикла публикации, все накопившиеся события будут последовательно прочитаны через обычный find
и опубликованы.
Когда документов в коллекции не остается, необходимо каким-то образом подождать появления новых. Классическое решение — организовать поллинг (polling) коллекции. Change Streams позволяют реализовать альтернативу поллингу (polling) и снизить нагрузку на базу данных. Можно подписаться на операции insert
в коллекции и ждать, когда MongoDB сама оповестит о новом документе.
from typing import Any, AsyncIterator, Mapping
from bson import Timestamp
from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection
class EventPublisher:
mongo_session: AsyncIOMotorClientSession
mongo_outbox: AsyncIOMotorCollection
async def subscribe_to_change_stream(
self, start_at_operation_time: Timestamp
) -> AsyncIterator[Mapping[str, Any]]:
async with self.mongo_outbox.watch(
[{"$match": {"operationType": {"$in": ["insert"]}}}],
start_at_operation_time=start_at_operation_time,
session=self.mongo_session,
) as change_stream:
async for change_event in change_stream:
yield change_event["fullDocument"]
Эксклюзивные права на публикацию
Consumer Rebalance Protocol позволяет Kafka автоматически назначать (assign) консюмерам (consumer) партишны топиков (topic partition) таким образом, чтобы из партишна одновременно читал только один консюмер группы.
Что именно было назначено консюмеру (consumer), можно узнать во время выполнения:
from aiokafka import AIOKafkaConsumer
kafka_consumer: AIOKafkaConsumer = ...
assignment = kafka_consumer.assignment()
Если консюмер (consumer) и продюсер (producer) запускаются в одном процессе, то способность Kafka назначать партишны консюмерам (consumer) может быть использована для иной цели — предоставления продюсерам (producer) эксклюзивного права на публикацию в партишны (partition):
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from pydantic import BaseModel
class Event(BaseModel):
topic: str
partition_key: int
class EventPublisher:
kafka_consumer: AIOKafkaConsumer
kafka_producer: AIOKafkaProducer
async def publish_event(self, event: Event) -> None:
partition = event.partition_key % len(
self.kafka_consumer.partitions_for_topic(event.topic)
)
assignment = self.kafka_consumer.assignment()
if any(
topic_partition.partition == partition
for topic_partition in assignment
if topic_partition.topic == event.topic
):
await self.kafka_producer.send_and_wait(
event.topic,
event.model_dump_json().encode(),
partition=partition,
)
Таким образом, существует возможность запустить несколько параллельных процессов публикации событий и использовать Kafka для разделения работы между ними. Каждый из продюсеров (producuer) публикует события только в выделенные ему партишны топика (topic partitions).
Доставка событий
При попадании события в кластер, доставка хотя бы один раз (at least once) гарантируется самой Kafka.
Обработка событий
Idempotent Consumer
Idempotent Consumer — шаблон, гарантирующий идемпотентность (idempotency) обработки событий. Суть: зафиксировать (commit) факт обработки вместе с результатом обработки.
Когда событие поступает из сети, оно сохраняется в базу данных. Выполняется обработка. В одну транзакцию попадают:
Изменение флага у входящего события.
Изменения данных, т.е. результат обработки.
Намерения опубликовать другие события.
После фиксации (commit) такой транзакции, событие считается обработанным и больше не обрабатывается.
Если по какой-то причине произошла одновременная обработка одного и того же события, то будет зафиксирована только одна транзакция, а вторая будет отменена (abort).
Если необходимо выполнить запрос во внешнюю систему, то рекомендуется передать ключ идемпотентности. Тогда повторная обработка событий гарантировано не приведет к повторному выполнению действия во внешней системе.
Обработчик событий
Для начала рассмотрим интерфейс обработчика событий — протокол EventHandler
:
from typing import Protocol
from motor.motor_asyncio import AsyncIOMotorClientSession
class Event:
topic: str
content_schema: str
class EventOutbox:
...
class EventHandler(Protocol):
async def __call__(
self,
event: Event,
mongo_session: AsyncIOMotorClientSession,
/,
) -> None:
pass
С ним совместима функция, принимающая 2 аргумента — обрабатываемое событие и сессию базы данных с открытой транзакцией, в которой необходимо выполнять запросы.
Событие приходит в обработчик как экземпляр класса Event
. Библиотека не реализует маршрутизацию (routing) по типам событий. Специального механизма внедрения зависимостей в обработчик тоже нет. Для внедрения экземпляра EventOutbox
или любых других зависимостей, можно написать небольшой lambda
-адаптер:
from motor.motor_asyncio import AsyncIOMotorClientSession
from event_outbox import Event, EventHandler, EventOutbox
async def event_handler(
event: Event,
session: AsyncIOMotorClientSession,
outbox: EventOutbox,
answer: int,
) -> None:
match (event.topic, event.content_schema):
case ("bounded_context", "EventOccurred"):
...
def create_adapter() -> EventHandler:
outbox: EventOutbox = ...
return lambda event, session: (
event_hander(
event,
session,
outbox,
answer=42,
)
)
Эксклюзивные права на обработку
Consumer Rebalance Protocol выдает консюмеру (consumer) в группе эксклюзивные права на обработку событий из назначенных ему партишнов топиков (topic partitions).
Сохранение входящего события в базу данных
Пришедшее из Kafka событие сохраняется в специальную коллекцию входящих событий. MongoDB позволяет использовать словарь в качестве идентификатора, чтобы воспользоваться встроенным (default) уникальным индексом на _id
для обеспечения уникальности по нескольким полям.
from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection
from pydantic import BaseModel
class Event(BaseModel):
topic: str
content_schema: str
idempotency_key: str
class EventConsumer:
mongo_inbox: AsyncIOMotorCollection
mongo_session: AsyncIOMotorClientSession
async def handle_events(self) -> None:
while True:
event: Event = ...
document_id = event.model_dump(
mode="json",
include={"topic", "content_schema", "idempotency_key"},
)
await self.mongo_inbox.insert_one(
{"_id": document_id, "handled": False},
session=self.mongo_session,
)
...
Оптимистическая блокировка
Перед обработкой события открывается транзакция, в которой необработанное событие помечается обработанным. Фактически, это оптимистическая блокировка по полю handled
. При наличии нескольких конкурирующих транзакций, изменяющих флаг handled
, зафиксирована (commit) будет только одна.
from datetime import UTC, datetime
from typing import Any, Mapping, Protocol
from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection
class Event:
...
class EventHandler(Protocol):
async def __call__(
self,
event: Event,
mongo_session: AsyncIOMotorClientSession,
/,
) -> None:
pass
class EventConsumer:
mongo_session: AsyncIOMotorClientSession
mongo_inbox: AsyncIOMotorCollection
event_handler: EventHandler
async def handle_event(self, document_id: Mapping[str, Any], event: Event) -> None:
async with self.mongo_session.start_transaction():
result = await self.mongo_inbox.update_one(
{"_id": document_id, "handled": False}, # <-- Here
{
"$set": {
"handled": True,
"handled_at": datetime.now(tz=UTC),
}
},
session=self.mongo_session,
)
if result.modified_count:
await self.event_handler(event, self.mongo_session)
Идемпотентный запрос к внешней системе
Событие Event
содержит ключ идемпотентности (idempotency key). Ключ генерируется при создании экземпляра Event
как hex-представление UUID4
. Этот ключ может использоваться для идемпотентных запросов к внешним системам:
from motor.motor_asyncio import AsyncIOMotorClientSession
from event_outbox import Event
async def send_email(idempotency_key: str) -> None:
...
async def event_handler(
event: Event,
session: AsyncIOMotorClientSession,
) -> None:
await send_email(event.idempotency_key)
Подтверждение обработки
Manual Offset Management позволяет вручную управлять смещением (commit offset), чтобы зафиксировать в Kafka факт обработки события непосредственно после обработки. Для этого консюмер создается с флагом enable_auto_commit=False
:
from aiokafka import AIOKafkaConsumer
class EventConsumer:
kafka_consumer: AIOKafkaConsumer
async def handle_events(self) -> None:
while True:
kafka_consumer_record = await self.kafka_consumer.getone()
...
await self.kafka_consumer.commit() # <-- Here
async def initialize() -> None:
"""
Your service initialization code
"""
topics: list[str] = ...
bootstrap_servers: str = ...
group_id: str = ...
async with AIOKafkaConsumer(
*topics,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
auto_offset_reset="earliest",
enable_auto_commit=False, # <-- Here
) as kafka_consumer:
...
В случае перезапуска цикла обработки событий, цикл продолжит работу с первого необработанного события.
Удаление устаревших событий
Чтобы избежать повторной обработки, необходимо достаточно долго держать в базе данных информацию об обработанных событиях. В связи с этим, в базе данных накапливается большое количество устаревших документов. Для удаления устаревших данных используется Partial TTL indexes. Событие автоматически удаляется из базы данных после публикации или обработки, через заранее определенное время. Это позволяет переложить задачу очистки базы данных на MongoDB:
from datetime import timedelta
from motor.motor_asyncio import AsyncIOMotorCollection
class EventOutbox:
mongo_outbox: AsyncIOMotorCollection
mongo_inbox: AsyncIOMotorCollection
async def create_indexes(self) -> None:
await self.mongo_outbox.create_index(
"published_at",
name="expiration",
partialFilterExpression={"published": True},
expireAfterSeconds=timedelta(days=1).total_seconds(),
)
await self.mongo_inbox.create_index(
"handled_at",
name="expiration",
partialFilterExpression={"handled": True},
expireAfterSeconds=timedelta(days=1).total_seconds(),
)
Инициализация и запуск
Асинхронные циклы публикации в Kafka и обработки событий из Kafka запускаются в одном процессе. Например, их можно запустить вместе с HTTP-фреймворком. Тогда приложение будет состоять из трех основных циклов:
Цикл обработки HTTP-запросов.
Цикл публикации событий.
Цикл идемпотентной обработки событий.
Например, при использовании HTTP-фреймворка FastAPI, инициализацию можно выполнить в lifespan
:
from contextlib import asynccontextmanager
from typing import AsyncIterator
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorClientSession
from event_outbox import Event, EventOutbox
async def event_handler(
event: Event,
session: AsyncIOMotorClientSession,
outbox: EventOutbox,
) -> None:
...
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
mongo_client: AsyncIOMotorClient = ...
kafka_producer: AIOKafkaProducer = ...
kafka_consumer: AIOKafkaConsumer = ...
event_outbox = EventOutbox(
mongo_client,
kafka_producer,
kafka_consumer,
)
await event_outbox.create_indexes()
async with event_outbox.run_event_handler(
lambda event, session: event_handler(
event,
session,
event_outbox,
)
):
yield
def create_app() -> FastAPI:
return FastAPI(lifespan=lifespan)
Заключение
Реализация итоговой согласованности (eventual consistency) за счет гарантий доставки и идемпотентной обработки — это мощный механизм, который часто ускользает из виду. Возможность запускать циклы обработки и публикации в одном процессе с HTTP-сервером позволяет интегрировать решение в проект без запуска дополнительных процессов (worker). Использование механизмов отслеживания изменений в базе для ожидания новых событий является альтернативой поллингу (polling) и снижает нагрузку на базу данных. Эксплуатирование механизма назначения партишнов (partition) между консюмерами (consumer) позволяет также разделить работу по публикации событий между продюсерами (producer). Автоматическое удаление устаревших событий из базы данных позволяет снизить затраты на хранение.
В целом, я доволен реализованным решением. Мне очень хотелось бы услышать мнение сообщества. Я первый раз в open-source. Буду рад, если проект окажется полезным.
Если после прочтения у вас остались вопросы, предлагаю ознакомиться с исходным кодом или обсудить в комментариях.
Спасибо за внимание!