Kafka Streams ч2: DSL, Processor API
DSL — это то, что делает Kafka Streams таким простым и позволяет подняться на более высокий уровень абстракции. В прошлой статье мы рассматривали следующий код:
KStream source = builder.stream("input-topic");
KStream processed = source.mapValues(value -> value.toUpperCase());
processed.to("output-topic");
Всё это — часть DSL Kafka Streams. Теперь хотелось бы глубже погрузиться в его особенности и в следующих статьях ещё шире исследовать инструментарий.
Архитектурно Kafka Streams строится на топологии обработчиков, опираясь на парадигму Dataflow Programming (DFP). Вместо классического построения программы в виде последовательности шагов используется Directed Acyclic Graph (DAG) — направленный ациклический граф.
Узел источника (Source Processor) — точка, откуда поступают данные.
Узел обработчика потока (Stream Processor) — место применения логики обработки данных, таких методов как filter, map, flatMap
и других.
Узел приёмника (Sink Processor) — куда направляются отфильтрованные, обогащённые и преобразованные данные.
Данные проходят по топологии, и за один проход через неё обрабатывается исключительно одна запись. Например, такая схема работать не будет:
В итоге у нас выстраивается пирамида абстракций: чем выше уровень абстракции, тем меньше контроля над деталями. В Kafka Streams попытались объединить преимущества разных уровней, чтобы «усидеть на двух стульях одновременно».
DSL прекрасно подходит для работы с данными — различных преобразований и трансформаций. Однако для работы с метаданными, возможности планировать периодичность функций и контролировать время выполнения определённых операций уже требуют обращения к низкоуровневому API узлов (Processor API).
Настройка Apache Kafka и наполнение данных
Для начала необходимо запустить Apache Kafka. Я использую Docker и запускаю Kafka вместе с UI с помощью docker-compose.yml
. Вот конфигурация:
version: '3.8'
services:
kafka:
image: confluentinc/confluent-local:7.5.6.arm64
hostname: kafka
container_name: kafka
ports:
- "8082:8082"
- "9092:9092"
- "9101:9101"
environment:
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- 9999:8080
environment:
DYNAMIC_CONFIG_ENABLED: true
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
Примечание: Если вы используете другую платформу или настройки, адаптируйте конфигурацию под себя.
Запустите Docker Compose
docker-compose up -d
Создадим топик с названием users-topic и добавим в него несколько значений: Bob, Alice, John, Jake, Nancy.
Можно использовать Kafka UI для создания топика и добавления сообщений. Если вы предпочитаете командную строку, используйте следующие команды:
# Создание топика
docker exec -it kafka kafka-topics --create --topic users-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
# Отправка сообщений в топик
docker exec -it kafka kafka-console-producer --topic users-topic --bootstrap-server localhost:9092
После выполнения последней команды введите имена по одному в каждой строке:
Bob
Alice
John
Jake
Nancy
Нажмите Ctrl+C
для выхода из продюсера.
Теперь у нас есть настроенная среда и данные для дальнейшего изучения возможностей Kafka Streams.
Конфигурация репозитория
Теперь нам нужна правильная конфигурация build.gradle.kts
plugins {
id("java")
id("application")
}
group = "kz.nitec"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation("org.apache.kafka:kafka-streams:3.8.0")
}
tasks.test {
useJUnitPlatform()
}
// Конфигурация для запуска приложения для DSL
tasks.register("runDSL") {
mainClass.set("kz.nitec.DslExample")
classpath = sourceSets["main"].runtimeClasspath
}
// Конфигурация для запуска приложения для Processor API
tasks.register("runProcessorApi") {
mainClass.set("kz.nitec.ProcessorApiExample")
classpath = sourceSets["main"].runtimeClasspath
}
./gradlew build
библиотека Kafka Stream добавлена и готова к использованию теперь можно начать писать код для приложения
DSL решение
Создайте новый класс DslExample со следующим исходным кодом:
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
public class DslExample {
public static void main(String[] args) {
// Настройка свойств приложения
Properties props = new java.util.Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dsl-example-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// чтобы читать данные с начала топика в явном виде указываем начальное смещение
props.put("consumer.auto.offset.reset", "earliest");
// Создание билдера для построения топологии
StreamsBuilder builder = new StreamsBuilder();
// Чтение данных из входного топика
KStream source = builder.stream("users-topic");
// Обработка данных и вывод в консоль
source.foreach((key, value) -> System.out.println("Hello, " + value));
// Создание и запуск Kafka Streams приложения
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Добавление shutdown hook для корректного завершения работы
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
после запуска ./gradlew runDSL --info
в консоли будут следующие значения
Hello Bob
Hello Alice
...
остальные имена
Processor API решение
Processor API предоставляет более низкоуровневый доступ к обработке потоков в Kafka Streams, что даёт больше контроля над процессом обработки данных.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.*;
import java.util.Properties;
public class ProcessorApiExample {
public static void main(String[] args) {
// Настройка свойств приложения
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor-api-example-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Чтобы читать данные с начала топика, указываем начальное смещение
props.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");
// Создание топологии
var topology = new Topology();
// Добавляем источник (source) - читаем из топика "users-topic"
topology.addSource("Source", "users-topic")
// Добавляем процессор (processor) для обработки сообщений
.addProcessor("Processor", (ProcessorSupplier) () -> new Processor<>() {
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
// Инициализация процессора
}
@Override
public void process(String key, String value) {
// Обработка сообщения и вывод в консоль
System.out.println("Hello, " + value);
}
@Override
public void close() {
// Закрытие ресурсов при необходимости
}
}, "Source");
// Создание и запуск Kafka Streams приложения
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
// Добавление shutdown hook для корректного завершения работы
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
после запуска ./gradlew runProcessorApi --info
в консоли будет идентичный результат как и при DSL примере.
DSL или Processor API
Processor API предоставляет более низкоуровневый доступ к обработке потоков в Kafka Streams. Это даёт больше контроля и гибкости, но требует большего объёма кода и глубокого понимания внутренних механизмов системы.
В конкретном случае удобнее использовать DSL, так как он предоставляет простой доступ к абстракциям потоков (Streams) и таблиц (Tables). DSL позволяет писать более понятный и лаконичный код, сосредотачиваясь на логике обработки данных, а не на инфраструктурных деталях. Это упрощает разработку и снижает вероятность ошибок, делая процесс более эффективным.