Реализация итоговой согласованности. Разбор библиотеки event-outbox

99f9d36f8c6f2cf260ae5da511c1a2a9

Здравствуйте. Меня зовут Юрий Кехтер, я backend-разработчик на Python.

В этой статье я хотел бы рассказать об архитектурных шаблонах Transactional Outbox и Idempotent Consumer. Кроме того, я хотел бы показать собственную реализацию, содержащую интересное сочетание технологий, выходящее за рамки этих шаблонов, значительно упрощающее реализацию и эксплуатацию:

  • Альтернатива поллингу (polling) при публикации событий.

  • Автоматическое разделение работы по публикации событий.

  • Автоматическое удаление устаревших данных.

  • Возможность запуска в одном процессе с HTTP-сервером.

Я предпочитаю не смешивать разные языки (languages), поэтому постараюсь указывать в скобках название термина на английском, чтобы избежать разночтений.

Библиотека event-outbox написана на Python в порыве энтузиазма, во время «вынужденной паузы» на моей текущей работе. По состоянию на июнь 2024 года, она еще не добралась до версии 1.0.0 и ни разу не использовалась по настоящему. К тому же, ей не хватает полноценной документации.

Код в статье предоставлен исключительно для демонстрации и отличается от исходного кода библиотеки. Я использую синтаксис ... (ellispis), чтобы опустить некоторые несущественные детали, но предоставить достаточно контекста для понимания кода.

Если Вы по какой-то причине решите использовать эту библиотеку, прошу вас связаться со мной, ведь я заинтересован в её развитии. Если Вы чувствуете непреодолимое желание помочь проекту за идею, то приглашаю вас объединить усилия в open-source.

Проблема

Требуется гарантировать итоговую согласованность данных (eventual consistency) при выполнении двух упорядоченных действий на двух разных сервисах в распределенной системе. Между действиями допустима задержка. Общение между сервисами происходит по нестабильной сети. Сервисы могут падать.

Если пренебречь условием «на разных сервисах», то речь идет о монолите (monolith), а решение очевидно. Действия происходят один за другим, изменения данных накапливаются в транзакции. После фиксации транзакции (commit), согласованные изменения записываются в базу данных. Допустимость задержки между действиями игнорируется. Проблема нестабильной сети обходится стороной. Транзакция спасает целостность данных от отказа сервиса в процессе обработки запроса.

Если все же речь идет о двух разных сервисах, то для гарантии целостности (consistency) данных, проблемы нестабильной сети и падения сервисов игнорировать не получится.

Обозначим порядок обработки:

  1. Первый сервис получает запрос (request) от клиента.

  2. Первый сервис выполняет первое действие и публикует событие, инициирующее выполнение второго действия на втором сервисе.

  3. Первый сервис отправляет ответ (response) клиенту.

  4. Второй сервис получает событие и выполняет второе действие.

Коммуникация между клиентом и первым сервисом выходит за рамки решаемой проблемы. Считаем, что запрос (request) все-таки пришел на первый сервис, а ответ (response) все-таки будет доставлен клиенту.

Когда первый сервис выполняет действие и публикует событие, возможны несколько точек отказа:

  • Событие не опубликовано. Если изменения в базе данных зафиксированы (commit) до публикации события, то отсутствует гарантия публикации. Второй сервис или даже система обмена сообщениями могут быть недоступны. Как итог, второе действие может остаться невыполненным.

  • Опубликовано лишнее событие. Если событие опубликовано до фиксирования (commit) изменений в базе данных, то не гарантируется изменение данных. База данных может оказаться недоступной. Как итог, фиксация результатов первого действия может не произойти.

Когда второй сервис получает событие и выполняет второе действие, возможны несколько точек отказа:

  • Событие не доставлено. Второй сервис не получил событие. Если отсутствует гарантия доставки события, то второе действие может остаться невыполненным.

  • Событие обработано неоднократно. Есть система доставки событий с несколькими попытками (retries), требующая от сервиса подтверждения (ack). Если второй сервис выполнил второе действие, но не смог оповестить такую систему об успешной обработке, то может быть предпринята еще одна попытка. Как итог, событие может быть обработано несколько раз.

В любом из этих случаев согласованность данных не гарантируется.

Решение

Примечание:
У проблемы существует несколько решений. Например Two-Phase Commit Protocol. В рамках этой статьи будет рассмотрено альтернативное решение — шаблоны Transactional Outbox и Idempotent Consumer.

В идеальном мире, от механизма доставки и обработки событий требуется:

  • Гарантия публикации события ровно один раз (exactly once).

  • Гарантия доставки события ровно один раз (exactly once).

  • Гарантия обработки события ровно один раз (exactly once).

Мне не известен способ реализации этих гарантий в настоящем мире. Однако, можно использовать другую систему гарантий:

  • Гарантия публикации события хотя бы один раз (at least once)

  • Гарантия доставки события хотя бы один раз (at least once)

  • Гарантия обработки события хотя бы один раз (at least once)

  • Гарантия идемпотентной обработки события (idempotency)

Даже если событие будет опубликовано 100 раз, доставлено 50 раз, а обработано 25 раз, то идемпотентность обработки события (idempotency) позволит гарантировать итоговую согласованность (eventual consistency).

За гарантию итоговой согласованности придется заплатить:

  • В случае отказов сети, сервисов или транзакций, вычислительные ресурсы могут быть потрачены впустую, ведь возможно выполнение работы, вообще не влияющей на состояние системы.

  • Гарантии «хотя бы один раз» (at least once) требуют механизма повторов (retries).

  • Иногда требуется реализовать компенсирующую транзакцию, которая отменит (rollback) первое действие на первом сервисе. Во время допустимой задержки между действиями, второе действие может стать невыполнимым в принципе.

Далее будет рассмотрен механизм доставки, оформленный в виде библиотеки event-outbox.

Технологии

Библиотека имеет ряд конкретных зависимостей и пока не позиционируется как универсальное решение для любого стека. Я попытался взять от используемых технологий максимум, чтобы обеспечить наибольшую полезность при наименьших затратах собственного времени.

MongoDB

MongoDB — это основная СУБД, с которой я работаю последние несколько лет. Некоторые её особенности напрямую повлияли на конечное решение:

  • Transactions — гарантирует атомарную согласованность (transactional consistency) при изменении документов в разных коллекциях.

  • Change Streams — позволяет подписаться на изменения в коллекции и снизить нагрузку на базу данных.

  • Partial TTL indexes — позволяет автоматически удалять опубликованные и обработанные события из базы данных.

Для подключения к базе данных используется асинхронный драйвер motor.

Apache Kafka

Apache Kafka — платформа потоковой передачи событий, с которой я не работал (по состоянию на июнь 2024), но изучал ради интереса. Некоторые её особенности также повлияли на конечное решение:

  • Consumer Groups — позволяет организовать независимую обработку одних и тех же событий в разных группах.

  • Consumer Rebalance Protocol — выдает консюмеру (consumer) эксклюзивные права на обработку событий из партишнов топика (topic partitions) в рамках одной группы консюмеров (consumer group) и автоматическое перераспределение при их подключении / отключении.

  • Manual Offset Management — позволяет гарантировать обработку события хотя бы один раз (at least once) за счет фиксирования смещения (offset commit) непосредственно после обработки события.

  • Custom Partitioner — позволяет использовать собственный алгоритм выбора партишна (partition), в который публикуется событие.

Для публикации и потребления (consume) событий используется асинхронный клиент aiokafka.

Pydantic

Библиотека pydantic используется для описания модели данных (data model) публикуемых событий.

Публикация событий

Transactional Outbox

Transactional Outbox — шаблон, гарантирующий публикацию событий хотя бы один раз (at least once). Суть: отделить намерение (intent) от публикации события.

При выполнении действия, в одной транзакции оказываются:

  • Изменения данных, т.е. результат выполнения действия.

  • Намерения опубликовать события.

После успешной фиксации (commit) такой транзакции, в отдельной коллекции базы данных надежно хранятся намерения опубликовать события. Если произойдет отказ транзакции (abort), то такие намерения просто не будут зафиксированы.

Для публикации событий запускается специальный цикл, читающий документы из коллекции с намерениями и публикующий их в систему обмена сообщениями. После подтверждения публикации, событие помечается опубликованными и больше не публикуется.

В случае отказа системы обмена сообщениями (Kafka), событие останется неопубликованным. Попытки опубликовать событие будут предприниматься до тех пор, пока система обмена сообщениями не станет доступна и не подтвердит публикацию события.

В случае отказа сети, система обмена сообщениями (Kafka) не сможет подтвердить публикацию. При следующей попытке публикации, в системе обмена сообщениями может появиться дубликат события.

При остановке цикла, публикующего события, публикация останавливается до тех пор, пока цикл не будет перезапущен. Так как после публикации, события помечаются в базе данных (MongoDB) опубликованными, цикл может восстановить работу с первого неопубликованного события.

За гарантии доставки и целостность данных придется заплатить увеличением нагрузки на базу данных. Её можно немного уменьшить, задействовав специальные механизмы слежения за изменениями.

Сохранение намерений в базу данных

Начнем с кода:

from contextlib import AbstractAsyncContextManager

from motor.motor_asyncio import AsyncIOMotorClientSession
from pydantic import BaseModel


class Event(BaseModel):
    ...


class EventListener:
    def event_occurred(self, event: Event) -> None:
        ...


class EventOutbox:
    def event_listener(
        self, mongo_session: AsyncIOMotorClientSession
    ) -> AbstractAsyncContextManager[EventListener]:
        ...

Интерфейс EventListener — это синхронный слушатель событий. Он объявляет единственный метод event_occurred, который принимает event — произошедшее событие.

Класс EventOutbox используется как менеджер контекста (context manager) таких слушателей. С помощью метода event_listener можно асинхронно открыть контекст, передав сессию (session) базы данных:

from motor.motor_asyncio import AsyncIOMotorClient

from event_outbox import Event, EventOutbox

async def handler(
    mongo_client: AsyncIOMotorClient,
    outbox: EventOutbox,
) -> None:
    db = mongo_client.get_default_database()
    async with await mongo_client.start_session() as session:
        async with outbox.event_listener(session) as listener:
            await db["collection"].insert_one({}, session=session)
            listener.event_occurred(
                Event(
                    topic="bounded_context",
                    content_schema="EventOccurred",
                )
            )

Сессия базы данных нужна для того, чтобы намерения опубликовать события попали в одну транзакцию с результатом выполнения действия. Таким образом, контекстный менеджер (context manager) EventOutbox.event_listener сам открывает и фиксирует (commit) транзакцию.

Класс Event — это модель данных pydantic. Она используется для представления (serialize/dump) события в json и передачи по сети. Новый класс событий предполагается объявлять наследником класса Event и описывать в нем все данные события:

from typing import Literal

from event_outbox import Event

class EventOccurred(Event):
    topic: Literal["bounded_context"] = "bounded_context"
    content_schema: Literal["EventOccurred"] = "EventOccurred"
    extra_data: int

Изменения вставляются в коллекцию пачкой (insert many) при выходе из контекста:

from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Mapping

from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection

outbox: AsyncIOMotorCollection = ...


class EventListener:
    ...


class EventOutbox:
    mongo_outbox: AsyncIOMotorCollection

    @asynccontextmanager
    async def event_listener(
        self, mongo_session: AsyncIOMotorClientSession
    ) -> AsyncIterator[EventListener]:
        documents: list[Mapping[str, Any]] = ...
        listener: EventListener = ...
        async with self.mongo_session.start_transaction():
          yield listener
          await self.mongo_outbox.insert_many(
              documents,
              session=mongo_session,
          )

Чтение намерений из базы данных

При запуске или перезапуске цикла публикации, все накопившиеся события будут последовательно прочитаны через обычный find и опубликованы.

Когда документов в коллекции не остается, необходимо каким-то образом подождать появления новых. Классическое решение — организовать поллинг (polling) коллекции. Change Streams позволяют реализовать альтернативу поллингу (polling) и снизить нагрузку на базу данных. Можно подписаться на операции insert в коллекции и ждать, когда MongoDB сама оповестит о новом документе.

from typing import Any, AsyncIterator, Mapping

from bson import Timestamp
from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection


class EventPublisher:
    mongo_session: AsyncIOMotorClientSession
    mongo_outbox: AsyncIOMotorCollection

    async def subscribe_to_change_stream(
        self, start_at_operation_time: Timestamp
    ) -> AsyncIterator[Mapping[str, Any]]:
        async with self.mongo_outbox.watch(
            [{"$match": {"operationType": {"$in": ["insert"]}}}],
            start_at_operation_time=start_at_operation_time,
            session=self.mongo_session,
        ) as change_stream:
            async for change_event in change_stream:
                yield change_event["fullDocument"]

Эксклюзивные права на публикацию

Consumer Rebalance Protocol позволяет Kafka автоматически назначать (assign) консюмерам (consumer) партишны топиков (topic partition) таким образом, чтобы из партишна одновременно читал только один консюмер группы.

Что именно было назначено консюмеру (consumer), можно узнать во время выполнения:

from aiokafka import AIOKafkaConsumer

kafka_consumer: AIOKafkaConsumer = ...
assignment = kafka_consumer.assignment()

Если консюмер (consumer) и продюсер (producer) запускаются в одном процессе, то способность Kafka назначать партишны консюмерам (consumer) может быть использована для иной цели — предоставления продюсерам (producer) эксклюзивного права на публикацию в партишны (partition):

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from pydantic import BaseModel


class Event(BaseModel):
    topic: str
    partition_key: int


class EventPublisher:
    kafka_consumer: AIOKafkaConsumer
    kafka_producer: AIOKafkaProducer

    async def publish_event(self, event: Event) -> None:
        partition = event.partition_key % len(
            self.kafka_consumer.partitions_for_topic(event.topic)
        )
        assignment = self.kafka_consumer.assignment()
        if any(
            topic_partition.partition == partition
            for topic_partition in assignment
            if topic_partition.topic == event.topic
        ):
            await self.kafka_producer.send_and_wait(
                event.topic,
                event.model_dump_json().encode(),
                partition=partition,
            )

Таким образом, существует возможность запустить несколько параллельных процессов публикации событий и использовать Kafka для разделения работы между ними. Каждый из продюсеров (producuer) публикует события только в выделенные ему партишны топика (topic partitions).

Доставка событий

При попадании события в кластер, доставка хотя бы один раз (at least once) гарантируется самой Kafka.

Обработка событий

Idempotent Consumer

Idempotent Consumer — шаблон, гарантирующий идемпотентность (idempotency) обработки событий. Суть: зафиксировать (commit) факт обработки вместе с результатом обработки.

Когда событие поступает из сети, оно сохраняется в базу данных. Выполняется обработка. В одну транзакцию попадают:

  • Изменение флага у входящего события.

  • Изменения данных, т.е. результат обработки.

  • Намерения опубликовать другие события.

После фиксации (commit) такой транзакции, событие считается обработанным и больше не обрабатывается.

Если по какой-то причине произошла одновременная обработка одного и того же события, то будет зафиксирована только одна транзакция, а вторая будет отменена (abort).

Если необходимо выполнить запрос во внешнюю систему, то рекомендуется передать ключ идемпотентности. Тогда повторная обработка событий гарантировано не приведет к повторному выполнению действия во внешней системе.

Обработчик событий

Для начала рассмотрим интерфейс обработчика событий — протокол EventHandler:

from typing import Protocol

from motor.motor_asyncio import AsyncIOMotorClientSession


class Event:
    topic: str
    content_schema: str


class EventOutbox:
    ...


class EventHandler(Protocol):
    async def __call__(
        self,
        event: Event,
        mongo_session: AsyncIOMotorClientSession,
        /,
    ) -> None:
        pass

С ним совместима функция, принимающая 2 аргумента — обрабатываемое событие и сессию базы данных с открытой транзакцией, в которой необходимо выполнять запросы.

Событие приходит в обработчик как экземпляр класса Event. Библиотека не реализует маршрутизацию (routing) по типам событий. Специального механизма внедрения зависимостей в обработчик тоже нет. Для внедрения экземпляра EventOutbox или любых других зависимостей, можно написать небольшой lambda-адаптер:

from motor.motor_asyncio import AsyncIOMotorClientSession

from event_outbox import Event, EventHandler, EventOutbox


async def event_handler(
    event: Event,
    session: AsyncIOMotorClientSession,
    outbox: EventOutbox,
    answer: int,
) -> None:
    match (event.topic, event.content_schema):
        case ("bounded_context", "EventOccurred"):
            ...


def create_adapter() -> EventHandler:
    outbox: EventOutbox = ...
    return lambda event, session: (
        event_hander(
            event,
            session,
            outbox,
            answer=42,
        )
    )

Эксклюзивные права на обработку

Consumer Rebalance Protocol выдает консюмеру (consumer) в группе эксклюзивные права на обработку событий из назначенных ему партишнов топиков (topic partitions).

Сохранение входящего события в базу данных

Пришедшее из Kafka событие сохраняется в специальную коллекцию входящих событий. MongoDB позволяет использовать словарь в качестве идентификатора, чтобы воспользоваться встроенным (default) уникальным индексом на _id для обеспечения уникальности по нескольким полям.

from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection
from pydantic import BaseModel


class Event(BaseModel):
    topic: str
    content_schema: str
    idempotency_key: str


class EventConsumer:
    mongo_inbox: AsyncIOMotorCollection
    mongo_session: AsyncIOMotorClientSession

    async def handle_events(self) -> None:
        while True:
            event: Event = ...
            document_id = event.model_dump(
                mode="json",
                include={"topic", "content_schema", "idempotency_key"},
            )
            await self.mongo_inbox.insert_one(
                {"_id": document_id, "handled": False},
                session=self.mongo_session,
            )
            ...

Оптимистическая блокировка

Перед обработкой события открывается транзакция, в которой необработанное событие помечается обработанным. Фактически, это оптимистическая блокировка по полю handled. При наличии нескольких конкурирующих транзакций, изменяющих флаг handled, зафиксирована (commit) будет только одна.

from datetime import UTC, datetime
from typing import Any, Mapping, Protocol

from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection


class Event:
    ...


class EventHandler(Protocol):
    async def __call__(
        self,
        event: Event,
        mongo_session: AsyncIOMotorClientSession,
        /,
    ) -> None:
        pass


class EventConsumer:
    mongo_session: AsyncIOMotorClientSession
    mongo_inbox: AsyncIOMotorCollection
    event_handler: EventHandler

    async def handle_event(self, document_id: Mapping[str, Any], event: Event) -> None:
        async with self.mongo_session.start_transaction():
            result = await self.mongo_inbox.update_one(
                {"_id": document_id, "handled": False},  # <-- Here
                {
                    "$set": {
                        "handled": True,
                        "handled_at": datetime.now(tz=UTC),
                    }
                },
                session=self.mongo_session,
            )
            if result.modified_count:
                await self.event_handler(event, self.mongo_session)

Идемпотентный запрос к внешней системе

Событие Event содержит ключ идемпотентности (idempotency key). Ключ генерируется при создании экземпляра Event как hex-представление UUID4. Этот ключ может использоваться для идемпотентных запросов к внешним системам:

from motor.motor_asyncio import AsyncIOMotorClientSession

from event_outbox import Event


async def send_email(idempotency_key: str) -> None:
    ...


async def event_handler(
    event: Event,
    session: AsyncIOMotorClientSession,
) -> None:
    await send_email(event.idempotency_key)

Подтверждение обработки

Manual Offset Management позволяет вручную управлять смещением (commit offset), чтобы зафиксировать в Kafka факт обработки события непосредственно после обработки. Для этого консюмер создается с флагом enable_auto_commit=False:

from aiokafka import AIOKafkaConsumer


class EventConsumer:
    kafka_consumer: AIOKafkaConsumer

    async def handle_events(self) -> None:
        while True:
            kafka_consumer_record = await self.kafka_consumer.getone()
            ...
            await self.kafka_consumer.commit()  # <-- Here


async def initialize() -> None:
    """
    Your service initialization code
    """
    topics: list[str] = ...
    bootstrap_servers: str = ...
    group_id: str = ...
    async with AIOKafkaConsumer(
        *topics,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        auto_offset_reset="earliest",
        enable_auto_commit=False,  # <-- Here
    ) as kafka_consumer:
        ...

В случае перезапуска цикла обработки событий, цикл продолжит работу с первого необработанного события.

Удаление устаревших событий

Чтобы избежать повторной обработки, необходимо достаточно долго держать в базе данных информацию об обработанных событиях. В связи с этим, в базе данных накапливается большое количество устаревших документов. Для удаления устаревших данных используется Partial TTL indexes. Событие автоматически удаляется из базы данных после публикации или обработки, через заранее определенное время. Это позволяет переложить задачу очистки базы данных на MongoDB:

from datetime import timedelta

from motor.motor_asyncio import AsyncIOMotorCollection


class EventOutbox:
    mongo_outbox: AsyncIOMotorCollection
    mongo_inbox: AsyncIOMotorCollection

    async def create_indexes(self) -> None:
        await self.mongo_outbox.create_index(
            "published_at",
            name="expiration",
            partialFilterExpression={"published": True},
            expireAfterSeconds=timedelta(days=1).total_seconds(),
        )
        await self.mongo_inbox.create_index(
            "handled_at",
            name="expiration",
            partialFilterExpression={"handled": True},
            expireAfterSeconds=timedelta(days=1).total_seconds(),
        )

Инициализация и запуск

Асинхронные циклы публикации в Kafka и обработки событий из Kafka запускаются в одном процессе. Например, их можно запустить вместе с HTTP-фреймворком. Тогда приложение будет состоять из трех основных циклов:

  • Цикл обработки HTTP-запросов.

  • Цикл публикации событий.

  • Цикл идемпотентной обработки событий.

Например, при использовании HTTP-фреймворка FastAPI, инициализацию можно выполнить в lifespan:

from contextlib import asynccontextmanager
from typing import AsyncIterator

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorClientSession

from event_outbox import Event, EventOutbox


async def event_handler(
    event: Event,
    session: AsyncIOMotorClientSession,
    outbox: EventOutbox,
) -> None:
    ...


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    mongo_client: AsyncIOMotorClient = ...
    kafka_producer: AIOKafkaProducer = ...
    kafka_consumer: AIOKafkaConsumer = ...

    event_outbox = EventOutbox(
        mongo_client,
        kafka_producer,
        kafka_consumer,
    )
    await event_outbox.create_indexes()
    async with event_outbox.run_event_handler(
        lambda event, session: event_handler(
            event,
            session,
            event_outbox,
        )
    ):
        yield

def create_app() -> FastAPI:
    return FastAPI(lifespan=lifespan)

Заключение

Реализация итоговой согласованности (eventual consistency) за счет гарантий доставки и идемпотентной обработки — это мощный механизм, который часто ускользает из виду. Возможность запускать циклы обработки и публикации в одном процессе с HTTP-сервером позволяет интегрировать решение в проект без запуска дополнительных процессов (worker). Использование механизмов отслеживания изменений в базе для ожидания новых событий является альтернативой поллингу (polling) и снижает нагрузку на базу данных. Эксплуатирование механизма назначения партишнов (partition) между консюмерами (consumer) позволяет также разделить работу по публикации событий между продюсерами (producer). Автоматическое удаление устаревших событий из базы данных позволяет снизить затраты на хранение.

В целом, я доволен реализованным решением. Мне очень хотелось бы услышать мнение сообщества. Я первый раз в open-source. Буду рад, если проект окажется полезным.

Если после прочтения у вас остались вопросы, предлагаю ознакомиться с исходным кодом или обсудить в комментариях.

Спасибо за внимание!

© Habrahabr.ru