[Из песочницы] Подписываемся на Kafka по HTTP или как упростить себе Веб-хуки

Существует множество способов обработки сообщений из Pub-Sub систем: использование отдельного сервиса, выделение изолированного процесса, оркестрация пулом процессов/потоков, сложные IPC, Poll-over-Http и многие другие. Сегодня я хочу рассказать о том, как использовать Pub-Sub по HTTP и про свой сервис, написанный специально для этого.

Использование готового HTTP -бэкенда сервисов в некоторых случаях является идеальным решением для обработки очереди сообщений:

  1. Балансировка из коробки. Обычно, бэкенд и так стоит за балансировщиком и имеет готовую к нагрузкам инфраструктуру, что сильно упрощает работу с сообщениями.
  2. Использование обычного REST-контроллера (любой HTTP-ресурс). Потребление сообщений по HTTP сводит к минимуму затраты на реализацию консюмеров под разные языки, если бэкенд разношерстный.
  3. Упрощение использования Веб-хуков других сервисов. Сейчас почти каждый сервис (Jira, Gitlab, Mattermost, Slack…) так или иначе поддерживает Веб-хуки для взаимодействия с внешним миром. Можно облегчить жизнь, если научить очередь выполнять функции HTTP-диспатчера.


Этот подход имеет и минусы:

  1. Можно забыть о легковесности решения. HTTP тяжёлый протокол, а использование фреймворков на стороне консюмера мгновенно приведёт к увеличению задержки (latency) и нагрузки.
  2. Лишаемся сильных сторон Poll-подхода, получая слабые стороны Push.
  3. Обработка сообщений теми же инстансами сервиса, которые обрабатывают клиентов, может сказаться на отзывчивости. Это несущественно, так как лечится балансировкой и изоляцией.


Я реализовал идею в виде сервиса Queue-Over-Http, о котором и пойдёт речь далее. Проект написан на Kotlin с использованием Spring Boot 2.1. В качестве брокера сейчас доступна только Apache Kafka.
Далее в статье подразумевается, что читатель знаком с Kafka и знает про коммиты (commit) и оффсеты (offset) сообщений, принципы групп (group) и консюмеров (consumer), а также понимает, чем партиция (partition) отличается от топика (topic). Если есть пробелы, советую ознакомиться с этим разделом документации по Kafka перед продолжением чтения.
Queue-Over-Http представляет из себя сервис, который выступает посредником между брокером сообщений и конечным HTTP-консюмером (сервис позволяет легко реализовать поддержку отправки сообщений консюмерам любым другим способом, например, различными *RPC). На данный момент доступны только операции подписки, отписки и просмотра списка консюмеров Отправка сообщений брокеру (produce) по HTTP пока не реализована в силу невозможности гарантировать порядок сообщений без специальной поддержки со стороны продюсера.

Ключевой фигурой сервиса является консюмер, который может подписаться как на конкретные партиции, так и просто на топики (паттерн топика поддерживается). В первом случае выключается автобаланс партиций. После подписки, указанный HTTP-ресурс начинает получать сообщения из назначенных партиций Kafka. Архитектурно каждый подписчик ассоциируется с нативным Java-клиентом Kafka.

занимательная история про KafkaConsumer
У Kafka есть замечательный Java-клиент, который умеет многое. Его использую в адаптере очереди для получения сообщений от брокера и дальнейшей отправки в локальные очереди сервиса. Стоит оговориться, что клиент работает исключительно в контексте одного потока.

Идея адаптера простая. Запускаем в одном потоке, пишем простейший планировщик нативных клиентов, делая упор на уменьшение latency. То есть пишем что-то похожее:

while (!Thread.interrupted()) {
    var hasWork = false

    for (consumer in kafkaConsumers) {
        val queueGroup = consumers[consumer] ?: continue
        invalidateSubscription(consumer, queueGroup)

        val records = consumer.poll(Duration.ZERO)

        /* здесь раскидываем в локальные очереди */

        if (!records.isEmpty) {
            hasWork = true
        }
    }

    val committed = doCommit()

    if (!hasWork && committed == 0) {
        // засыпаем, если нечего делать
        Thread.sleep(1)
    }
}


Казалось бы, всё замечательно, latency минимальный даже при наличии десятков консюмеров. На практике получилось, что KafkaConsumer к такому режиму эксплуатации совершенно не готов и даёт allocation rate около 1.5 МБ/сек в простое. При 100 консюмерах allocation rate достигает 150 МБ/сек и заставляет GC чаще вспоминать о приложении. Конечно, весь этот мусор находится в young области, GC вполне справляется с этим, но всё же, решение не идеально.

Очевидно, нужно идти типичным для KafkaConsumer путём и каждого подписчика размещаю теперь в своём потоке. Это даёт оверхед по памяти и диспетчеризации, но другого выхода нет.

Переписываю код сверху, убирая внутренний цикл и меняя Duration.ZERO на Duration.ofMillis(100). Получается хорошо, allocation rate падает до приемлемых 80–150 КБ/сек на одного консюмера. Однако, Poll с таймаутом в 100 мс задерживает всю очередь коммитов на эти самые 100 мс, а это неприемлемо много.

В процессе поиска решений проблемы вспоминаю про KafkaConsumer::wakeup, который бросает WakeupException и прерывает любую блокирующую операцию на консюмере. С этим методом путь к low-latency прост: когда приходит новый запрос на коммит, кладём его в очередь, а на нативном консюмере вызываем wakeup. В рабочем цикле ловим WakeupException и идём коммитить то, что накопилось. За передачу управления с помощью исключений нужно сразу давать по рукам, но раз уж по-другому никак…

Оказывается, и этот вариант далёк от совершенства, так как любая операция на нативном консюмере теперь выкидывает WakeupException, в том числе, сам коммит. Обработка этой ситуации захламит код флагом, разрешающим делать wakeup.

Прихожу к выводу, что было бы неплохо модифицировать метод KafkaConsumer::poll, чтобы он мог прерываться штатно, по дополнительному флагу. В итоге, был рождён франкенштейн из рефлексии, который в точности копирует оригинальный метод poll, добавляя выход из цикла по флагу. Этот флаг устанавливается отдельным методом interruptPoll, который, к тому же, на селекторе клиента вызывает wakeup, чтобы снять блокировку потока на операции ввода-вывода.

Реализовав таким образом клиент, получаю скорость реакции с момента поступления запроса на коммит до его обработки до 100 микросекунд, и отличный latency на выборку сообщений из брокера, что вполне устраивает.


Каждая партиция представлена отдельной локальной очередью, куда адаптер пишет сообщения из брокера. Воркер забирает из неё сообщения и отдаёт их на исполнение, то есть, на отправку по HTTP.

Сервис поддерживает пакетную обработку сообщений для увеличения пропускной способности. При подписке можно указать concurrencyFactor каждого топика (распространяется на каждую назначенную партицию независимо). Например, concurrencyFactor=1000 означает, что одновременно могут быть отправлены потребителю 1000 сообщений в виде HTTP-запросов. Как только все сообщения из пачки были однозначно отработаны консюмером, сервис принимает решение об очередном коммите оффсета последнего по порядку сообщения в Kafka. Отсюда второе значение concurrencyFactor — максимальное число повторно обработанных сообщений потребителем в случае падения Kafka или Queue-Over-Http.

Для уменьшения задержек очередь имеет loadFactor = concurrencyFactor * 2, что позволяет считывать из брокера в два раза больше сообщений, чем может быть отправлено. Так как автокоммит на нативном клиенте отключен, такая схема не нарушает гарантий At-Least-Once.
Высокое значение concurrencyFactor увеличивает пропускную способность очереди (throughput) за счёт уменьшения количества коммитов, которые занимают до 10 мс в худшем случае. При этом, повышается нагрузка на потребителя.

Очерёдность отправки сообщений в рамках пачки не гарантирована, но её можно достигнуть, если установить concurrencyFactor=1.


Коммиты — важная часть работы сервиса. Когда очередная пачка данных готова, оффсет последнего сообщения из пачки тут же коммитится в Kafka, и только после успешного коммита становится доступна для обработки следующая пачка. Часто этого недостаточно и требуется автокоммит. Для этого существует параметр autoCommitPeriodMs, который имеет мало общего с классическим периодом автокоммита у нативных клиентов, которые коммитят последнее прочитанное из партиции сообщение. Представим, что concurrencyFactor=10. Сервис отослал все 10 сообщений и ждёт готовности каждого из них. Первым завершается обработка сообщения 3, потом сообщения 1, а затем, сообщения 10. В этот момент наступает время автокоммита. Важно не нарушить At-Least-Once семантику. Поэтому, можно коммитить только первое сообщение, то есть, оффсет 2, так как только оно на этот момент успешно обработано. Далее, до следующего автокоммита обрабатываются сообщения 2, 5, 6, 4, и 8. Теперь необходимо коммитить только оффсет 7, и так далее. Автокоммит почти не влияет на пропускную способность.
В штатном режиме работы сервис отправляет сообщение консюмеру один раз. Если по каким-то причинам оно вызвало 4xx или 5xx ошибку, то сервис будет повторно отправлять сообщение, ожидая успешной обработки. Время между попытками может быть сконфигурировано отдельным параметром.

Так же, возможно задать количество попыток, по истечению которых сообщение будет помечено как обработанное, что прекратит повторные отправки не зависимо от статуса ответа. Не советую использовать это для чувствительных данных, ситуации отказа консюмеров всегда нужно корректировать вручную. Залипания сообщения можно отслеживать по логам сервиса и мониторингу статусов ответа консюмера.

про залипания

Обычно, HTTP-сервер, отдавая 4xx или 5xx статус ответа, отсылает ещё и заголовок Connection: close. Закрытое таким образом TCP-соединение остаётся в статусе TIME_WAITED, пока не будет подчищено операционной системой спустя какое-то время. Проблема в том, что такие соединения занимают целый порт, который невозможно переиспользовать до освобождения. Это может вылиться в отсутствие свободных портов на машине для установки TCP-соединения и сервис будет сыпаться исключениями в логи на каждую отправку. На практике, на Windows 10 порты кончаются спустя 10–20 тысяч отправок ошибочных сообщений в течение 1–2 минут. В стандартном режиме работы это не проблема.


Каждое сообщение, извлечённое из брокера, отправляется консюмеру по HTTP на указанный при подписке ресурс. По умолчанию сообщение отправляется POST-запросом в теле. Это поведение можно изменить, указав любой другой метод. Если метод не поддерживает отправку данных в теле, можно указать название строкового параметра, в котором будет отправлено сообщение. Помимо этого, при подписке можно указать дополнительные заголовки, которые будут добавлены к каждому сообщению, что удобно для базовой авторизации с помощью токенов. К каждому сообщению добавляются заголовки с указанием идентификатора консюмера, топика и партиции, откуда сообщение было прочитано, номер сообщения, partition key, если применим, а также название самого брокера.
Для оценки производительности я использовал ПК (Windows 10, OpenJDK-11 (G1 без тюнинга), i7–6700K, 16GB), на котором запущен сервис и ноутбук (Windows 10, i5–8250U, 8GB), на котором крутился продюсер сообщений, HTTP-ресурс консюмера и Kafka с дефолтными настройками. ПК подключен к роутеру по проводному соединению 1Gb/s, ноутбук по 802.11ac. Продюсер каждые 100 мс в течении 1000 секунд записывает сообщения, длиной в 110 байт, в назначенные топики, на которые подписаны консюмеры (concurrencyFactor=500, автокомит выключен) из разных групп. Стенд далёк от идеального, но некоторую картину получить можно.

Ключевым измеряемым параметром является влияние сервиса на latency.

Пусть:
— tq — временная метка получения сервисом сообщения от нативного клиента
— dt0 — время между tq и временем отправки сообщения из локальной очереди в пул экзекьютеров
— dt — время между tq и временем отправки HTTP-запроса. Именно dt является влиянием сервиса на latency сообщения.

В ходе измерений были получены следующие результаты (C — консюмеры, T — топики, M — сообщения):

p4r7pqavkke1d3glzc7u8o6a5gu.png

В стандартном режиме работы сервис сам по себе почти не влияет latency, а потребление памяти минимально. Максимальные значения dt (около 60 мс) не указаны специально, так как зависят от работы GC, а не от самого сервиса. Сгладить разброс максимальных значений может помочь специальный тюнинг GC или замена G1 на Shenandoah.

Всё кардинально меняется, когда консюмер не справляется с потоком сообщений из очереди и сервис включает режим тротлинга. В этом режиме увеличивается потребление памяти, так как время ответа на запросы сильно вырастает, что мешает своевременной очистке ресурсов. Влияние на latency здесь остаётся на уровне с предыдущими результатами, а высокие значения dt вызваны предзагрузкой сообщений в локальную очередь.

К сожалению, протестировать на бОльшей нагрузке не имеется возможности, так как ноутбук загибается уже на 1300 RPS. Если кто-то может помочь с организацией замеров на больших нагрузках, с радостью предоставлю сборку для тестов.


Теперь перейдём к демонстрации. Для этого нам понадобится:

  • Kafka брокер, готовый к работе. Я возьму поднятый на 192.168.99.100:9092 инстанс от Bitnami.
  • HTTP-ресурс, который будет принимать сообщения. Для наглядности я взял Web-hooks у Slack.


Прежде всего, необходимо поднять сам сервис Queue-Over-Http. Для этого создим в пустой директории application.yml следующего содержания:

spring:
  profiles: default

logging:
  level:
    com:
      viirrtus:
        queueOverHttp: DEBUG

app:
  persistence:
    file:
      storageDirectory: "persist"
  brokers:
    - name: "Kafka"
      origin: "kafka"
      config:
        bootstrap.servers: "192.168.99.100:9092"


Здесь мы указываем сервису параметры подключения конкретного брокера, а также, где хранить подписчиков, чтобы между запусками они не терялись. В `app.brokers[].config` можно указывать любые, поддерживаемые нативным клиентом Kafka параметры подключения, полный список можно посмотреть здесь.

Так как файл конфигурации обрабатывается Spring«ом, вы можете писать туда много интересного. В том числе, настраивать логирование.

Теперь запускаем сам сервис. Используем самый простой способ — docker-compose.yml:

version: "2"

services:
  app:
    image: viirrtus/queue-over-http:0.1.3
    restart: unless-stopped
    command: --debug
    ports:
      - "8080:8080"
    volumes:
      - ./application.yml:/application.yml
      - ./persist:/persist


Если этот вариант не устраивает, вы можете собрать сервис из исходников. Инструкция по сборке в Readme проекта, ссылка на который дана в конце статьи.

Следующим шагом регистрируем первого подписчика. Для этого необходимо выполнить HTTP-запрос к сервису с описанием консюмера (Consumer):

POST localhost:8080/broker/subscription
Content-Type: application/json

{
  "id": "my-first-consumer",
  "group": {
    "id": "consumers"
  },
  "broker": "Kafka",
  "topics": [
    {
      "name": "slack.test",
      "config": {
        "concurrencyFactor": 10,
        "autoCommitPeriodMs": 100
      }
    }
  ],
  "subscriptionMethod": {
    "type": "http",
    "delayOnErrorMs": 1000,
    "retryBeforeCommit": 10,
    "uri": "",
    "additionalHeaders": {
      "Content-Type": "application/json"
    }
  }
}


Если всё прошло успешно, в ответе будет почти тот же самый отправленный контент.

Пройдёмся по каждому параметру:

  • Consumer.id — идентификатор нашего подписчика
  • Consumer.group.id — идентификатор группы
  • Consumer.broker — указываем на какой из брокеров сервиса нужно подписаться
  • Consumer.topics[0].name — название топика, из которого хотим получать сообщения
  • Consumer.topics[0].config. concurrencyFactor — максимальное количество одновременно отправленных сообщений
  • Consumer.topics[0].config. autoCommitPeriodMs — период принудительного коммита готовых сообщений
  • Consumer.subscriptionMethod.type — тип подписки. В данный момент доступна только HTTP.
  • Consumer.subscriptionMethod.delayOnErrorMs — время до повторной отправки сообщения, которое закончилось ошибкой
  • Consumer.subscriptionMethod.retryBeforeCommit — количество попыток повторной отправки ошибочного сообщения. Если 0 — сообщение будет крутиться до успешной обработки. В нашем случае гарантия полной доставки не так важна, как постоянство потока.
  • Consumer.subscriptionMethod.uri — ресурс, на который будут отправляться сообщения
  • Consumer.subscriptionMethod.additionalHeader — дополнительные заголовки, которые будут отправлены с каждым сообщением. Пометим, что в теле каждого сообщения будет JSON, чтобы Slack мог правильно интерпретировать запрос.


В данном запросе указание HTTP-метода опущено, так как умолчание, POST, Slack вполне устраивает.

С этого момента сервис следит за назначенными партициями топика slack.test на предмет новых сообщений.

Для записи сообщений в топик я воспользуюсь встроенным в Kafka утилитами, которые расположены в /opt/bitnami/kafka/bin запущенного образа Kafka (расположение утилит в других инстансах Kafka может отличаться):

kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test
> {"text”: "Hello!”}


В этот же момент Slack оповестит о новом сообщении:

klehz7ev6x1y2eaqpf_ylpnjic4.png

Чтобы отписать потребителя достаточно сделать POST запрос на `broker/unsubscribe` с тем же контентом, что был при подписке.


На данный момент реализован лишь базовый функционал. Далее планируется улучшить batching, попытаться реализовать Exactly-once семантику, добавить возможность отправки сообщений брокеру по HTTP и, самое главное, добавить поддержку других популярных Pub-Sub.

Сервис Queue-Over-Http сейчас находится в стадии активного развития. Версия 0.1.3 является достаточно стабильной для тестирования на dev и stage стендах. Работоспособность была проверена на Windows 10, Debian 9 и Ubuntu 18.04. Использовать в prod можно на свой страх и риск. Если вы хотите помочь с разработкой или дать любой фидбэк по сервису — добро пожаловать на Github проекта.

© Habrahabr.ru