Повторная обработка событий, полученных из Kafka

zzjflf8yi_hvdyq61jdxs4_nieq.png

Привет, Хабр.

Недавно я поделился опытом о том, какие параметры мы в команде чаще всего используем для Kafka Producer и Consumer, чтобы приблизиться к гарантированной доставке. В этой статье хочу рассказать, как мы организовали повторную обработку события, полученного из Kafka, в результате временной недоступности внешней системы.

Современные приложения работают в очень сложной среде. Бизнес-логика, обернутая в современный технологический стек, работающая в Docker-образе, который управляется оркестратором вроде Kubernetes или OpenShift, и коммуницирующая с другими приложениями или enterprise-решениями через цепочку физических и виртуальных маршрутизаторов. В таком окружении всегда что-то может сломаться, поэтому повторная обработка событий в случае недоступности одной из внешних систем — важная часть наших бизнес-процессов.


Как было до Kafka

Ранее в проекте мы использовали IBM MQ для асинхронной доставки сообщений. При возникновении какой-либо ошибки в процессе работы сервиса полученное сообщение могло быть помещено в dead-letter-queue (DLQ) для дальнейшего ручного разбора. DLQ создавался рядом с входящей очередью, перекладывание сообщения происходило внутри IBM MQ.

Если ошибка имела временный характер и мы могли это определить (например, ResourceAccessException при HTTP-вызове или MongoTimeoutException при запросе в MongoDb), то в силу вступала стратегия повторных вызовов. Вне зависимости от ветвления логики приложения, исходное сообщение перекладывалось или в системную очередь для отложенной отправки, или в отдельное приложение, которое когда-то давно было сделано для повторной отправки сообщений. При этом в заголовок сообщения записывается номер повторной отправки, который привязан к интервалу задержки или к концу стратегии на уровне приложения. Если мы достигли конца стратегии, но внешняя система все еще недоступна, то сообщение будет помещено в DLQ для ручного разбора.


Поиск решения

Поискав в интернете, можно найти следующее решение. Если коротко, то предлагается завести по топику на каждый интервал задержки и реализовать на стороне приложения Consumer«ы, которые будут вычитывать сообщения с необходимой задержкой.

ejfdboofwtebvqtdcrdq8adtxkw.png

Несмотря на большое количество положительных отзывов, оно кажется мне не совсем удачным. В первую очередь потому, что разработчику, кроме реализации бизнес-требований, придется потратить много времени на реализацию описанного механизма.

Кроме того, если на Kafka-кластере включено управление доступами, то придется потратить какое-то время на заведение топиков и обеспечение необходимых доступов к ним. В дополнение к этому нужно будет подбирать правильный параметр retention.ms для каждого из ретрай-топиков, чтобы сообщения успевали повторно отправляться и не пропадали из него. Реализацию и запрос доступов придется повторять для каждого существующего или нового сервиса.

Давайте теперь посмотрим, какие механизмы для повторной обработки сообщения предоставляет нам spring в целом и spring-kafka в частности. Spring-kafka имеет транзитивную зависимость на spring-retry, который предоставляет абстракции для управления разными BackOffPolicy. Это довольно гибкий инструмент, но его значительным недостатком является хранение сообщений для повторной отправки в памяти приложения. Это значит, что перезапуск приложения из-за обновления или ошибки во время эксплуатации приведет к потере всех сообщений, ожидающих повторной обработки. Так как этот пункт критичен для нашей системы, мы не стали рассматривать его дальше.

Сама spring-kafka предоставляет несколько реализаций ContainerAwareErrorHandler, например SeekToCurrentErrorHandler, с помощью которого можно, не смещая offset в случае возникновения ошибки, обработать сообщение позже. Начиная с версии spring-kafka 2.3 появилась возможность задавать BackOffPolicy.

Этот подход позволяет повторно обрабатываемым сообщениям переживать рестарт приложения, но механизм DLQ по-прежнему отсутствует. Именно этот вариант мы выбрали в начале 2019 года, оптимистично полагая, что DLQ не понадобится (нам повезло и он действительно не понадобился за несколько месяцев эксплуатации приложения с такой системой повторной обработки). Временные ошибки приводили к срабатыванию SeekToCurrentErrorHandler. Остальные ошибки печатались в лог, приводили к смещению offset, и обработка продолжалась со следующим сообщением.


Итоговое решение

Реализация, основанная на SeekToCurrentErrorHandler, подтолкнула нас к разработке собственного механизма для повторной отправки сообщений.

Прежде всего мы хотели использовать уже имеющийся опыт и расширить его в зависимости от логики приложения. Для приложения с линейной логикой оптимальным было бы прекратить считывание новых сообщений в течение небольшого промежутка времени, заданного в рамках стратегии повторных вызовов. Для остальных приложений хотелось иметь единую точку, которая бы обеспечивала выполнение стратегии повторных вызовов. В дополнение эта единая точка должна обладать функциональностью DLQ для обоих подходов.

Сама стратегия повторных вызовов должна храниться в приложении, которое отвечает за получение следующего интервала при возникновении временной ошибки.


Остановка Consumer«a для приложения с линейной логикой

При работе с spring-kafka код для остановки Consumer«a может выглядеть примерно так:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

В примере retryAt — это время, когда нужно заново запустить MessageListenerContainer, если он еще работает. Повторный запуск произойдет в отдельном потоке, запущенном в TaskScheduler, реализацию которого тоже предоставляет spring.

Значение retryAt мы находим следующим способом:


  1. Ищется значение счетчика повторных вызовов.
  2. В соответствии со значением счетчика ищется текущий интервал задержки в стратегии повторных вызовов. Стратегия объявляется в самом приложении, для ее хранения мы выбрали формат JSON.
  3. Найденный в JSON-массиве интервал содержит в себе количество секунд, через которое необходимо будет повторить обработку. Это количество секунд прибавляется к текущему времени, образуя значение для retryAt.
  4. Если интервал не найден, то значение retryAt равно null и сообщение отправится в DLQ для ручного разбора.

При таком подходе остается только сохранить количество повторных вызовов для каждого сообщения, которое находится сейчас на обработке, например в памяти приложения. Сохранение счетчика попыток в памяти не критично для этого подхода, так как приложение с линейной логикой не может выполнять обработку в целом. В отличие от spring-retry перезапуск приложения приведет не к потере всех сообщений для повторной обработки, а просто к перезапуску стратегии.

Этот подход помогает снять нагрузку с внешней системы, которая может быть недоступна из-за очень большой нагрузки. Другими словами, в дополнение к повторной обработке мы добились реализации паттерна circuit breaker.

В нашем случае порог ошибки равен всего 1, а чтобы минимизировать простой системы из-за временного сетевого перебоя, мы используем очень гранулярную стратегию повторных вызовов с небольшими интервалами задержки. Это может не подойти для всех приложений группы компаний, поэтому соотношение между порогом ошибки и величиной интервала нужно подбирать, опираясь на особенности системы.


Отдельное приложение для обработки сообщений от приложений с недетерминированной логикой

Вот пример кода, отправляющего сообщение в такое приложение (Retryer), которое выполнит повторную отправку в топик DESTINATION при достижении времени RETRY_AT:


public  void retry(ConsumerRecord record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List
arrayOfHeaders = new ArrayList<>(Arrays.asList(headers.toArray())); updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes); updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes); updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, () -> Integer.toString(record.partition()).getBytes()); if (nonNull(retryAt)) { updateHeader(arrayOfHeaders, COUNTER, counter::getBytes); updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes); updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes); } else { updateHeader(arrayOfHeaders, REASON, ExceptionUtils.getStackTrace(e)::getBytes); updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes); } ProducerRecord messageToSend = new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders); kafkaTemplate.send(messageToSend); }

Из примера видно, что много информации передается в хедерах. Значение RETRY_AT находится так же, как и для механизма повтора через остановку Consumer«a. Помимо DESTINATION и RETRY_AT мы передаем:


  • GROUP_ID, по которому группируем сообщения для ручного анализа и упрощения поиска.
  • ORIGINAL_PARTITION, чтобы постараться сохранить тот же Consumer для повторной обработки. Этот параметр может быть равен null, в таком случае новая partition будет получена по ключу record.key () оригинального сообщения.
  • Обновленное значение COUNTER, чтобы следовать стратегии повторных вызовов.
  • SEND_TO — константа, показывающая, отправить ли сообщение на повторную обработку по достижении RETRY_AT или поместить в DLQ.
  • REASON — причина, по которой обработка сообщения была прервана.

Retryer сохраняет сообщения для повторной отправки и ручного разбора в PostgreSQL. По таймеру запускается задача, которая находит сообщения с наступившим RETRY_AT и отправляет их обратно в партицию ORIGINAL_PARTITION топика DESTINATION с ключом record.key ().

После отправки сообщения удаляются из PostgreSQL. Ручной разбор сообщений происходит в простом UI, который взаимодействует с Retryer по REST API. Основными его особенностями являются переотправка или удаление сообщений из DLQ, просмотр информации об ошибке и поиск сообщений, например по имени ошибки.

Так как на наших кластерах включено управление доступом, необходимо дополнительно запрашивать доступы к топику, который слушает Retryer, и дать возможность Retryer«у писать в DESTINATION топик. Это неудобно, но, в отличие от подхода с топиком на интервал, у нас появляется полноценная DLQ и UI для управления ею.

Бывают случаи, когда входящий топик читают несколько разных consumer-групп, приложения которых реализуют разную логику. Повторная обработка сообщения через Retryer для одного из таких приложений приведет к дубликату на другом. Чтобы защититься от этого, мы заводим отдельный топик для повторной обработки. Входящий и retry-топик может читать один и тот же Consumer без каких-либо ограничений.

bfptu6xsnygcurmxqjkbtj3kay0.png

По умолчанию этот подход не предоставляет возможности circuit breaker«a, однако его можно добавить в приложение с помощью spring-cloud-netflix или нового spring cloud circuit breaker, обернув места вызовов внешних сервисов в соответствующие абстракции. Кроме того, появляется возможность выбора стратегии для bulkhead паттерна, что тоже может быть полезно. Например, в spring-cloud-netflix это может быть thread pool или семафор.


Вывод

В результате у нас получилось отдельное приложение, которое позволяет повторить обработку сообщения при временной недоступности какой-либо внешней системы.

Одним из главных преимуществ приложения является то, что им могут пользоваться внешние системы, работающие на том же Kafka-кластере, без значительных доработок на своей стороне! Такому приложению необходимо будет только получить доступ к retry-топику, заполнить несколько Kafka-заголовков и отправить сообщение в Retryer. Не нужно поднимать никакой дополнительной инфраструктуры. А чтобы уменьшить количество перекладываемых сообщений из приложения в Retryer и обратно, мы выделили приложения с линейной логикой и сделали в них повторную обработку через остановку Consumer.

© Habrahabr.ru