Kafka Streams ч4: Stateful processing
В предыдущих статьях
мы познакомились с основами Kafka Streams и рассмотрели stateless операции. В этой статье мы погрузимся в stateful processing и создадим приложение для управления запасами в реальном времени. Шаг за шагом мы реализуем функциональность, которая позволит отслеживать состояние запасов товаров, обрабатывать поступления и продажи, а также предоставлять доступ к текущему состоянию через REST API.
Описание задачи
Мы разработаем приложение, которое:
Обрабатывает потоки данных о поступлениях товаров на склад и о продажах товаров.
Поддерживает актуальное состояние запасов для каждого товара.
Обновляет состояние запасов при поступлении новых товаров и при продажах.
Использует stateful операции для хранения состояния запасов.
Предоставляет доступ к состоянию через REST API с использованием Javalin.
Обогащает данные о транзакциях информацией о товаре из справочника.
Выводит информацию об остатках товаров в выходной топик Kafka.
Архитектура приложения
Бизнес-сущности
У нас есть три основных сущности:
Поступления товаров (
RestockEvent
)Продажи товаров (
SaleEvent
)Информация о товарах (
ProductInfo
)
Топики Kafka
Мы будем работать с четырьмя топиками:
restock-events-topic
(KStream
)sale-events-topic
(KStream
)product-info-topic
(GlobalKTable
)inventory-output-topic
(выходной топик)
Бизнес-схема
flowchart LR
RestockEvents[Поступления товаров] -->|KStream| InventoryProcessor[Процессор запасов]
SaleEvents[Продажи товаров] -->|KStream| InventoryProcessor
ProductInfo[Информация о товарах] -->|GlobalKTable| InventoryProcessor
InventoryProcessor -->|State Store| InventoryState[Состояние запасов]
InventoryProcessor -->|Выходной топик| InventoryOutput[Остатки товаров]
InventoryState -->|REST API| JavalinServer[Сервер Javalin]
Шаг 1: Настройка проекта
Структура проекта
Создадим проект со следующей структурой:
css
Copy code
kafka-streams-stateful/
├── build.gradle.kts
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com/
│ │ │ └── example/
│ │ │ └── inventory/
│ │ │ ├── InventoryApp.java
│ │ │ ├── InventoryChange.java
│ │ │ ├── InventoryProcessor.java
│ │ │ ├── InventoryRestService.java
│ │ │ ├── GsonSerializer.java
│ │ │ ├── GsonDeserializer.java
│ │ │ ├── ProductInfo.java
│ │ │ ├── RestockEvent.java
│ │ │ └── SaleEvent.java
│ │ └── resources/
│ └── test/
└── settings.gradle.kts
build.gradle.kts
: Файл сборки проекта с необходимыми зависимостями.src/main/java/com/example/inventory/
: Пакет с основными классами приложения.InventoryApp.java
: Главный класс приложения, запускающий Kafka Streams и REST API.InventoryProcessor.java
: Класс, содержащий логику обработки потоков данных и агрегации состояния запасов.InventoryRestService.java
: Класс, предоставляющий REST API для доступа к состоянию запасов.InventoryChange.java
: Модель данных для изменений запасов.RestockEvent.java
: Модель данных поступления товаров.SaleEvent.java
: Модель данных продаж товаров.ProductInfo.java
: Модель данных информации о товарах.GsonSerializer.java
: Класс для сериализации объектов в JSON.GsonDeserializer.java
: Класс для десериализации JSON в объекты.
src/main/resources/
: Ресурсы приложения (если необходимо).src/test/
: Тесты для приложения (опционально).settings.gradle.kts
: Настройки Gradle для проекта.
Файл сборки build.gradle.kts
plugins {
id("java")
id("application")
}
group = "com.example.inventory"
version = "1.0"
repositories {
mavenCentral()
}
dependencies {
implementation("org.apache.kafka:kafka-streams:3.8.0")
implementation("io.javalin:javalin:6.1.6")
implementation("com.squareup.okhttp3:okhttp:4.12.0")
implementation("com.google.code.gson:gson:2.9.1")
implementation("org.slf4j:slf4j-simple:2.0.12")
}
application {
mainClass.set("com.example.inventory.InventoryApp")
}
Шаг 2: Создание моделей данных
Модель RestockEvent
public class RestockEvent {
private String eventId;
private String productId;
private int quantity;
private String supplier;
private long timestamp;
}
Модель SaleEvent
public class SaleEvent {
private String eventId;
private String productId;
private int quantity;
private String storeId;
private String saleDate; // Формат YYYY-MM-DD
private long timestamp;
}
Модель ProductInfo
public class ProductInfo {
private String productId;
private String productName;
private String category;
private double price;
private String manufacturer;
private int reorderLevel;
}
Модель InventoryChange
public class InventoryChange {
private String productId;
private int quantityChange;
}
Шаг 3: Создание класса обработки InventoryProcessor
В этом шаге мы создадим класс InventoryProcessor
, который будет содержать логику обработки потоков данных и агрегации состояния запасов.
Код класса InventoryProcessor
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
public class InventoryProcessor {
private final StreamsBuilder builder;
private static final String RESTOCK_TOPIC = "restock-events-topic";
private static final String SALE_TOPIC = "sale-events-topic";
private static final String PRODUCT_INFO_TOPIC = "product-info-topic";
private static final String INVENTORY_OUTPUT_TOPIC = "inventory-output-topic";
public InventoryProcessor() {
this.builder = new StreamsBuilder();
}
public Topology buildTopology() {
// Сериализаторы и десериализаторы
Serde stringSerde = Serdes.String();
Serde restockEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(RestockEvent.class));
Serde saleEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(SaleEvent.class));
Serde productInfoSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(ProductInfo.class));
Serde inventoryChangeSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(InventoryChange.class));
Serde integerSerde = Serdes.Integer();
// Чтение потоков
KStream restockStream = builder.stream(RESTOCK_TOPIC, Consumed.with(stringSerde, restockEventSerde));
KStream saleStream = builder.stream(SALE_TOPIC, Consumed.with(stringSerde, saleEventSerde));
GlobalKTable productInfoTable = builder.globalTable(PRODUCT_INFO_TOPIC, Consumed.with(stringSerde, productInfoSerde));
// Обогащение данных о поступлениях информацией о товаре
KStream enrichedRestockStream = restockStream.join(
productInfoTable,
(key, value) -> value.getProductId(),
(restockEvent, productInfo) -> {
// Обогащаем restockEvent при необходимости
return restockEvent;
}
);
// Обогащение данных о продажах информацией о товаре
KStream enrichedSaleStream = saleStream.join(
productInfoTable,
(key, value) -> value.getProductId(),
(saleEvent, productInfo) -> {
// Обогащаем saleEvent при необходимости
return saleEvent;
}
);
// Создаем класс InventoryChange
KStream restockChanges = enrichedRestockStream
.mapValues(restockEvent ->
new InventoryChange(restockEvent.getProductId(), restockEvent.getQuantity())
);
KStream saleChanges = enrichedSaleStream
.mapValues(saleEvent ->
new InventoryChange(saleEvent.getProductId(), -saleEvent.getQuantity())
);
// Объединяем поступления и продажи
KStream inventoryChanges = restockChanges.merge(saleChanges);
// Группируем по productId
KGroupedStream groupedInventory = inventoryChanges.groupByKey(Grouped.with(stringSerde, inventoryChangeSerde));
// Агрегируем изменения
KTable inventoryState = groupedInventory.aggregate(
() -> 0,
(key, value, aggregate) -> aggregate + value.getQuantityChange(),
Materialized.>as("inventory-store")
.withKeySerde(stringSerde)
.withValueSerde(integerSerde)
);
// Выводим состояние в выходной топик
inventoryState.toStream().to(INVENTORY_OUTPUT_TOPIC, Produced.with(stringSerde, integerSerde));
return builder.build();
}
}
Комментарии к коду
Сериализация и десериализация: Используем собственные классы
GsonSerializer
иGsonDeserializer
для преобразования наших объектов в JSON и обратно.Обогащение данных: Используем
join
для обогащения потоков данных информацией о товарах изproductInfoTable
.Создание изменений запасов: Создаем экземпляры
InventoryChange
, представляющие изменение запасов (положительное для поступлений, отрицательное для продаж).Группировка и агрегирование: Группируем изменения запасов по
productId
и агрегируем их с помощьюaggregate
, сохраняя текущее состояние запасов вinventory-store
.Выходной поток: Отправляем агрегированные данные в выходной топик
inventory-output-topic
.
Шаг 4: Реализация REST API с использованием Javalin
Создадим класс InventoryRestService
, который будет предоставлять доступ к состоянию запасов через REST API.
Код класса InventoryRestService
import io.javalin.Javalin;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
public class InventoryRestService {
private final KafkaStreams streams;
public InventoryRestService(KafkaStreams streams) {
this.streams = streams;
}
public void start() {
var app = Javalin.create().start(7777);
app.get("/inventory/{productId}", ctx -> {
String productId = ctx.pathParam("productId");
ReadOnlyKeyValueStore keyValueStore = streams.store(
StoreQueryParameters.fromNameAndType("inventory-store", QueryableStoreTypes.keyValueStore())
);
var quantity = keyValueStore.get(productId);
if (quantity != null) {
ctx.json(new InventoryResponse(productId, quantity));
} else {
ctx.status(404).result("Product not found");
}
});
}
public record InventoryResponse(String productId, int quantity) {
}
}
Комментарии к коду
Маршрут
/inventory/{productId}
: Позволяет получить текущий запас товара по егоproductId
.Доступ к state store: Используем
streams.store()
сStoreQueryParameters
, чтобы получить доступ к состояниюinventory-store
.Ответ: Возвращаем JSON с
productId
и текущимquantity
в виде Java RecordInventoryResponse
.
Шаг 5: Объединение всего в InventoryApp
Создадим главный класс приложения InventoryApp
, который будет запускать обработчик потоков и REST API.
Код класса InventoryApp
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
public class InventoryApp {
public static void main(String[] args) {
var processor = new InventoryProcessor();
var topology = processor.buildTopology();
var streams = getKafkaStreams(topology);
streams.start();
var restService = new InventoryRestService(streams);
restService.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static KafkaStreams getKafkaStreams(Topology topology) {
var props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
return new KafkaStreams(topology, props);
}
}
Шаг 6: Создание классов сериализации и десериализации
Класс GsonSerializer
import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;
public class GsonSerializer implements Serializer {
private final Gson gson = new Gson();
@Override
public byte[] serialize(String topic, T data) {
if (data == null) {
return null;
}
return gson.toJson(data).getBytes(StandardCharsets.UTF_8);
}
}
Класс GsonDeserializer
import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;
public class GsonDeserializer implements Deserializer {
private final Gson gson = new Gson();
private final Class deserializedClass;
public GsonDeserializer(Class deserializedClass) {
this.deserializedClass = deserializedClass;
}
@Override
public T deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
return gson.fromJson(new String(data, StandardCharsets.UTF_8), deserializedClass);
}
}
Шаг 7: Инструкции по запуску и тестированию приложения
Предварительные шаги
Убедитесь, что Kafka запущена на
localhost:9092
или доступна через Docker.Создайте необходимые топики:
kafka-topics.sh --create --topic restock-events-topic --bootstrap-server localhost:9092
kafka-topics.sh --create --topic sale-events-topic --bootstrap-server localhost:9092
kafka-topics.sh --create --topic product-info-topic --bootstrap-server localhost:9092
kafka-topics.sh --create --topic inventory-output-topic --bootstrap-server localhost:9092
Сборка и запуск приложения
./gradlew build
./gradlew run
Отправка тестовых сообщений через Kafka UI или консоль
Использование Kafka UI
Подключитесь к Kafka UI и убедитесь, что он настроен для подключения к вашему Kafka-брокеру.
Отправьте сообщения с ключами в соответствующие топики:
Топик
product-info-topic
Key:
product123
Value:
{ "productId": "product123", "productName": "Товар 123", "category": "Категория A", "price": 100.0, "manufacturer": "Производитель A", "reorderLevel": 50 }
Топик
restock-events-topic
Key:
product123
Value:
{ "eventId": "restock1", "productId": "product123", "quantity": 100, "supplier": "Поставщик A", "timestamp": 1690000000000 }
Топик
sale-events-topic
Key:
product123
Value:
{ "eventId": "sale1", "productId": "product123", "quantity": 20, "storeId": "storeA", "saleDate": "2023-07-22", "timestamp": 1690000100000 }
Проверка состояния через REST API
Отправьте GET-запрос:
curl
Ожидаемый ответ:
{
"productId": "product123",
"quantity": 80
}
Проверка выходного топика inventory-output-topic
Вы можете просмотреть сообщения в этом топике через Kafka UI или консольного потребителя:
kafka-console-consumer.sh --topic inventory-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=": "
Ожидаемый вывод: product123: 80
Заключение
Мы успешно реализовали приложение для управления запасами в реальном времени с использованием Kafka Streams и stateful processing. Приложение обрабатывает поступления и продажи товаров, поддерживает актуальное состояние запасов и предоставляет доступ к этому состоянию через REST API.
Полный исходный код нашего приложения