[Перевод] Kafka и Chronicle Queue
Хотя облачные сервисы удобны и гибки, эксплуатационные затраты на приложения, развёрнутые в облаке, иногда могут быть существенными. В этой статье мы расскажем о способе существенного снижения эксплуатационных затрат в чувствительных к задержке Java-приложений с Event-Driven Architecture (EDA) при помощи миграции с Kafka на Chronicle Queue — опенсорсной, более эффективно использующей ресурсы реализации очереди с пониженной задержкой.
Что такое EDA?
EDA-приложение — это распределённое приложение, в котором выполняется создание, распознавание и потребление событий (в виде сообщений или DTO), а также реагирование на них. «Распределённое» означает, что приложение может выполняться на разных машинах или на одной машине, но в отдельных процессах или потоках. В этой статье используется последний вариант, а сообщения сохраняются в очередях.
Подготовка к работе
Предположим, у нас есть EDA-приложение с цепочкой из пяти сервисов; при этом имеется требование, что 99,9% сообщений, отправленных от первого их создателя до последнего потребителя, должно иметь задержку менее 100 мс при частоте 1000 сообщений в минуту.
Рисунок 1 — пять сервисов и бенчмарк, взаимосвязанные шестью топиками/очередями
Иными словами, время между отправкой сообщения (например, при помощи топика 0) потоком Benchmark до получения конечного сообщения снова потоком Benchmark (например, при помощи топика 5) может быть больше 100 мс в среднем только для одного из 1000 сообщений, отправляемых каждую секунду.
Используемые в этой статье сообщения просты. Они содержат метку времени в наносекундах в формате long, в которой хранится изначальная метка времени первой публикации сообщения при помощи топика 0, и значение int, увеличивающееся каждый раз, когда сообщение передаётся от одного сервиса к следующему (на самом деле это значение не используется, однако оно иллюстрирует рудиментарную логику сервисов). Когда сообщение возвращается в поток Benchmark, текущее время в наносекундах сравнивается с исходным временем в первоначальном сообщении в топике 0, чтобы можно было вычислить общую задержку во всей цепочке сервисов. Затем сэмплы задержки последовательно передаются в гистограмму для дальнейшего анализа.
Как видно из Рисунка 1, количество топиков/очередей равно количеству сервисов плюс один. Следовательно, поскольку сервисов пять, топиков/очередей шесть.
Главный вопрос
Главный вопрос этой статьи таков: сколько экземпляров таких цепочек мы можем создать на выделенном оборудовании, не нарушая требований к задержкам?
Иными словами, сколько таких приложений мы можем запустить и платить ту же цену за используемое оборудование?
Стандартная настройка
Для этой статьи я решил использовать Apache Kafka, потому что это один из самых распространённых типов очередей, используемых на рынке. Также я выбрал Chronicle Queue за возможность обеспечения низкой задержки и ресурсоэффективность.
И Kafka, и Chronicle Queue имеют множество настраиваемых опций, в том числе репликацию данных между несколькими серверами. В этой статье будет использоваться одна очередь без репликации. Из соображений производительности брокер Kafka будет работать на той же машине, что и серверы, что позволяет использовать локальный сетевой loopback-интерфейс.
Экземпляры Kafka Producer сконфигурированы так, чтобы быть оптимизированными под низкую задержку (например, «acks=1»), то же самое относится и к экземплярам KafkaConsumer.
Экземпляры Chronicle Queue созданы со стандартной настройкой без явных оптимизаций. Следовательно, расширенные функции производительности Chronicle Queue наподобие CPU-core pinning и busy spin-waiting не используются.
Kafka
Apache Kafka — это опенсорсная распределённая платформа стриминга событий для высокопроизводительных конвейеров данных, стриминговой аналитики, интегрирования данных и критически важных приложений, активно используемых в различных EDA-приложениях, особенно в случаях, когда необходимо агрегировать и потреблять информацию от множества источников, расположенных в различных местах.
В этом бенчмарке каждый тестовый экземпляр создаст шесть отдельных топиков Kafka с названиями topicXXXX0, topicXXXX1, …, topicXXXX5, где XXXXX — случайное число.
Chronicle Queue
Опенсорсный Chronicle Queue — это фреймворк передачи сообщений с низкой задержкой для высокопроизводительных и критически важных приложений. Любопытно, что Chronicle Queue использует память вне кучи и распределение памяти для снижения влияния давления на память и сборки мусора, благодаря чему этот продукт популярен в сфере финтеха, где детерминированная передача сообщений с низкой задержкой критически важна.
В этом втором бенчмарке каждый тестовый экземпляр создаст шесть экземпляров Chronicle Queue с названиями topicXXXX0, topicXXXX1, …, topicXXXX5, где XXXXX — случайное число.
Код
Ниже показаны внутренние циклы двух разных реализаций потоков сервисов. Оба они опрашивают свою очередь ввода, пока не получат команду завершения работы и, если сообщения отсутствуют, они будут ждать в течение одной восьмой от ожидаемого времени между сообщениями, а потом будут совершать повторную попытку.
Вот как выглядит код:
▍ Kafka
while (!shutDown.get()) {
ConsumerRecords records =
inQ.poll(Duration.ofNanos(INTER_MESSAGE_TIME_NS / 8));
for (ConsumerRecord record : records) {
long beginTimeNs = record.value();
int value = record.key();
outQ.send(new ProducerRecord<>(topic, value + 1, beginTimeNs));
}
}
Использование записи key()
для хранения значения int может показаться немного необычным шагом, однако он позволяет нам улучшить производительность и упростить код.
▍ Chronicle Queue
while (!shutDown.get()) {
try (final DocumentContext rdc = tailer.readingDocument()) {
if (rdc.isPresent()) {
ValueIn valueIn = rdc.wire().getValueIn();
long beginTime = valueIn.readLong();
int value = valueIn.readInt();
try (final DocumentContext wdc =
appender.writingDocument()) {
final ValueOut valueOut = wdc.wire().getValueOut();
valueOut.writeLong(beginTime);
valueOut.writeInt(value + 1);
}
} else {
LockSupport.parkNanos(INTER_MESSAGE_TIME_NS / 8);
}
}
}
Бенчмарки
Бенчмарки прошли этап первоначального «разогрева», в течение которого — компилятор C2 JVM спрофилировал и скомпилировал для гораздо большей производительности. Результаты сэмплирования с этапа разогрева не учитывались.
В процессе тестирования вручную запускались новые тестовые экземпляры (каждый со своими пятью сервисами) до тех пор, пока система не переставала удовлетворять требованиям к задержке. При выполнении бенчмарков также при помощи команды «top» для всех экземпляров отслеживалось использование ЦП (центрального процессора) и показания нескольких секунд усреднялись.
В бенчмарках не учитывалось coordinated omission, тесты проводились на Ubuntu Linux (5.11.0–49-generic) с 16-ядерными процессорами AMD Ryzen 9 5950X (3,4 Ггц) и 64 ГБ ОЗУ, а приложения выполнялись на изолированных ядрах 2–8 (суммарно 7 ядер ЦП), а очереди сохранялись на флэш-устройстве NVMe объёмом 1 ТБ. Использовался OpenJDK 11 (11.0.14.1).
Все значения задержек указаны в мс, 99% означает 99 перцентиль, а 99,9% означает 99,9 перцентиль.
Kafka
Брокер Kafka и бенчмарки запускались с префиксом «taskset -c 2–8», за которым следовала соответствующая команда (например, taskset -c 2–8 mvn exec: java@Kafka). Для Kafka были получены следующие результаты:
Таблица 1 — экземпляры Kafka, задержки и использование ЦП
(*) Более 100 мс в 99,9 перцентиле.
Как мы видим, только одновременно может выполняться только один экземпляр системы EDA. Запуск двух экземпляров увеличил 99,9 перцентиль и он превысил ограничение в 100 мс. Экземпляры и брокер Kafka быстро пришли к насыщению доступных ресурсов ЦП.
Вот снэпшот результатов выполнения команды «top» при работе двух экземпляров и брокера (PID 3132946):
3134979 per.min+ 20 0 20.5g 1.6g 20508 S 319.6 2.6 60:27.40 java
3142126 per.min+ 20 0 20.5g 1.6g 20300 S 296.3 2.5 19:36.17 java
3132946 per.min+ 20 0 11.3g 1.0g 22056 S 73.8 1.6 9:22.42 java
Chronicle Queue
Бенчмарки выполнялись при помощи команды «taskset -c 2–8 mvn exec: java@ChronicleQueue», при этом были получены следующие результаты: Таблица 2 — экземпляры Chronicle Queue, задержки и использование ЦП
В этих бенчмарках становится очевидной эффективность Chronicle Queue — одновременно могут работать 500 экземпляров, то есть одновременно мы можем обрабатывать 3000 очередей и 3000000 сообщений в секунду всего на 7 ядрах с задержкой менее 100 мс в 99,9 перцентиле.
Сравнение
Ниже показан график количества экземпляров и 99,9 перцентиля для двух типов очередей (чем меньше, тем лучше):
График 1 — задержки экземпляров в мс для 99,9 перцентиля
Как мы видим, кривая для Kafka поднимается с 30 мс до 106 мс всего за один шаг, поэтому рост задержки для Kafka на этом графике выглядит как стена.
Вывод
В случае работы с чувствительными к задержке EDA-приложениями при переходе с Kafka на Chronicle Queue на одном и том же оборудовании может работать примерно в четыреста раз больше приложений.
График 2 — нормализованные затраты относительно типа очереди (чем меньше, тем лучше)
Как видно из Графика 2, увеличение максимального количества приложений в примерно четыреста раз соответствует потенциалу снижения затрат на облако или оборудование приблизительно на 99,8%. На самом деле, при использованном масштабе затраты едва видны.