Как настроить обмен сообщениями с помощью брокера сообщений Kafka
Привет, меня зовут Вячеслав Бенедичук, я лид группы разработки в отрасли FMCG и наставник на курсе «Архитектура программного обеспечения» в Яндекс Практикуме.
Сейчас в промышленных IТ-системах часто требуется асинхронное взаимодействие между сервисами, но начинающим разработчикам бывает трудно разобраться с тем, как его организовать.
Есть разные варианты решения данной задачи. В этой статье я расскажу об одном из них — как организовать обмен сообщениями с использованием брокера сообщений Kafka, и дам примеры кода. Статья будет полезна тем, кто хочет быстро разобраться с использованием брокера Kafka и включить его в своё решение.
Что такое очереди сообщений и зачем они нужны
В IT все процессы так или иначе крутятся вокруг обработки, передачи или хранения информации. Достаточно часто встречаются задачи обработки большого объёма последовательных сообщений. Это может быть информация из биллинговой системы, поток биржевых транзакций, телеметрия, поступающая с бортовой системы дрона, или данные от миллиона датчиков погоды.
Поток данных может меняться со временем. Могут возникать пиковые нагрузки или спады. При использовании только API, который обрабатывает данные в реальном времени, инфраструктура может быть не готова к обработке таких скачков.
Конечно, мы можем развернуть мощное оборудование, которое сможет справиться с пиковой нагрузкой. Но если такая нагрузка будет возникать всего на несколько часов или минут в день, то большую часть времени мы будем иметь простаивающее дорогое оборудование.
Если же мы поставим более слабое оборудование, то время отклика системы под нагрузкой может сильно возрасти или система в принципе может перестать принимать запросы. Это может привести к потере данных.
Это как раз сценарий, когда очень удобны очереди сообщений. Они выступают как временное хранилище — буфер для информации, а также как транспортная система от одного сервиса к другому.
Отправители могут записывать данные в очереди, а получатели — получать их и обрабатывать с максимально возможной скоростью. Даже если в какой-то момент объём записываемых данных будет превышать обрабатываемые, это не приведёт к потере данных. Данные будут сохранены в очереди, и, когда входящая нагрузка спадёт, получатели смогут разобрать избыточный поток. Как результат, это даёт максимально использовать возможности оборудования для обработки в периоды пиковой нагрузки.
Да, очередь тоже имеет ограничения по объёму сохраняемых данных и её можно переполнить, поэтому нужно контролировать рост очереди сообщений. Но обычно лимит для переполнения очереди значительно больше возможностей сервисов по обработке информации в реальном времени.
Конечно, для этих целей можно использовать и другие решения: традиционные базы данных, хранилища типа «ключ — значение», кэши в памяти. Однако такие решения сфокусированы на хранении, а не на передаче данных и обычно требуют несколько более сложную логику для реализации такого сценария.
Что такое Kafka
Kafka — это распределённый программный брокер сообщений. Эта система поддерживается Apache Foundation, имеет открытый код, множество различных интеграций и широко распространена на рынке.
Благодаря своей высокой пропускной способности Kafka позволяет поддерживать очереди сообщений, способные пересылать более 1 млн сообщений в секунду. Это делает её отличным инструментом для потоковой обработки данных, многоэтапных конвейеров обработки данных и реализации event sourcing.
Конечно, Kafka — это не единственный брокер сообщений на рынке. Есть различные готовые реализации от облачных провайдеров. Есть варианты SaaS. Есть RabbitMQ и другие варианты брокеров, решающих ту же задачу. Даже в Redis есть минимум три способа организовать потоковый обмен сообщениями.
Но это темы для отдельных статей, которые, возможно, ещё появятся или уже есть на Хабре, а сегодня мы поговорим именно о Kafka.
Основные концепции Kafka
С точки зрения программиста, работающего с Kafka, устроена она достаточно просто. Есть несколько фундаментальных сущностей и понятий:
Producer (производитель). Как следует из названия, это сервис, который производит сообщения и отправляет их в очередь.
Consumer (потребитель). Сервис, который получает сообщения из очереди.
Topic (тема). Кластер Kafka может обслуживать десятки или сотни очередей, каждая со своими собственными потребителями и производителями, обменивающимися различными типами сообщений.
Для того чтобы различать эти очереди, было введено понятие «тема». Каждая тема — это именованная очередь сообщений (лог сообщений). Обычно для каждого типа сообщений заводится отдельная очередь.
Consumer group (группа потребителей). Обычно каждый потребитель получает сообщения независимо от других, но достаточно часто возникает необходимость, чтобы сообщения обрабатывались несколькими потребителями параллельно, при этом каждое сообщение обрабатывалось только один раз.
Группы потребителей и разделы (описаны далее) предназначены для использования в таких сценариях. Группа потребителей объединяет несколько потребителей, получающих сообщения из одной темы, и позволяет распределить нагрузку между ними.
Partition (раздел). В Kafka каждый топик делится на разделы. Количество разделов может варьироваться от одного до сотен. В одном из блогов мне встречалась цифра, что Kafka может поддерживать до 200 000 разделов на кластер.
Каждый раздел содержит некоторую часть сообщений темы. Это позволяет балансировать нагрузку внутри кластера и, совместно с группами потребителей, распределять работу между потребителями.
Для повышения отказоустойчивости разделы могут быть реплицированы между несколькими узлами в кластере. В этом случае один из узлов становится лидером раздела, он отвечает на запросы потребителей и производителей, но данные копируются на другие узлы.
В случае сбоя лидера производители и потребитель переключаются на другой узел, который становится новым лидером. Каждый раздел читается только одним потребителем, но при этом потребитель может получать данные из нескольких разделов.
Структура кластера Kafka
Kafka разворачивается в виде кластера. Обычно в рабочем окружении используется кластер как минимум из трёх узлов, но для локального тестирования одного узла может быть вполне достаточно. Такие узлы кластера называются брокерами.
Также ранее каждый кластер включал в себя сервис ZooKeeper, который отвечал за целостность кластера, хранил информацию о брокерах, расположении тем и разделов, но в последних версиях Kafka он признан устаревшим и будет полностью удалён.
Новый способ управления называется Kafka Raft или KRAFT. В новых версиях Kafka каждый инстанс может работать и как рядовой брокер, и одновременно в режиме контроллера.
Эти контроллеры общаются между собой по протоколу KRAFT метаинформацией о структуре кластера. При этом один из брокеров становится лидером, управляющим кластером. Роль лидера может перейти к другому брокеру в случае падения предыдущего лидера.
При планировании развёртывания нового кластера лучше сразу использовать KRAFT, так как он обеспечивает значительно большую скорость восстановления кластера в случае сбоя. Также он не требует развёртывания дополнительных сервисов, как в случае Zookeeper.
Структура кластера Kafka при использовании KRAFT. Пример развёртывания шести узлов Kafka: три совмещают роли брокера и контроллера, оставшиеся три выступают исключительно в роли брокера
Чтобы подключиться к кластеру и отправить/получить сообщение, потребители и производители должны получить информацию о темах и разделах. В варианте с Zookeeper единственным источником данной информации является сервис Zookeeper. В случае использования протокола KRAFT любой брокер в кластере может предоставить данную информацию клиенту.
Два варианта подключения потребителей к разделам очереди
В Consumer Group 1 два потребителя подключены к трём разделам. Так как потребителей меньше, чем разделов, то одному из потребителей приходится обрабатывать сообщения из двух разделов.
В Consumer Group 2 ситуация противоположна. Четыре потребителя подключаются к трём разделам. При этом у Kafka есть ограничение. Из одного раздела в рамках одной группы может читать только один потребитель. Поэтому потребитель, которому не досталось раздела, будет простаивать. Однако если один из работающих потребителей по какой-то причине отключится, простаивающий потребитель подключится вместо него, выполняя роль горячей подмены в группе.
Немного практики
Запуск Kafka в Docker
Если вам надоело читать и захотелось поделать что-то руками, давайте попробуем поработать с Kafka. Это очень просто. Для начала вам потребуется Docker.
Для тестирования вы можете скачать файл из моего репозитория и запустить его командой `docker compose up -d`. Этот файл запускает кластер, состоящий из шести инстансов Kafka: три работают в режиме брокера и контроллера, ещё три — только в режиме брокера.
Разберём этот файл более подробно. Брокеры и контроллеры имеют очень много общих конфигураций, поэтому конфигурации выполнены в виде шаблонов.
Шаблон `x-common-variables` содержит список переменных окружения, общих для всех сервисов.
KAFKA_KRAFT_CLUSTER_ID — определяет идентификатор кластера. Его можно задавать статически либо можно генерировать на лету, но это вне скоупа этой статьи.
KAFKA_CONTROLLER_QUORUM_VOTERS — содержит список контроллеров, которые участвуют в выборах контроллера-лидера.
KAFKA_CONTROLLER_LISTENER_NAMES — Kafka реализует концепцию слушателей, где каждый слушатель имеет уникальное имя и протокол обмена. Данный параметр определяет имя слушателя, через которое происходит взаимодействие с контроллерами.
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP — задаёт полный список слушателей и соответствующих им протоколов обмена.
External — определяет протокол для подключения внешних клиентов. В реальной жизни обычно используется SSL/TSL, но для примера мы используем нешифрованный вариант.
Internal — используется для подключений внутри кластера. Чаще всего в реальных системах используется plain text, но в случае если требования к безопасности повышены, то также может использоваться SSL.
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR — определяет количество реплик информации о текущем смещении в группах потребителей. При использовании одного или двух инстансов нужно выставить в 1 или 2 соответственно, если больше — можно оставить по умолчанию (3) или задать необходимое значение с точки зрения отказоустойчивости системы.
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR — определяет количество реплик для топика транзакций.
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR — минимальное количество реплик для топика транзакций, которые должны быть засинхронизированы, чтобы транзакция была признана успешной.
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS — при первом подключении нового потребителя к кластеру Kafka происходит перераспределение подключений к разделам между потребителями. Данный параметр задаёт задержку между подключением нового потребителя и началом этого процесса.
KAFKA_NUM_PARTITIONS — определяет количество разделов по умолчанию для новых тем.
KAFKA_DEFAULT_REPLICATION_FACTOR — определяет, сколько реплик тем будет создаваться по умолчанию.
Также есть 2 шаблона, специфичных для контроллеров (x-common-controller-variables) и брокеров (x-common-broker-variables).
KAFKA_PROCESS_ROLES — определяет роль инстанса: для выделенных брокеров используем значение `broker`, для брокеров/контроллеров —`broker, controller`.
KAFKA_LISTENERS — определяет список слушателей для данного инстанса и их порты. Для всех инстансов используем INTERNAL и EXTERNAL, а для контроллеров дополнительно CONTROLLER.
KAFKA_ADVERTISED_LISTENERS — определяет, какие адреса слушателей будут предоставляться пользователю. У нас тестовый пример, поэтому имена не важны, но в реальной жизни вам может потребоваться указать имя хоста, на котором крутится брокер, или DNS-имя для внешнего интерфейса. Возможно, я вернусь к этой теме в следующих статьях.
Ручной тест развёртывания
Если вы уже запустили кластер Kafka с помощью предложенного docker-compose файла, самое время попробовать его в действии.
Подключитесь к любому из брокеров. Например, к первому: `docker exec -it broker1 bash`
Перейдите в папку с инструментами командной строки Kafka: `cd /opt/kafka/bin/`
Создайте топик `test-1`: ` ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-1`
Проверьте результат его создания: `./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-1` Вы должны увидеть список партиций данного топика, список реплик для каждой партиции и их состояние.
Теперь отправьте сообщения в топик `./kafka-console-producer.sh --topic test-1` Эта команда позволит вам ввести одно или несколько сообщений. Каждое отдельное сообщение завершается по кнопке Enter. Чтобы завершить ввод сообщений, нажмите Ctrl+C.
Получите отправленные сообщения: `./kafka-console-consumer.sh --topic test-1 --from-beginning ` Эта команда прочитает все сообщения из очереди `test-1` и будет ждать поступления новых. Завершите её выполнение с помощью Ctrl+C.
Если вы в итоге увидели свои отправленные сообщения — поздравляю, ваш кластер Kafka работает.
Теперь давайте напишем пару микросервисов, которые будут взаимодействовать с вашим кластером.
Пример микросервиса
Consumer (Потребитель)
В реальной жизни отправка и получение сообщений делается из программного кода. Давайте для примера посмотрим, как это делается на .net.
Полный код примера на GitHub
Вся логика, относящаяся к Kafka, находится в общей библиотеке в папке Logic. Логика потребителя находится в классе Subscriber, в методе SubscribeAsync.
Чтобы начать получать сообщения из очереди, код создает экземпляр класса Consumer из библиотеки Kafka, а затем подписывается на список топиков, из которых необходимо получить сообщения.
Затем, в цикле, код получает следующее сообщение и обрабатывает его.
Обратите внимание на закомментированную строку 52 — `consumer.Commit ();` и на закомментированные строки 34 и 35 в данном методе.
Kafka поддерживает два способа отслеживания, какие сообщения были доставлены, а какие нет.
Auto-Commit. В этом режиме клиентская библиотека Kafka автоматически отмечает, до какой позиции в очереди сообщения обработаны.
При создании клиента можно задать параметр `AutoCommitIntervalMs`. В этом случае клиент будет отмечать текущую позицию в очереди как обработанную с выбранной периодичностью.
Такой подход ускорит обработку отдельных сообщений, но если вдруг ваш клиент перезапустится, есть шанс, что вы опять получите сообщения, которые были обработаны, но попали в интервал между автоматическими коммитами.
Принимайте это во внимание при проектировании логики.
Явный коммит. В этом режиме вы явно вызываете метод `consumer.Commit ();`. Этот вызов может слегка замедлить выполнение кода, но при этом результат будет более предсказуем.
По умолчанию режим автоматических коммитов включен и настроен интервал в 5000 миллисекунд, но вы можете настроить эти параметры в зависимости от ваших потребностей.
Producer (производитель)
Логика производителя реализована в классе Producer. Она достаточно проста. В конструкторе вы получаете настройки и создаёте экземпляр интерфейса IProducer
Метод SendAsync вызывает метод ProduceAsync и ожидает его завершения.
Также стоит обратить внимание, что интерфейс IProducer
Эта статья даёт пример использования Kafka в .NET Core. Однако вы можете использовать практически любой язык для работы с Kafka и даже создавать гетерогенные решения, в которых разные сервисы будут написаны на разных языках программирования. Хотя гетерогенные решения сложнее в поддержке, иногда они могут быть очень полезны в работе.