Почему стриминг на KSQL и Kafka Streams — это непросто

Привет, Хабр!

Меня зовут Саша, я лид-разработчик в GlowByte Consulting. Мы с командой сделали неплохой стриминговый движок для одного крупного банка. Сейчас в продакшене крутится онлайн обработка банковских авторизаций, визитов клиентов в офис и еще ряд более мелких процессов, при этом все работает на KSQL и Kafka Streams. Хочу поделиться тем, на какие грабли мы наступили в процессе.

Если интересны подробности, прошу под кат.

image

Для тех, кто в танке


Если вы не знакомы с Kafka, советую почитать вводную часть с официального сайта Apache: kafka.apache.org/intro, либо с сайта Confluent:
docs.confluent.io/current/kafka/introduction.html, чтобы было понятно, о чем вообще речь.

Если же вы работали с Kafka как с шиной для обмена сообщениями, то у вас мог возникнуть вопрос: как сделать на этом стриминговый движок? Отвечаю, все благодаря Kafka Streams API и KSQL.

Kafka Streams — это Java API, которое позволяет перекладывать данные из топика в топик, по пути совершая различные преобразования этих данных.

KSQL — это надстройка над Kafka Streams, которая позволяет вместо написания Java кода использовать SQL-подобный язык, который автоматически генерирует Kafka Streams приложения.

Что мы сделали


image

Архитектура решения GlowByte

Наш PROD-кластер Kafka состоит из 3 нод. Конфигурация каждой примерно такая:

  • 48 ядер CPU (96 с HT) по 2.1 ГГц;
  • 512 ГБ RAM;
  • 12x HDD 10 ТБ;
  • Внутренняя сеть 40 Гбит/с.


В качестве входных данных используются:

  • Плоские файлы, падающие на сетевой диск с частотой раз в минуту;
  • Оперативные данные из витрин в Oracle, загружаемые через Oracle Golden Gate;
  • Медленные данные с витринами и справочниками, нужными для процессинга, которые грузятся из Hadoop.


Все входные данные обрабатываются в режиме реального времени и кладутся в топики Kafka, которые далее вычитываются через SAS RTDM (Real-Time Decision Manager), а также выгружаются в NOSQL базу HBase, куда SAS RTDM ходит за точечными лукапами. Далее уже SAS RTDM принимает решение о проведении маркетинговых коммуникаций посредством SMS, PUSH-уведомлений и т.д.

Пример сценария (не обязательно настоящего)

Клиент совершает покупку по карте в каком-либо магазине. Определяется, что магазин находится в крупном торговом центре, где также есть магазины-партнеры банка, выпустившего карту. Так что клиенту почти сразу после совершения покупки приходит сообщение с предложением посетить также и эти магазины.

Почти всю логику мы реализовали на KSQL-скриптах. Пишутся они, в отличие от обычного SQL не так просто. Но все же проще, чем Java-код. К тому же, их можно исправлять без компиляции (удобно менять какие-нибудь простые фильтры), а также они могут быть прочитаны человеком, абсолютно не знакомым с Java и с классическим программированием вообще.

Kafka Streams API мы использовали только в тех случаях, где KSQL оказался бессилен. Например, при сложных one-to-many джойнах.

Как надо и как не надо


Теперь перейду к основной сути поста. А конкретно, к граблям, на которые мы напоролись.

НЕ НАДО: Надеяться, что все заработает «из коробки»
НАДО: Изучить все технические детали

Казалось бы, очевидный пункт. Но когда реализуешь большое и технически сложное решение, все равно на что-то да наткнешься. Основные наши проблемы были:

  • Мы использовали везде одинаковое дефолтное число партиций.

    Партицирование в Kafka нужно для регулирования возможностей параллелизма. Мы поставили везде по 20 партиций «с запасом». В итоге же оказалось, что большое количество партиций сильно увеличило нагрузку на диски. А диски у нас — обычные HDD, пусть и большого объема;

  • Нужно сильно «тюнить» параметры Kafka-брокеров, Zookeeper«а и т.д.

    Несколько раз при попытке планово перезапустить Kafka (для внесения пары незначительных параметров), Kafka просто не поднималась. Приходилось оперативно разбираться с проблемой и чаще всего увеличивать те или иные timeout«ы в конфигах.


Диски и железо


НЕ НАДО: Использовать одни и те же диски для всего, просто потому что места хватает
НАДО: Хорошо подобрать конфигурацию железа (особенно диски)

image

Основная проблема, с которой мы столкнулись — использование обычных HDD под State Store.
Kafka Streams и, соответственно, KSQL под собой всегда имеют State Store в виде RocksDB для джойнов и вычисления агрегатов. На State Store приходится очень большое количество операций чтения-записи. Собственно, мы не ожидали, насколько много. Один обычный HDD вполне тянет (по IO) пару средних KSQL-процессов (два-три джойна) или один очень большой (пять джойнов + агрегация). Но процессов у нас очень много (десятки). А дисков всего 12 на ноду, причем почти все заняты топиками Kafka и нагружать их еще сверху — плохая идея.

В итоге мы вынесли почти весь State Store в RAM. Работает хорошо, но занимает несколько сотен ГБ памяти.

Смена парадигмы


НЕ НАДО: Просто переносить SQL-прототипы в стриминг, ожидая, что все заработает
НАДО: Перейти от парадигмы ETL в парадигму стриминга

Наша компания очень активно делает разработку регламентных ETL-процессов для различных заказчиков. ETL-процессы обычно представляют собой батчевые расчеты, проводимые раз в сутки, раз в неделю или реже. В 90%+ случаев под всеми обертками ETL-процесс — это последовательное выполнение SQL-запросов в той или иной БД.

Так как писали мы на KSQL, то в начале вполне логичным решением казалось просто адаптировать все SQL-прототипы, сделанные аналитиками, под синтаксис KSQL. Оказалось, так это не работает. KSQL — это лишь SQL-подобный язык, но работает он совершенно по-другому.

Ниже картинка, показывающая разницу в обновлении данных в БД и в Kafka. В Kafka записи не обновляются. Старая остается и добавляется новая.

image

Еще пример, на этот раз с перемешиванием записей. В процессе преобразования какие либо два сообщения могут сначала быть в одной партиции, потом сменить ключ, а потом снова оказаться в одной партиции. В итоге их порядок может измениться. А так как в некоторых случаях порядок важен, получаем проблему.

image

Ниже будет пример адаптации одного скрипта, который в парадигме ETL считался довольно простым:

SQL-скрипт:

  • 95 строк кода;
  • 1 шаг;
  • 1 full join;
  • Около 1–2 человеко-дней на разработку и тестирование.


KSQL-скрипт:

  • 874 строки кода;
  • 12 шагов;
  • 2 left join + 1 group by;
  • 40+ человеко-дней на разработку и тестирование.


Батчи вместо стриминга


НЕ НАДО: Делать на стриминге то, для чего хватит батчей
НАДО: Правильно выбирать, какие задачи можно сделать, а какие нет

Вначале планировалось, что все необходимые процессы будут реализованы на KSQL и Kafka Streams. В процессе же оказалось, что некоторые вещи, оперативность которых не столь критична, а сложность реализации очень высока, лучше вынести в батч. Такие батчи можно запускать раз в час, несколько раз в день или реже, в зависимости от критичности. Но это будет явно лучше, чем рассчитывать в онлайне все подряд.
Для себя мы решили:

Можно делать в стриминге:

  • Фильтрация;
  • Скалярное изменение атрибутов;
  • Точечные lookup«ы;
  • Простые агрегации.


Лучше по возможности вынести в батч

  • Джойны один ко многим и многие ко многим;
  • Сложные агрегации;
  • Пересчет за старые даты.


Проблема пересчета


НЕ НАДО: Ставить небольшой retention на все входящие события
НАДО: В задачах онлайн-расчетов быть готовыми к тому, что все придется пересчитать

В жизни никогда нельзя быть уверенным, что ты все сразу сделал правильно. Может возникнуть ситуация, что в расчеты, которые делались онлайн половину месяца, закралась ошибка (или требования поменялись задним числом). Конечно, во всех stateless операциях это не важно. Что было, то прошло. Но вот если в онлайне считаются, например, агрегаты (например, оборот клиента по карте), то это проблема. Если ничего не сделать, то они продолжат считаться неправильно. Приходится все пересчитывать. Поэтому, в отдельных случаях мы храним данные в топиках Kafka вплоть до трех месяцев. При этом пересчет в стриминге — штука не простая.

База данных

  1. Делается пересчет нужной витрины данных за нужные даты;
  2. Данные в целевой таблице заменяются пересчитанными.


Streaming

  1. Отключается онлайн-обработка данных;
  2. Очищается state store;
  3. Сбрасываются оффсеты;
  4. Идет репроцессинг данных;
  5. Если нужно, данные выгружаются в базу.


Вместо итога


Разумеется, я описал далеко не все проблемы, с которыми мы столкнулись в процессе реализации стриминг-движка. Но это точно одни из самых основных граблей, которые первыми приходят в голову. Если кому-то отдельные пункты показались очевидными или банальными — прошу прощения. Искренне надеюсь, что было не очень скучно, и что хотя бы кому-то эта статья поможет.

© Habrahabr.ru