Apache Kafka и EDA

Архитектура, управляемая событиями (Event Driven Architecture, EDA) получила широкое распространение при разработке программного обеспечения, способного легко масштабироваться и обрабатывать большие объемы данных в режиме реального времени. В этой статье мы подробно рассмотрим EDA и продемонстрируем, как распределенная потоковая платформа Apache Kafka, может быть использована для создания масштабируемых систем.

О событиях

Event‑Driven Architecture построена на концепции событий. Событие — это значительное изменение состояния, о котором необходимо сообщить в системе. В качестве источников событий выступают производители событий (Event Producers). Они собственно и создают события. Например, форма регистрации пользователя генерирует событие «Новый пользователь».

Помимо генераторов у нас также есть потребители событий. Например, служба уведомлений, которая отправляет приветственное письмо при получении события о новом пользователе.

И каналы событий — это средства, через которые передаются события, например брокеры сообщений типа Kafka, о которых мы и будем говорить далее.

Архитектура EDA обладает рядом преимуществ, позволяющим эффективно ее использовать при разработке приложений.

Начнем с масштабируемости. Системы могут обрабатывать большие объемы данных и масштабироваться за счет добавления новых потребителей. При этом, производители и потребители событий слабо связаны друг с другом, что делает систему более гибкой и простой в обслуживании.

Также, важным преимуществом является то, что события обрабатываются по мере их возникновения, что позволяет анализировать и реагировать на них в режиме реального времени. При этом, системы могут оперативно реагировать на изменения и новые данные.

EDA vs. SOA

Архитектура EDA является заменой традиционных монолитных систем или систем SOA (Service‑Oriented Architecture). Прежде, чем сравнивать эти архитектуры, давайте сперва разберемся, что из себя представляет каждая из них.

Исторически SOA была ответом на старую изолированную программную инфраструктуру, где каждое подразделение в бизнесе имело собственное программное обеспечение, специально предназначенное для его целей, не связанное ни с какими другими системами и служившее только одной задаче. SOA позволяет отдельным элементам обмениваться информацией в рамках всего бизнеса. При использовании SOA в качестве архитектуры вашей системы сервисы являются наиболее важным аспектом.

При этом SOA — это концепция для всего предприятия: каждая часть программной инфраструктуры должна быть связана, объединена языком взаимодействия. SOA представляет собой сеть, соединяющую различные сервисы, причем соединения с каждым сервисом могут быть легко воспроизведены для разных сервисов в рамках всей системы.

Сервисная архитектура определяет, как различные компоненты системы работают вместе. Система, построенная с учетом SOA, имеет определяющий язык или протокол, который объединяет все части системы: это может быть HTTP, JSON или другие веб‑языки. Каждый сервис в системе может общаться с другими сервисами на общем языке, а отдельные сервисы могут контролироваться или работать вместе через общий язык. Это помогает объединить разрозненные языки без ущерба для их производительности.

В отличие от SOA, архитектура EDA ориентирована на события и рассматривает все, что происходит в вашей системе, как события и по сути, представляет собой коренное переосмысление архитектуры вашего бизнеса. Если сделать события основой вашей системы, разработчики смогут создать единый источник истины в архитектуре программного обеспечения, на который смогут опираться все остальные системы. Система, состоящая на фундаментальном уровне из производителей и потребителей событий, каждый из которых получает информацию из единого источника, позволяет пользователям принимать более эффективные бизнес‑решения и быть полностью информированными обо всей необходимой информации. Она также позволяет различным сервисам работать независимо друг от друга: один производитель событий может информировать двух потребителей для разных целей и результатов, и вы можете рассчитывать на то, что оба потребителя будут корректны.

В архитектуре, ориентированной на события, именно сами события являются наиболее важным аспектом вашей системы, а не сервисы внутри нее. Приоритет отдается событиям (размещенным заказам, измеренным показаниям, полученным отзывам и т. д.), а особенно записи того, что происходит с этими событиями. Хотя SOA обеспечивает доставку сообщений системы, она не может их записывать. EDA не только доставляет сообщения системы, но и расставляет приоритеты и обеспечивает контекст.

В отличие от SOA, EDA не обязательно усложняет бизнес по мере его роста. Построение бизнес‑архитектуры на основе событий упрощает управление сложностью: производители и потребители могут быть добавлены в поток бизнес‑событий и удалены эффективным образом. При использовании EDA вопрос «как» не стоит, на первый план выходят вопросы «где» и «что наиболее эффективно», что избавляет команду разработчиков от лишних сложностей.

Что использовать?

Использование SOA или архитектуры, управляемой событиями, зависит от потребностей вашего бизнеса. SOA отлично подходит для компаний, которые не в состоянии переосмыслить свою архитектуру или то, что они хотят получить от своих данных. Если в вашем бизнесе существуют разрозненные приложения, требующие интеграции, то, возможно, вам придется использовать SOA в качестве шага на пути к более унифицированной, современной архитектуре.

Архитектура, ориентированная на события, наиболее эффективна для предприятий, которые хотят вести учет всех событий и их истории. Многие успешные компании в самых разных отраслях использовали события в качестве основы своей системной архитектуры. Использование событий в качестве движущей силы архитектуры позволяет компаниям получать доступ к большим объемам данных по широкому спектру сервисов и хранить их в компактном виде.

Таким образом, EDA не опирается на прямую связь между сервисами. Вместо этого сервисы взаимодействуют через события, что обеспечивает лучшую масштабируемость и отказоустойчивость.

Разобравшись с преимуществами данной архитектуры перейдем к рассмотрению одного из наиболее распространенных инструментов для ее реализации — распределенной потоковой платформе Apache Kafka.

Apache Kafka

Основные компоненты Kafka включают:

  • Топики: Категории, в которые публикуются записи.

  • Разделы: Подразделы тем, которые помогают распределить нагрузку.

  • Продюсеры: Клиенты, которые публикуют события в топики Kafka.

  • Консьюмеры: Клиенты, которые читают события из топиков Kafka.

  • Брокеры: Серверы Kafka, которые управляют хранением и получением данных.

Экосистема Kafka включает в себя несколько мощных инструментов:

  • Kafka Connect: Для интеграции Kafka с различными источниками и поглотителями данных.

  • Kafka Streams: Для создания приложений потоковой обработки данных.

  • KSQL: SQL‑подобный интерфейс для запросов и обработки данных в Kafka.

Установка Apache Kafka

Процесс установки Kafka достаточно прост. Загружаем последнюю версию и распаковываем архив:

$ wget https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz

$ tar -xzf kafka_2.13-4.0.0.tgz

$ cd kafka_2.13-4.0.0.tgz

Далее генерируем Cluster UUID:

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Применяем настройки форматирования:

$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

И запускаем сервер:

$ bin/kafka-server-start.sh config/server.properties

А можно запустить из образа Docker, как вариант для ленивых:

$ docker run -p 9092:9092 apache/kafka-native:4.0.0

Для проверки работы нашей установки давайте создадим топик и протестируем нашу настройку Kafka с помощью простых продюсера и консьюмера:

Создание топика:

bin/kafka-topics.sh -create -topic test-topic -bootstrap-server localhost:9092 -partitions 1 -replication-factor 1

Запуск продюсера:

bin/kafka-console-producer.sh -topic test-topic -bootstrap-server localhost:9092

Запуск консьюмера:

bin/kafka-console-consumer.sh -topic test-topic -from-beginning -bootstrap-server localhost:9092

После выполнения этих команд мы можем вводить сообщения в консоль продюсера, и они должны появиться в консоли консьюмера.

Однако, Kafka также можно эффективно использовать в приложениях, работающих в режиме реального времени.

Обработка потоков с помощью Kafka Streams

Kafka Streams — это клиентская библиотека для создания приложений, которые обрабатывают и преобразуют данные в Kafka. Она позволяет создавать приложения реального времени, которые могут фильтровать, агрегировать и объединять потоки данных.

В качестве примера разберем демонстрационное приложение WordCount предназначенное для работы с бесконечным, неограниченным потоком данных. Как и в случае с ограниченным вариантом, это алгоритм с состоянием, который отслеживает и обновляет количество слов. Однако, поскольку он должен принимать потенциально неограниченные входные данные, он будет периодически выводить свое текущее состояние и результаты, продолжая обрабатывать новые данные, поскольку он не может знать, когда обработает «все» входные данные.

Для начала подготовим топик для входных слов, которыемы будем считать и выходной топик для результатов:

$ bin/kafka-topics.sh --create \

    --bootstrap-server localhost:9092 \

    --replication-factor 1 \

    --partitions 1 \

    --topic streams-plaintext-input

$ bin/kafka-topics.sh --create \

    --bootstrap-server localhost:9092 \

    --replication-factor 1 \

    --partitions 1 \

    --topic streams-wordcount-output \

    --config cleanup.policy=compact

Само приложение будет иметь следующий код:

final Serde stringSerde = Serdes.String();
final Serde longSerde = Serdes.Long();


KStream textLines = builder.stream(
      "streams-plaintext-input",
      Consumed.with(stringSerde, stringSerde)
    );

KTable wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))


    // Group the text words as message keys
    .groupBy((key, value) -> value)


    // Count the occurrences of each word (message key).
    .count();
 

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

Для запуска приложения необходимо выполнить:

$ bin/kafka‑run‑class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

Приложение будет читать из входного топика streams‑plaintext‑input, выполнять вычисления алгоритма WordCount над каждым из прочитанных сообщений и непрерывно записывать текущие результаты в выходной топик streams‑wordcount‑output. Таким образом, в STDOUT не будет выводиться ничего, кроме записей журнала, поскольку результаты будут записываться обратно в Kafka.

Теперь давайте напишем какое‑нибудь сообщение с помощью консольного продюсера в топик ввода streams‑plaintext‑input, введя одну строку текста. В результате мы отправим новое сообщение в тему ввода, где ключ message будет равен null, а значение message — строка текста в строковой кодировке, которую вы только что ввели (на практике входные данные для приложений обычно непрерывно поступают в Kafka, а не вводятся вручную):

ead43203df582e9d4951278495db8647.png

Здесь первый столбец — это ключ сообщения Kafka в формате java.lang.String, представляющий слово, которое мы собственно и считаем, а второй столбец — это значение сообщения в формате java.lang.Long, представляющее последнее подсчитанное слово.

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

>all streams lead to kafka

>hello kafka streams

При дальнейшей записи входных сообщений в тему ввода вы будете наблюдать добавление новых сообщений в тему streams‑wordcount‑output, представляющих число слов, подсчитанное приложением WordCount. Давайте введем еще одну текстовую строку «join kafka summit» в консоли продюсера топика ввода streams‑plaintext‑input:

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

>all streams lead to kafka

>hello kafka streams

>join kafka summit

Топик streams‑wordcount‑output впоследствии покажет соответствующие обновленные количества слов (см. последние три строки):

8b3915749e50a69b87f07af8bc993190.png

Как видно, выходные данные приложения Wordcount представляют собой непрерывный поток обновлений, где каждая выходная запись (то есть каждая строка в исходном выводе выше) — это обновленный подсчет слов, а также ключ записи, например «kafka». Для нескольких записей с одним и тем же ключом каждая последующая запись является обновлением предыдущей.

Заключение

В этой статье мы рассмотрели основы архитектуры, управляемой событиями, и то, как Apache Kafka может помочь в построении масштабируемых систем. Мы поговорили об основах Kafka, и немного углубились в обработку потоков с помощью Kafka Streams. Реализовав EDA с Kafka, вы сможете создавать производительные системы реального времени, способные эффективно обрабатывать большие объемы данных.

Если вы работаете с распределёнными системами и стремитесь глубже понимать ключевые технические решения, приглашаем на открытые уроки, где разбираем реальные подходы и инструменты.

Темы ближайших встреч:

  • 7 апреля: Добиваемся согласованности данных в распределённых коммуникациях с помощью Transaction Outbox. Подробнее

  • 14 апреля: Patroni и его применение с Postgres. Подробнее

Habrahabr.ru прочитано 6466 раз