Kafka Streams ч2: DSL, Processor API

DSL — это то, что делает Kafka Streams таким простым и позволяет подняться на более высокий уровень абстракции. В прошлой статье мы рассматривали следующий код:

KStream source = builder.stream("input-topic");

KStream processed = source.mapValues(value -> value.toUpperCase());

processed.to("output-topic");

Всё это — часть DSL Kafka Streams. Теперь хотелось бы глубже погрузиться в его особенности и в следующих статьях ещё шире исследовать инструментарий.

Архитектурно Kafka Streams строится на топологии обработчиков, опираясь на парадигму Dataflow Programming (DFP). Вместо классического построения программы в виде последовательности шагов используется Directed Acyclic Graph (DAG) — направленный ациклический граф.

b7517043e1a6c7791c6b5cbcc717cc4e.png

Узел источника (Source Processor) — точка, откуда поступают данные.

Узел обработчика потока (Stream Processor) — место применения логики обработки данных, таких методов как filter, map, flatMap и других.

Узел приёмника (Sink Processor) — куда направляются отфильтрованные, обогащённые и преобразованные данные.

Данные проходят по топологии, и за один проход через неё обрабатывается исключительно одна запись. Например, такая схема работать не будет:

35601aac5404bb61634584242339dca8.png

В итоге у нас выстраивается пирамида абстракций: чем выше уровень абстракции, тем меньше контроля над деталями. В Kafka Streams попытались объединить преимущества разных уровней, чтобы «усидеть на двух стульях одновременно».

b9f97ee9b0f0a5a8a9986b6f38ab257c.png

DSL прекрасно подходит для работы с данными — различных преобразований и трансформаций. Однако для работы с метаданными, возможности планировать периодичность функций и контролировать время выполнения определённых операций уже требуют обращения к низкоуровневому API узлов (Processor API).

Настройка Apache Kafka и наполнение данных

Для начала необходимо запустить Apache Kafka. Я использую Docker и запускаю Kafka вместе с UI с помощью docker-compose.yml. Вот конфигурация:

version: '3.8'

services:
  kafka:
    image: confluentinc/confluent-local:7.5.6.arm64
    hostname: kafka
    container_name: kafka
    ports:
      - "8082:8082"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 9999:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: true
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

Примечание: Если вы используете другую платформу или настройки, адаптируйте конфигурацию под себя.

Запустите Docker Compose

docker-compose up -d

Создадим топик с названием users-topic и добавим в него несколько значений: Bob, Alice, John, Jake, Nancy.

Можно использовать Kafka UI для создания топика и добавления сообщений. Если вы предпочитаете командную строку, используйте следующие команды:

# Создание топика
docker exec -it kafka kafka-topics --create --topic users-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

# Отправка сообщений в топик
docker exec -it kafka kafka-console-producer --topic users-topic --bootstrap-server localhost:9092

После выполнения последней команды введите имена по одному в каждой строке:

Bob
Alice
John
Jake
Nancy

Нажмите Ctrl+C для выхода из продюсера.

Теперь у нас есть настроенная среда и данные для дальнейшего изучения возможностей Kafka Streams.

Конфигурация репозитория

Теперь нам нужна правильная конфигурация build.gradle.kts

plugins {
    id("java")
    id("application")
}

group = "kz.nitec"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.apache.kafka:kafka-streams:3.8.0")
}

tasks.test {
    useJUnitPlatform()
}

// Конфигурация для запуска приложения для DSL
tasks.register("runDSL") {
    mainClass.set("kz.nitec.DslExample")
    classpath = sourceSets["main"].runtimeClasspath
}

// Конфигурация для запуска приложения для Processor API
tasks.register("runProcessorApi") {
    mainClass.set("kz.nitec.ProcessorApiExample")
    classpath = sourceSets["main"].runtimeClasspath
}

./gradlew build библиотека Kafka Stream добавлена и готова к использованию теперь можно начать писать код для приложения

DSL решение

Создайте новый класс DslExample со следующим исходным кодом:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class DslExample {
    public static void main(String[] args) {
        // Настройка свойств приложения
        Properties props = new java.util.Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dsl-example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // чтобы читать данные с начала топика в явном виде указываем начальное смещение
        props.put("consumer.auto.offset.reset", "earliest");

        // Создание билдера для построения топологии
        StreamsBuilder builder = new StreamsBuilder();

        // Чтение данных из входного топика
        KStream source = builder.stream("users-topic");

        // Обработка данных и вывод в консоль
        source.foreach((key, value) -> System.out.println("Hello, " + value));

        // Создание и запуск Kafka Streams приложения
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Добавление shutdown hook для корректного завершения работы
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

после запуска ./gradlew runDSL --info в консоли будут следующие значения

Hello Bob
Hello Alice
...
остальные имена

Processor API решение

Processor API предоставляет более низкоуровневый доступ к обработке потоков в Kafka Streams, что даёт больше контроля над процессом обработки данных.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.*;

import java.util.Properties;

public class ProcessorApiExample {
    public static void main(String[] args) {
        // Настройка свойств приложения
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor-api-example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Чтобы читать данные с начала топика, указываем начальное смещение
        props.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");

        // Создание топологии
        var topology = new Topology();

        // Добавляем источник (source) - читаем из топика "users-topic"
        topology.addSource("Source", "users-topic")

                // Добавляем процессор (processor) для обработки сообщений
                .addProcessor("Processor", (ProcessorSupplier) () -> new Processor<>() {
                    @Override
                    public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
                        // Инициализация процессора
                    }

                    @Override
                    public void process(String key, String value) {
                        // Обработка сообщения и вывод в консоль
                        System.out.println("Hello, " + value);
                    }

                    @Override
                    public void close() {
                        // Закрытие ресурсов при необходимости
                    }
                }, "Source");

        // Создание и запуск Kafka Streams приложения
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        // Добавление shutdown hook для корректного завершения работы
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

после запуска ./gradlew runProcessorApi --info в консоли будет идентичный результат как и при DSL примере.

DSL или Processor API

Processor API предоставляет более низкоуровневый доступ к обработке потоков в Kafka Streams. Это даёт больше контроля и гибкости, но требует большего объёма кода и глубокого понимания внутренних механизмов системы.

В конкретном случае удобнее использовать DSL, так как он предоставляет простой доступ к абстракциям потоков (Streams) и таблиц (Tables). DSL позволяет писать более понятный и лаконичный код, сосредотачиваясь на логике обработки данных, а не на инфраструктурных деталях. Это упрощает разработку и снижает вероятность ошибок, делая процесс более эффективным.

© Habrahabr.ru