Kafka Streams ч4: Stateful processing

b5916b33ae70519650f0f68a119810d7

В предыдущих статьях

мы познакомились с основами Kafka Streams и рассмотрели stateless операции. В этой статье мы погрузимся в stateful processing и создадим приложение для управления запасами в реальном времени. Шаг за шагом мы реализуем функциональность, которая позволит отслеживать состояние запасов товаров, обрабатывать поступления и продажи, а также предоставлять доступ к текущему состоянию через REST API.

Описание задачи

Мы разработаем приложение, которое:

  • Обрабатывает потоки данных о поступлениях товаров на склад и о продажах товаров.

  • Поддерживает актуальное состояние запасов для каждого товара.

  • Обновляет состояние запасов при поступлении новых товаров и при продажах.

  • Использует stateful операции для хранения состояния запасов.

  • Предоставляет доступ к состоянию через REST API с использованием Javalin.

  • Обогащает данные о транзакциях информацией о товаре из справочника.

  • Выводит информацию об остатках товаров в выходной топик Kafka.

Архитектура приложения

Бизнес-сущности

У нас есть три основных сущности:

  1. Поступления товаров (RestockEvent)

  2. Продажи товаров (SaleEvent)

  3. Информация о товарах (ProductInfo)

Топики Kafka

Мы будем работать с четырьмя топиками:

  1. restock-events-topic (KStream)

  2. sale-events-topic (KStream)

  3. product-info-topic (GlobalKTable)

  4. inventory-output-topic (выходной топик)

Бизнес-схема

flowchart LR
    RestockEvents[Поступления товаров] -->|KStream| InventoryProcessor[Процессор запасов]
    SaleEvents[Продажи товаров] -->|KStream| InventoryProcessor
    ProductInfo[Информация о товарах] -->|GlobalKTable| InventoryProcessor
    InventoryProcessor -->|State Store| InventoryState[Состояние запасов]
    InventoryProcessor -->|Выходной топик| InventoryOutput[Остатки товаров]
    InventoryState -->|REST API| JavalinServer[Сервер Javalin]

Шаг 1: Настройка проекта

Структура проекта

Создадим проект со следующей структурой:

css
Copy code
kafka-streams-stateful/
├── build.gradle.kts
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/
│   │   │       └── example/
│   │   │           └── inventory/
│   │   │               ├── InventoryApp.java
│   │   │               ├── InventoryChange.java
│   │   │               ├── InventoryProcessor.java
│   │   │               ├── InventoryRestService.java
│   │   │               ├── GsonSerializer.java
│   │   │               ├── GsonDeserializer.java
│   │   │               ├── ProductInfo.java
│   │   │               ├── RestockEvent.java
│   │   │               └── SaleEvent.java
│   │   └── resources/
│   └── test/
└── settings.gradle.kts

  • build.gradle.kts: Файл сборки проекта с необходимыми зависимостями.

  • src/main/java/com/example/inventory/: Пакет с основными классами приложения.

    • InventoryApp.java: Главный класс приложения, запускающий Kafka Streams и REST API.

    • InventoryProcessor.java: Класс, содержащий логику обработки потоков данных и агрегации состояния запасов.

    • InventoryRestService.java: Класс, предоставляющий REST API для доступа к состоянию запасов.

    • InventoryChange.java: Модель данных для изменений запасов.

    • RestockEvent.java: Модель данных поступления товаров.

    • SaleEvent.java: Модель данных продаж товаров.

    • ProductInfo.java: Модель данных информации о товарах.

    • GsonSerializer.java: Класс для сериализации объектов в JSON.

    • GsonDeserializer.java: Класс для десериализации JSON в объекты.

  • src/main/resources/: Ресурсы приложения (если необходимо).

  • src/test/: Тесты для приложения (опционально).

  • settings.gradle.kts: Настройки Gradle для проекта.

Файл сборки build.gradle.kts

plugins {
    id("java")
    id("application")
}

group = "com.example.inventory"
version = "1.0"

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.apache.kafka:kafka-streams:3.8.0")
    implementation("io.javalin:javalin:6.1.6")
    implementation("com.squareup.okhttp3:okhttp:4.12.0")
    implementation("com.google.code.gson:gson:2.9.1")
    implementation("org.slf4j:slf4j-simple:2.0.12")
}

application {
    mainClass.set("com.example.inventory.InventoryApp")
}

Шаг 2: Создание моделей данных

Модель RestockEvent

public class RestockEvent {
    private String eventId;
    private String productId;
    private int quantity;
    private String supplier;
    private long timestamp;
}

Модель SaleEvent

public class SaleEvent {
    private String eventId;
    private String productId;
    private int quantity;
    private String storeId;
    private String saleDate; // Формат YYYY-MM-DD
    private long timestamp;
}

Модель ProductInfo

public class ProductInfo {
    private String productId;
    private String productName;
    private String category;
    private double price;
    private String manufacturer;
    private int reorderLevel;
}

Модель InventoryChange

public class InventoryChange {
    private String productId;
    private int quantityChange;
}

Шаг 3: Создание класса обработки InventoryProcessor

В этом шаге мы создадим класс InventoryProcessor, который будет содержать логику обработки потоков данных и агрегации состояния запасов.

Код класса InventoryProcessor

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class InventoryProcessor {

    private final StreamsBuilder builder;
    private static final String RESTOCK_TOPIC = "restock-events-topic";
    private static final String SALE_TOPIC = "sale-events-topic";
    private static final String PRODUCT_INFO_TOPIC = "product-info-topic";
    private static final String INVENTORY_OUTPUT_TOPIC = "inventory-output-topic";

    public InventoryProcessor() {
        this.builder = new StreamsBuilder();
    }

    public Topology buildTopology() {
        // Сериализаторы и десериализаторы
        Serde stringSerde = Serdes.String();
        Serde restockEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(RestockEvent.class));
        Serde saleEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(SaleEvent.class));
        Serde productInfoSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(ProductInfo.class));
        Serde inventoryChangeSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(InventoryChange.class));
        Serde integerSerde = Serdes.Integer();

        // Чтение потоков
        KStream restockStream = builder.stream(RESTOCK_TOPIC, Consumed.with(stringSerde, restockEventSerde));
        KStream saleStream = builder.stream(SALE_TOPIC, Consumed.with(stringSerde, saleEventSerde));
        GlobalKTable productInfoTable = builder.globalTable(PRODUCT_INFO_TOPIC, Consumed.with(stringSerde, productInfoSerde));

        // Обогащение данных о поступлениях информацией о товаре
        KStream enrichedRestockStream = restockStream.join(
                productInfoTable,
                (key, value) -> value.getProductId(),
                (restockEvent, productInfo) -> {
                    // Обогащаем restockEvent при необходимости
                    return restockEvent;
                }
        );

        // Обогащение данных о продажах информацией о товаре
        KStream enrichedSaleStream = saleStream.join(
                productInfoTable,
                (key, value) -> value.getProductId(),
                (saleEvent, productInfo) -> {
                    // Обогащаем saleEvent при необходимости
                    return saleEvent;
                }
        );

        // Создаем класс InventoryChange
        KStream restockChanges = enrichedRestockStream
                .mapValues(restockEvent ->
                        new InventoryChange(restockEvent.getProductId(), restockEvent.getQuantity())
                );

        KStream saleChanges = enrichedSaleStream
                .mapValues(saleEvent ->
                        new InventoryChange(saleEvent.getProductId(), -saleEvent.getQuantity())
                );

        // Объединяем поступления и продажи
        KStream inventoryChanges = restockChanges.merge(saleChanges);

        // Группируем по productId
        KGroupedStream groupedInventory = inventoryChanges.groupByKey(Grouped.with(stringSerde, inventoryChangeSerde));

        // Агрегируем изменения
        KTable inventoryState = groupedInventory.aggregate(
                () -> 0,
                (key, value, aggregate) -> aggregate + value.getQuantityChange(),
                Materialized.>as("inventory-store")
                        .withKeySerde(stringSerde)
                        .withValueSerde(integerSerde)
        );

        // Выводим состояние в выходной топик
        inventoryState.toStream().to(INVENTORY_OUTPUT_TOPIC, Produced.with(stringSerde, integerSerde));

        return builder.build();
    }
}

Комментарии к коду

  • Сериализация и десериализация: Используем собственные классы GsonSerializer и GsonDeserializer для преобразования наших объектов в JSON и обратно.

  • Обогащение данных: Используем join для обогащения потоков данных информацией о товарах из productInfoTable.

  • Создание изменений запасов: Создаем экземпляры InventoryChange, представляющие изменение запасов (положительное для поступлений, отрицательное для продаж).

  • Группировка и агрегирование: Группируем изменения запасов по productId и агрегируем их с помощью aggregate, сохраняя текущее состояние запасов в inventory-store.

  • Выходной поток: Отправляем агрегированные данные в выходной топик inventory-output-topic.

Шаг 4: Реализация REST API с использованием Javalin

Создадим класс InventoryRestService, который будет предоставлять доступ к состоянию запасов через REST API.

Код класса InventoryRestService

import io.javalin.Javalin;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class InventoryRestService {

    private final KafkaStreams streams;

    public InventoryRestService(KafkaStreams streams) {
        this.streams = streams;
    }

    public void start() {
        var app = Javalin.create().start(7777);

        app.get("/inventory/{productId}", ctx -> {
            String productId = ctx.pathParam("productId");

            ReadOnlyKeyValueStore keyValueStore = streams.store(
                    StoreQueryParameters.fromNameAndType("inventory-store", QueryableStoreTypes.keyValueStore())
            );

            var quantity = keyValueStore.get(productId);
            if (quantity != null) {
                ctx.json(new InventoryResponse(productId, quantity));
            } else {
                ctx.status(404).result("Product not found");
            }
        });
    }

    public record InventoryResponse(String productId, int quantity) {
    }
}

Комментарии к коду

  • Маршрут /inventory/{productId}: Позволяет получить текущий запас товара по его productId.

  • Доступ к state store: Используем streams.store() с StoreQueryParameters, чтобы получить доступ к состоянию inventory-store.

  • Ответ: Возвращаем JSON с productId и текущим quantity в виде Java Record InventoryResponse.

Шаг 5: Объединение всего в InventoryApp

Создадим главный класс приложения InventoryApp, который будет запускать обработчик потоков и REST API.

Код класса InventoryApp

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class InventoryApp {

    public static void main(String[] args) {
        var processor = new InventoryProcessor();
        var topology = processor.buildTopology();

        var streams = getKafkaStreams(topology);
        streams.start();

        var restService = new InventoryRestService(streams);
        restService.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    private static KafkaStreams getKafkaStreams(Topology topology) {
        var props = new Properties();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");

        return new KafkaStreams(topology, props);
    }
}

Шаг 6: Создание классов сериализации и десериализации

Класс GsonSerializer

import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

public class GsonSerializer implements Serializer {
    private final Gson gson = new Gson();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        return gson.toJson(data).getBytes(StandardCharsets.UTF_8);
    }
}

Класс GsonDeserializer

import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;

public class GsonDeserializer implements Deserializer {
    private final Gson gson = new Gson();
    private final Class deserializedClass;

    public GsonDeserializer(Class deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        return gson.fromJson(new String(data, StandardCharsets.UTF_8), deserializedClass);
    }
}

Шаг 7: Инструкции по запуску и тестированию приложения

Предварительные шаги

  1. Убедитесь, что Kafka запущена на localhost:9092 или доступна через Docker.

  2. Создайте необходимые топики:

kafka-topics.sh --create --topic restock-events-topic --bootstrap-server localhost:9092
kafka-topics.sh --create --topic sale-events-topic --bootstrap-server localhost:9092
kafka-topics.sh --create --topic product-info-topic --bootstrap-server localhost:9092
kafka-topics.sh --create --topic inventory-output-topic --bootstrap-server localhost:9092

Сборка и запуск приложения

./gradlew build
./gradlew run

Отправка тестовых сообщений через Kafka UI или консоль

Использование Kafka UI

  1. Подключитесь к Kafka UI и убедитесь, что он настроен для подключения к вашему Kafka-брокеру.

  2. Отправьте сообщения с ключами в соответствующие топики:

  • Топик product-info-topic

    • Key: product123

    • Value:

      {
        "productId": "product123",
        "productName": "Товар 123",
        "category": "Категория A",
        "price": 100.0,
        "manufacturer": "Производитель A",
        "reorderLevel": 50
      }
  • Топик restock-events-topic

    • Key: product123

    • Value:

      {
        "eventId": "restock1",
        "productId": "product123",
        "quantity": 100,
        "supplier": "Поставщик A",
        "timestamp": 1690000000000
      }
  • Топик sale-events-topic

    • Key: product123

    • Value:

      {
        "eventId": "sale1",
        "productId": "product123",
        "quantity": 20,
        "storeId": "storeA",
        "saleDate": "2023-07-22",
        "timestamp": 1690000100000
      }

Проверка состояния через REST API

Отправьте GET-запрос:

curl 

Ожидаемый ответ:

{
  "productId": "product123",
  "quantity": 80
}

Проверка выходного топика inventory-output-topic

Вы можете просмотреть сообщения в этом топике через Kafka UI или консольного потребителя:

kafka-console-consumer.sh --topic inventory-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=": "

Ожидаемый вывод: product123: 80

Заключение

Мы успешно реализовали приложение для управления запасами в реальном времени с использованием Kafka Streams и stateful processing. Приложение обрабатывает поступления и продажи товаров, поддерживает актуальное состояние запасов и предоставляет доступ к этому состоянию через REST API.

Полный исходный код нашего приложения

© Habrahabr.ru