Безболезненная миграция с NATS на Kafka

Привет, Хабр! Меня зовут Максим, я Go-разработчик из Wildberries. Свою дебютную статью я хочу посвятить довольно популярной теме, когда на проекте приходится уходить с одной технологии на другую. Данная статья будет полезна разработчикам, кто активно использует асинхронный способ передачи данных в своих проектах. Статья несет исключительно опыт автора.

В первую очередь заглянем поглубже во внутреннее устройство каждого брокера и особенности при работе с ним.

Nats

NATS — это высокопроизводительный брокер сообщений, написанный на Go. Он создан Дереком Коллисоном, за плечами которого более 20 лет работы над распределенными очередями сообщений. Для работы с сообщениями в NATS используется простой текстовый протокол в стиле публикации/подписки. Клиенты подключаются и взаимодействуют с сервером через обычный сокет TCP/IP, используя небольшой набор протокольных операций, которые заканчиваются новой строкой. В простой схеме это выглядит следующим образом.

5dc976b25963656b2b30ba0f3258721e.png

Так как мы пишем на Go, нас не может не радовать та новость, что NATS написан на Go. Ко всему прочему в NATS в отличие от Kafka (о которой будет идти речь позже) поддерживает параллельную обработку сообщений «из коробки», но детали реализации и степень параллелизма зависят от выбранного режима подписки, настроек и логики приложения.

Клиентская библиотека NATS (например, NATS Go) не накладывает ограничений на то, как сообщения обрабатываются в коде. Вы можете запускать обработку сообщений в отдельных горутинах для повышения параллелизма.

Одна из важных конфигураций, которая позволяет быстро обрабатывать сообщений это MaxInflight. MaxInflight — это одна из настроек в библиотеке NATS Go, которая используется для управления потоковыми данными при подписке на темы в NATS (распределённой системе обмена сообщениями). Она задаёт максимальное количество неподтверждённых сообщений (то есть сообщений, полученных подписчиком, но ещё не подтверждённых), которые сервер может отправить подписчику.

Из этого вытекает следующая особенность натса, отличающая его от кафки. Acknowledgment system — система подтверждений в NATS используется для обеспечения надежной доставки сообщений. Это позволяет гарантировать, что каждое сообщение будет обработано, даже если подписчик временно недоступен или сбойнул. Таким образом нам нет необходимости сдвигать оффсеты как в Apache Kafka, что делает NATS удобным при обработки сообщений, нам не нужно задумываться, когда сдвигать offset, мы просто ack’аем его.##

Как работает система Ack?

  1. Доставка сообщения:

    • Сервер отправляет сообщение подписчику.

    • Если установлен AckWaitTimeout, сервер начинает отсчитывать таймер.

  2. Ожидание подтверждения:

  3. Обработка подтверждения:

  4. Повторная доставка:

    • Если подтверждение не получено, сервер повторно отправляет сообщение, либо подписчику, либо другому клиенту в группе подписки.

Kafka

Наконец, перейдем к технологии Apache Kafka. Kafka, изначально разработанная в 2011 году в компании LinkedIn, на сегодняшний день обеспечивает высокую надежность и масштабируемость, позволяя хранить огромные объемы данных. Kafka предоставляет высокопроизводительную шину сообщений, которая позволяет обрабатывать все проходящие через нее данные в реальном времени, даже при экстремально больших нагрузках.

Простейшая схема работы Kafka показана на картинке ниже.

6c795edf5b30f6dcf345800873098059.png

В целом уже по картинке можно понять один нюанс работы с Kafka — очередность считывания сообщений. Для нас было важно, чтобы все полученные сообщения обрабатывались в том же порядке, в котором они были созданы, а Kafka это гарантирует только внутри одного брокера. Также немаловажной проблемой для нас стала невозможность реализации из коробки параллельного чтения, такое чтение ограничивалось количеством созданных партиций внутри Kafka.

Поставленная задача

После чтения из топика или сабджекта необходимо выполнить достаточно медленный сетевой запрос. При этом запрос не всегда проходит успешно и может возвращать ошибку 500, что связано с особенностями области, в которой я работаю.

При работе с NATS о смещениях (оффсетах) задумываться не пришлось. Мы просто настроили MaxInflight 200 и пошли выполнять операции в 200 горутини запустили обработку в 200 горутинах. Каждая из них самостоятельно решает, ack’ать или нет.

Работа с Kafka оказалась сложнее, чем ожидалось. Например, создать 200 партиций сразу оказалось невозможным. Кроме того, нельзя сместить оффсет, пока запрос не обработан корректно, иначе сообщение будет перезаписано и потеряно. Однако и слишком долго задерживаться на обработке сообщения нельзя, так как это может привести к затягиванию процесса или полной остановке, особенно если сообщение было отправлено с ошибкой.

Покопавшись в интернете я выделил два решения, которые больше всего для меня подходили.

Retry topic

Что происходит, если условия, необходимые для обработки события, недоступны, когда приложение пытается его обработать? Например, представьте приложение, которое обрабатывает запросы на покупку товаров. Цена товара может быть рассчитана другим приложением и может отсутствовать в момент получения запроса.

Добавление топика повторных попыток дает возможность обработать большинство событий сразу, а другие события откладываются до тех пор, пока не будут выполнены необходимые условия. В приведенном примере, если цена товара недоступна, событие направляется в тему повторных попыток. Это восстановимое условие не следует рассматривать как ошибку, а нужно периодически пытаться обработать событие до тех пор, пока условия не будут выполнены.

6c8044b2d4ae78d70c455eaa9f25d427.png

Это упрощенная схема взаимодействия с данной проблемой. Мы читаем сообщения, при получении ошибки, отправляем его в Retry Topic, из которого будет читать соответсвующий бизнес-сервис из сервисного слоя приложения.

Такое решение принимается, но все еще не решен вопрос упорядоченной обработки сообщений, также не совсем удобно отслеживать обработанные и необработанные сообщения, но здесь можно и обойтись логами.

Transactional Inbox

И вот решение, которое мне показалось идеальным в данной ситуации. Паттерн микросервисов Inbox. На первом шаге мы создаем таблицу, которая будет служить в качестве «входящего ящика» для наших сообщений. После того как новое сообщение поступает, мы не начинаем его обработку сразу, а лишь вставляем сообщение в таблицу и сдвигаем offset в Kafka. При правильной настройке таблицы, запросов в БД и обработки сообщений консьюмером, такая реализация с 99% вероятностью будет стабильно читать и записывать данные в нашу таблиц inbox. Затем фоновый процесс по мере удобства извлекает записи из инбокса и запускает их обработку. По завершении работы, если наше событие обработано успешно, то происходит commit и запись удаляется, в противном случае, сообщение будет помечено ошибкой и в таблице бд обновится запись.

601766228e0e045970019d7fd7b6a17b.png

Пропускную способность можно улучшить за счет увеличения параллелизма. Когда несколько воркеров одновременно читают инбокс, важно не забывать блокировать строки, которые уже были забраны другими воркерами, данную логику можно воспроизвести благодаря locked_until атрибуту.

Паттерн Inbox может быть очень полезен, когда важен порядок сообщений. Иногда порядок гарантируется системой обмена сообщениями (например, Kafka с включенной конфигурацией идемпотентности), но это не всегда так для каждого брокера. Точно так же HTTP-запросы могут перемешиваться, если клиент не отправляет их по порядку в одном потоке.

К счастью, если сообщения содержат монотонно увеличивающийся идентификатор, порядок можно восстановить в процессе обработки, когда воркер читает данные из инбокса. Он может обнаружить отсутствующие сообщения, подождать их поступления и затем обработать их в правильной последовательности.

Как я внедрял Inbox

Для начала определимся с сущностями. Сущность, с которой будет работать consumer и inbox worker назовем Event.

type Event struct {
  	ID        int64     `db:"id"`
	Payload   []byte    `db:"payload"`
	CreatedAt time.Time `db:"created_at"`
	Success   bool      `db:"-"`
	Err       string    `db:"err"`
}

Эта сущность будет обозначать событие, пришедшие к нам из брокера.

Далее консьюмеру необходимо обработать сообщение из Kafka и положить его в инбокс-таблицу. Для этого я написал следующий код.

type EntityHandler struct {
	logger  *slog.Logger
	service *api_service.ApiService
}

func (h *EntityHandler) Receive(ctx context.Context, events ...cloudevents.Event) error {
	entities, err := eventToEntity(events...)
	if err != nil {
		return err
	}

	err = h.service.SaveEntities(ctx, entities)
	if err != nil {
		return err
	}

	return nil
}

Переменные и структуры названы entity это дженерик название, чтобы для необходимых доменов можно было подменить имя entity на требующее название для бизнес-логики приложения. После записи сущностей в инбокс, начинается основной процесс их обработки.

Процесс обработки в общем случае будет выглядеть следующим образом.

type EventProcessor struct {
	logger  *slog.Logger
	name    string
	handler Handler
}

type Handler interface {
	Get(ctx context.Context) ([]Event, error)
	Process(ctx context.Context, events []Event) (processedEvents []Event, err error)
	Complete(ctx context.Context, events []Event) error
	CompleteFailed(ctx context.Context, events []Event) error
}

func (p EventProcessor) Start(ctx context.Context) error {
	events, err := p.handler.Get(ctx)
	if err != nil {
		return err
	}

	if len(events) == 0 {
		return ErrPause
	}

	events, err = p.handler.Process(ctx, events)
	if err != nil {
		p.logger.Error(
			fmt.Sprintf("[%s] failed to process event", p.name),
			slog.Any("err", err),
		)
	}

	processedEvents := make([]Event, 0)
	failedEvents := make([]Event, 0)

	for _, event := range events {
		if event.Success {
			processedEvents = append(processedEvents, event)

			continue
		}

		failedEvents = append(failedEvents, event)
	}

	if len(processedEvents) != 0 {
		err = p.handler.Complete(ctx, processedEvents)
		if err != nil {
			return err
		}
	}

	if len(failedEvents) != 0 {
		err = p.handler.CompleteFailed(ctx, failedEvents)
		if err != nil {
			return err
		}
	}

	return nil
}

Вся суть этого подхода заключается в том, что независимо от используемой технологии межсервисного взаимодействия, мы можем гибко добавлять воркеров в метод Process по мере необходимости, а также использовать систему ack для удобного коммита как успешно обработанных, так и ошибочных сообщений. Для дополнительного ускорения чтения из Kafka можно реализовать чтение сообщений батчами, что позволит снизить количество сетевых вызовов для получения данных.

Вывод

В процессе перехода с NATS на Kafka, я столкнулся с рядом трудностей, связанных с особенностями работы с разными брокерами, но, к счастью, решение не заставило себя долго ждать. Использование таблицы Inbox, с одной стороны, позволяет гибко обрабатывать сообщения в порядке их поступления, а с другой — предоставляет механизмы для гарантированной доставки и обработки сообщений (at least once) с учетом возможных ошибок или временных задержек.

Этот подход дает несколько преимуществ:

  1. Гибкость и масштабируемость: Мы можем настроить количество воркеров, параллельность обработки и условия для retry, что позволяет эффективно управлять нагрузкой.

  2. Управление порядком сообщений: В случае с важностью порядка сообщений, мы можем гарантировать правильную последовательность их обработки, несмотря на возможные сбои.

  3. Управление ошибками: Благодаря механизму ack и retry, система становится устойчивой к временным ошибкам, не теряя данных.

© Habrahabr.ru