Как Kafka стала былью

d1z0m1b3kvl5hobn4_4nnxuxc3o.png

Привет, Хабр!

Я работаю в команде Tinkoff, которая занимается разработкой собственного центра нотификаций. По большей части я разрабатываю на Java с использованием Spring boot и решаю разные технические проблемы, возникающие в проекте.

Большинство наших микросервисов асинхронно взаимодействуют друг с другом через брокер сообщений. Ранее в качестве брокера мы использовали IBM MQ, который перестал справляться с нагрузкой, но при этом обладал высокими гарантиями доставки.

В качестве замены нам предложили Apache Kafka, которая обладает высоким потенциалом масштабирования, но, к сожалению, требует практически индивидуального подхода к конфигурированию для разных сценариев. Кроме того, механизм at least once delivery, работающий в Kafka по умолчанию, не позволял поддерживать необходимый уровень консистентности из коробки. Далее я поделюсь нашим опытом конфигурации Kafka, в частности расскажу, как настроить и жить с exactly once delivery.


Гарантированная доставка и не только

Параметры, о которых пойдет речь далее, помогут предотвратить ряд проблем с настройками подключения по умолчанию. Но сначала хочется уделить внимание одному параметру, который облегчит возможный дебаг.

В этом поможет client.id для Producer и Consumer. На первый взгляд, в качестве значения можно использовать имя приложения, и в большинстве случаев это будет работать. Хотя ситуация, когда в приложении используется несколько Consumer«ов и вы задаете им одинаковый client.id, приводит к следующему предупреждению:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Если вы хотите использовать JMX в приложении с Kafka, то это может быть проблемой. Для этого случая лучше всего использовать в качестве значения client.id комбинацию из имени приложения и, например, имени топика. Результат нашей конфигурации можно посмотреть в выводе команды kafka-consumer-groups из утилит от Confluent:
1eqh1zw485osmsizmx6gu9_d8ne.png

Теперь разберем сценарий гарантированной доставки сообщения. У Kafka Producer есть параметр acks, который позволяет настраивать, после скольких acknowledge лидеру кластера необходимо считать сообщение успешно записанным. Этот параметр может принимать следующие значения:


  • 0 — acknowledge не будут считаться.
  • 1 — параметр по умолчанию, необходим acknowledge только от 1 реплики.
  • −1 — необходимы acknowledge от всех синхронизированных реплик (настройка кластера min.insync.replicas).

Из перечисленных значений видно, что acks равный −1 дает наиболее сильные гарантии, что сообщение не потеряется.

Как мы все знаем, распределенные системы ненадежны. Чтобы защититься от временных неисправностей, Kafka Producer предоставляет параметр retries, который позволяет задавать количество попыток повторной отправки в течение delivery.timeout.ms. Так как параметр retries имеет значение по умолчанию Integer.MAX_VALUE (2147483647), количество повторных отправок сообщения можно регулировать, меняя только delivery.timeout.ms.


Движемся к exactly once delivery

Перечисленные настройки позволяют нашему Producer«у доставлять сообщения с высокой гарантией. Давайте теперь поговорим о том, как гарантировать запись только одной копии сообщения в Kafka-топик? В самом простом случае для этого на Producer нужно установить параметр enable.idempotence в значение true. Идемпотентность гарантирует запись только одного сообщения в конкретную партицию одного топика. Предварительным условием для включения идемпотентности являются значения acks = all, retry > 0, max.in.flight.requests.per.connection ≤ 5. Если эти параметры не заданы разработчиком, то автоматически будут выставлены указанные выше значения.

Когда идемпотентность настроена, необходимо добиться того, чтобы одинаковые сообщения попадали каждый раз в одни и те же партиции. Это можно сделать, настраивая ключ и параметр partitioner.class на Producer. Давайте начнем с ключа. Для каждой отправки он должен быть одинаковым. Этого легко добиться, используя какой-либо бизнес-идентификатор из оригинального сообщения. Параметр partitioner.class имеет значение по умолчанию — DefaultPartitioner. При этой стратегии партиционирования по умолчанию действуем так:


  • Если партиция явно указана при отправке сообщения, то используем ее.
  • Если партиция не указана, но указан ключ — выбираем партицию по хэшу от ключа.
  • Если партиция и ключ не указаны — выбираем партиции по очереди (round-robin).

Кроме того, использование ключа и идемпотентной отправки с параметром max.in.flight.requests.per.connection = 1 дает вам упорядоченную обработку сообщений на Consumer. Отдельно стоит помнить, что, если на вашем кластере настроено управление доступом, то вам понадобятся права на идемпотентную запись в топик.

Если вдруг вам не хватает возможностей идемпотентной отправки по ключу или логика на стороне Producer требует сохранения консистентности данных между разными партициями, то на помощь придут транзакции. Кроме того, с помощью цепной транзакции можно условно синхронизировать запись в Kafka, например, с записью в БД. Для включения транзакционной отправки на Producer необходимо, чтобы он обладал идемпотентностью, и дополнительно задать transactional.id. Если на вашем Kafka-кластере настроено управление доступом, то для транзакционной записи, как и для идемпотентной, понадобятся права на запись, которые могут быть предоставлены по маске с использованием значения, хранящегося в transactional.id.

Формально в качестве идентификатора транзакции можно использовать любую строку, например имя приложения. Но если вы запускаете несколько инстансов одного приложения с одинаковым transactional.id, то первый запущенный инстанс будет остановлен с ошибкой, так как Kafka будет считать его зомби-процессом.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Чтобы решить эту проблему, мы добавляем к имени приложения суффикс в виде имени хоста, который получаем из переменных окружения.

Producer настроен, но транзакции на Kafka управляют только областью видимости сообщения. Вне зависимости от статуса транзакции, сообщение сразу попадает в топик, но обладает дополнительными системными атрибутами.

Чтобы такие сообщения не считывались Consumer«ом раньше времени, ему необходимо установить параметр isolation.level в значение read_committed. Такой Consumer сможет читать нетранзакционные сообщения как и раньше, а транзакционные — только после коммита.
Если вы установили все перечисленные ранее настройки, то вы настроили exactly once delivery. Поздравляю!

Но есть еще один нюанс. Transactional.id, который мы настраивали выше, на самом деле является префиксом транзакции. На менеджере транзакций к нему дописывается порядковый номер. Полученный идентификатор выдается на transactional.id.expiration.ms, который конфигурируется на Kafka кластере и обладает значением по умолчанию »7 дней». Если за это время приложение не получало никаких сообщений, то при попытке следующей транзакционной отправки вы получите InvalidPidMappingException. После этого координатор транзакций выдаст новый порядковый номер для следующей транзакции. При этом сообщение может быть потеряно, если InvalidPidMappingException не будет правильно обработан.


Вместо итогов

Как можно заметить, недостаточно просто отправлять сообщения в Kafka. Нужно выбирать комбинацию параметров и быть готовым к внесению быстрых изменений. В этой статье я постарался в деталях показать настройку exactly once delivery и описал несколько проблем конфигураций client.id и transactional.id, с которыми мы столкнулись. Ниже в краткой форме приведены настройки Producer и Consumer.

Producer:


  1. acks = all
  2. retries > 0
  3. enable.idempotence = true
  4. max.in.flight.requests.per.connection ≤ 5 (1 — для упорядоченной отправки)
  5. transactional.id = ${application-name}-${hostname}

Consumer:


  1. isolation.level = read_committed

Чтобы минимизировать ошибки в будущих приложениях, мы сделали свою обертку над spring-конфигурацией, где уже заданы значения для некоторых из перечисленных параметров.

А вот пара материалов для самостоятельного изучения:


© Habrahabr.ru