Создаем Spring Boot Starter для Kafka с Avro: пошаговое руководство

Введение
Apache Kafka — мощный инструмент для обработки потоков данных в реальном времени, но его интеграция в проекты на Spring Boot может быть непростой задачей
В этой статье вы узнаете о лучших практиках разработки стартеров Spring Boot Starter для Kafka с поддержкой Avro, а также получите примеры использования с различными настройками. Эта статья будет полезна разработчикам, желающим упростить работу с Kafka, и менеджерам проектов, ищущим способы оптимизации процессов. Дочитавшим статью до конца будет приятный бонус.
Я хотел поделиться опытом создания библиотеки, которая упрощает интеграцию Kafka в Spring Boot, предоставляя гибкую конфигурацию и поддержку сериализации Avro.
Уникальность: Многие стартеры для Kafka существуют, но мой фокусируется на enterprise-функциях (идемпотентность, ретраи) и передаче схемы Avro через параметры.
Почему мне верят: Я подробно описываю процесс, включая ошибки и их решения, а также публикую рабочий код.
Проблема: Настройка Kafka в проектах часто требует много boilerplate-кода.
Цель: Дать читателю готовое решение и вдохновить на создание собственных стартеров.
Что такое Spring Boot Starter?
Spring Boot Starter — это модуль, который предоставляет готовую конфигурацию для определенной технологии. Например, spring-boot-starter-web настраивает веб-сервер, а spring-boot-starter-data-jpa — доступ к базе данных. Наш стартер будет:
Автоматически создавать KafkaProducer и KafkaConsumer.
Использовать Avro для сериализации и десериализации сообщений.
Поддерживать настройку через application.yml.
Этапы разработки
Процесс разработки включал создание автоконфигурации для продюсера и консюмера Kafka. Вот ключевые шаги:
Определение зависимостей:
Spring Boot для автоконфигурации.
Spring Kafka для работы с Kafka.
Apache Avro для сериализации (опционально).
Создание KafkaProperties:
Класс для чтения конфигурации из application.yml с префиксом apppetr.kafka.
Автоконфигурация:
Настройка KafkaTemplate для продюсера и ConcurrentKafkaListenerContainerFactory для консюмера.
Поддержка Avro:
Мы будем использовать Avro для строгой типизации сообщений. Определим схему при помощи настройки в конфигурационном файле стартера
3. Создание класса свойств
Для настройки стартера через application.yml создадим класс KafkaProperties в kafka-starter/src/main/java/com/app/petr/KafkaProperties.java:
package com.app.petr;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.HashMap;
import java.util.Map;
@ConfigurationProperties(prefix = "apppetr.kafka")
public class KafkaProperties {
private String bootstrapServers = "localhost:9092";
private Producer producer = new Producer();
private Consumer consumer = new Consumer();
private String avroSchemaPath;
public static class Producer {
private String topic = "default-topic";
private boolean idempotenceEnabled = true; // Идемпотентность для надежности
private int acks = 1; // Подтверждения: 0, 1, all
private int retries = 3; // Количество попыток при сбоях
private int retryBackoffMs = 1000; // Задержка между ретраями
private int deliveryTimeoutMs = 120000; // Таймаут доставки
private int requestTimeoutMs = 30000; // Таймаут запроса
private int maxInFlightRequests = 5; // Макс. количество неподтвержденных запросов
private Map config = new HashMap<>(); // Дополнительные настройки
public String getTopic() { return topic; }
public void setTopic(String topic) { this.topic = topic; }
public boolean isIdempotenceEnabled() { return idempotenceEnabled; }
public void setIdempotenceEnabled(boolean idempotenceEnabled) { this.idempotenceEnabled = idempotenceEnabled; }
public int getAcks() { return acks; }
public void setAcks(int acks) { this.acks = acks; }
public int getRetries() { return retries; }
public void setRetries(int retries) { this.retries = retries; }
public int getRetryBackoffMs() { return retryBackoffMs; }
public void setRetryBackoffMs(int retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; }
public int getDeliveryTimeoutMs() { return deliveryTimeoutMs; }
public void setDeliveryTimeoutMs(int deliveryTimeoutMs) { this.deliveryTimeoutMs = deliveryTimeoutMs; }
public int getRequestTimeoutMs() { return requestTimeoutMs; }
public void setRequestTimeoutMs(int requestTimeoutMs) { this.requestTimeoutMs = requestTimeoutMs; }
public int getMaxInFlightRequests() { return maxInFlightRequests; }
public void setMaxInFlightRequests(int maxInFlightRequests) { this.maxInFlightRequests = maxInFlightRequests; }
public Map getConfig() { return config; }
public void setConfig(Map config) { this.config = config; }
}
public static class Consumer {
private String topic = "default-topic";
private String groupId = "default-group";
private int maxPollRecords = 500; // Макс. записей за один poll
private int maxPollIntervalMs = 300000; // Макс. интервал между poll
private int sessionTimeoutMs = 10000; // Таймаут сессии
private int heartbeatIntervalMs = 3000; // Интервал heartbeat
private int fetchMaxBytes = 52428800; // Макс. размер выборки (50MB)
private boolean autoCommitEnabled = true; // Автокоммит оффсетов
private int autoCommitIntervalMs = 5000; // Интервал автокоммита
private String autoOffsetReset = "earliest"; // Сброс оффсета
private int retryBackoffMs = 1000; // Задержка между ретраями
private int maxRetries = 3; // Количество ретраев
private Map config = new HashMap<>(); // Дополнительные настройки
public String getTopic() { return topic; }
public void setTopic(String topic) { this.topic = topic; }
public String getGroupId() { return groupId; }
public void setGroupId(String groupId) { this.groupId = groupId; }
public int getMaxPollRecords() { return maxPollRecords; }
public void setMaxPollRecords(int maxPollRecords) { this.maxPollRecords = maxPollRecords; }
public int getMaxPollIntervalMs() { return maxPollIntervalMs; }
public void setMaxPollIntervalMs(int maxPollIntervalMs) { this.maxPollIntervalMs = maxPollIntervalMs; }
public int getSessionTimeoutMs() { return sessionTimeoutMs; }
public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; }
public int getHeartbeatIntervalMs() { return heartbeatIntervalMs; }
public void setHeartbeatIntervalMs(int heartbeatIntervalMs) { this.heartbeatIntervalMs = heartbeatIntervalMs; }
public int getFetchMaxBytes() { return fetchMaxBytes; }
public void setFetchMaxBytes(int fetchMaxBytes) { this.fetchMaxBytes = fetchMaxBytes; }
public boolean isAutoCommitEnabled() { return autoCommitEnabled; }
public void setAutoCommitEnabled(boolean autoCommitEnabled) { this.autoCommitEnabled = autoCommitEnabled; }
public int getAutoCommitIntervalMs() { return autoCommitIntervalMs; }
public void setAutoCommitIntervalMs(int autoCommitIntervalMs) { this.autoCommitIntervalMs = autoCommitIntervalMs; }
public String getAutoOffsetReset() { return autoOffsetReset; }
public void setAutoOffsetReset(String autoOffsetReset) { this.autoOffsetReset = autoOffsetReset; }
public int getRetryBackoffMs() { return retryBackoffMs; }
public void setRetryBackoffMs(int retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; }
public int getMaxRetries() { return maxRetries; }
public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; }
public Map getConfig() { return config; }
public void setConfig(Map config) { this.config = config; }
}
public String getBootstrapServers() { return bootstrapServers; }
public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; }
public Producer getProducer() { return producer; }
public void setProducer(Producer producer) { this.producer = producer; }
public Consumer getConsumer() { return consumer; }
public void setConsumer(Consumer consumer) { this.consumer = consumer; }
public String getAvroSchemaPath() {
return avroSchemaPath;
}
public void setAvroSchemaPath(String avroSchemaPath) {
this.avroSchemaPath = avroSchemaPath;
}
}
Этот класс позволяет задавать параметры Kafka через конфигурацию, например:
apppetr:
kafka:
bootstrap-servers: localhost:9092
producer:
topic: test-topic
idempotence-enabled: true
acks: all
retries: 5
retry-backoff-ms: 2000
delivery-timeout-ms: 120000
request-timeout-ms: 30000
max-in-flight-requests: 5
consumer:
topic: test-topic
group-id: test-group
max-poll-records: 1000
max-poll-interval-ms: 600000
session-timeout-ms: 15000
heartbeat-interval-ms: 5000
fetch-max-bytes: 52428800
auto-commit-enabled: true
auto-commit-interval-ms: 5000
auto-offset-reset: earliest
retry-backoff-ms: 2000
max-retries: 5
avro-schema-path: ${project.basedir}/src/main/resources/avro/message.avdl
4. Реализация автоконфигурации
Ключевой элемент стартера — класс автоконфигурации KafkaAutoConfiguration в kafka-starter/src/main/java/com/app/petr/KafkaAutoConfiguration.java:
package com.app.petr;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
import java.util.HashMap;
import java.util.Map;
@Configuration
@ConditionalOnClass({ KafkaTemplate.class, ConcurrentKafkaListenerContainerFactory.class })
@EnableKafka
@EnableConfigurationProperties(KafkaProperties.class)
@ConditionalOnProperty(prefix = "apppetr.kafka", name = "bootstrap-servers")
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}
// Producer Configuration
@Bean
@ConditionalOnMissingBean
public ProducerFactory producerFactory() {
Map config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, properties.getProducer().isIdempotenceEnabled());
config.put(ProducerConfig.ACKS_CONFIG, String.valueOf(properties.getProducer().getAcks()));
config.put(ProducerConfig.RETRIES_CONFIG, properties.getProducer().getRetries());
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, properties.getProducer().getRetryBackoffMs());
config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, properties.getProducer().getDeliveryTimeoutMs());
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, properties.getProducer().getRequestTimeoutMs());
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, properties.getProducer().getMaxInFlightRequests());
config.putAll(properties.getProducer().getConfig());
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
@ConditionalOnMissingBean
public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) {
KafkaTemplate template = new KafkaTemplate<>(producerFactory);
template.setDefaultTopic(properties.getProducer().getTopic());
return template;
}
// Consumer Configuration
@Bean
@ConditionalOnMissingBean
public ConsumerFactory consumerFactory() {
Map config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
config.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getConsumer().getGroupId());
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, properties.getConsumer().getMaxPollRecords());
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, properties.getConsumer().getMaxPollIntervalMs());
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getConsumer().getSessionTimeoutMs());
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, properties.getConsumer().getHeartbeatIntervalMs());
config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, properties.getConsumer().getFetchMaxBytes());
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.getConsumer().isAutoCommitEnabled());
config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getConsumer().getAutoCommitIntervalMs());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getConsumer().getAutoOffsetReset());
config.putAll(properties.getConsumer().getConfig());
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
@ConditionalOnMissingBean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1); // Параллелизм
factory.getContainerProperties().setPollTimeout(3000); // Таймаут опроса
factory.setCommonErrorHandler(errorHandler()); // Настраиваем обработку ошибок с ретраями
return factory;
}
@Bean
public DefaultErrorHandler errorHandler() {
BackOff backOff = new FixedBackOff(
properties.getConsumer().getRetryBackoffMs(),
properties.getConsumer().getMaxRetries()
);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
(record, exception) -> {
System.err.println("Failed to process record: " + record + ", exception: " + exception.getMessage());
}, // Логирование ошибок
backOff
);
return errorHandler;
}
}
Что делает этот код?
Создает KafkaProducer и KafkaConsumer с настройками из KafkaProperties.
Использует @ConditionalOnMissingBean, чтобы пользователь мог переопределить бины.
Добавляет DisposableBean для корректного закрытия ресурсов.
5. Добавление файла импортов автоконфигурации: org.springframework.boot.autoconfigure.AutoConfiguration.imports
В процессе разработки стартера я добавил файл src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports, который заменил устаревший подход с spring.factories для указания автоконфигурационных классов в Spring Boot 3.x. Этот файл содержит список классов автоконфигурации, которые Spring Boot должен загрузить.
Пример содержимого:
io.github.bigbox89.KafkaAutoConfiguration
Назначение: Сообщает Spring Boot, что KafkaAutoConfiguration является точкой входа для автоконфигурации стартера. Это упрощает обнаружение и регистрацию конфигурации без необходимости полного сканирования classpath.
Что будет, если его не добавить
Если файл org.springframework.boot.autoconfigure.AutoConfiguration.imports отсутствует:
Автоконфигурация не загрузится: Spring Boot не найдёт класс KafkaAutoConfiguration, и стартер не будет применён, даже если все зависимости и свойства указаны корректно.
Тихий сбой: Приложение запустится, но продюсер и консюмер Kafka не будут настроены, что может привести к неочевидным ошибкам (например, @KafkaListener не сработает).
Совместимость: В версиях Spring Boot до 3.0 можно было использовать spring.factories, но в 3.x без AutoConfiguration.imports стартер становится неработоспособным.
6. Примеры использования
Продюсер
Пример ProducerApplication в producer-example/src/main/java/com/app/petr/ProducerApplication.java:
package com.app.petr;
import app.petr.Message;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import java.nio.ByteBuffer;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate kafkaTemplate, KafkaProperties properties) {
return args -> {
String topic = properties.getProducer().getTopic();
if (topic == null) {
throw new IllegalStateException("Producer topic is not configured");
}
for (int i = 0; i < 100; i++) {
Message message = Message.newBuilder()
.setId("id-" + i)
.setContent("Message " + i)
.setTimestamp(System.currentTimeMillis())
.build();
ByteBuffer buffer = message.toByteBuffer();
byte[] messageBytes = new byte[buffer.remaining()];
buffer.get(messageBytes);
kafkaTemplate.send(topic, message.getId().toString(), messageBytes);
System.out.println("Sent message: " + message.getId());
Thread.sleep(1000);
}
};
}
}
Консюмер
Пример ConsumerApplication в consumer-example/src/main/java/com/app/petr/ConsumerApplication.java:
package com.app.petr;
import app.petr.Message;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import java.nio.ByteBuffer;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@KafkaListener(topics = "#{kafkaProperties.consumer.topic}", groupId = "#{kafkaProperties.consumer.groupId}")
public void listen(byte[] message) throws Exception {
Message avroMessage = Message.fromByteBuffer(ByteBuffer.wrap(message));
System.out.printf("Received message: id=%s, content=%s, timestamp=%d%n",
avroMessage.getId(), avroMessage.getContent(), avroMessage.getTimestamp());
}
}
7. Тестирование с Testcontainers
Чтобы убедиться, что стартер работает, добавим интеграционные тесты:
package com.app.petr;
import app.petr.Message;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.io.ByteArrayOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
public class IntegrationProducerAndConsumerTest {
private static final String TOPIC = "rest_data";
@Container
private static final KafkaContainer kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));
private KafkaProducer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(props);
}
private KafkaConsumer createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
@Test
public void testProducerAndConsumer() throws Exception {
KafkaProducer producer = createProducer();
KafkaConsumer consumer = createConsumer();
consumer.subscribe(Collections.singleton(TOPIC));
Message message = Message.newBuilder()
.setId("test-id")
.setContent("Test content")
.setTimestamp(System.currentTimeMillis())
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
SpecificDatumWriter writer = new SpecificDatumWriter<>(Message.class);
writer.write(message, encoder);
encoder.flush();
byte[] serializedMessage = out.toByteArray();
ProducerRecord record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
producer.send(record).get();
producer.flush();
SpecificDatumReader reader = new SpecificDatumReader<>(Message.class);
ConsumerRecords records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.isEmpty()).isFalse();
records.forEach(consumerRecord -> {
try {
Decoder decoder = DecoderFactory.get().binaryDecoder(consumerRecord.value(), null);
Message receivedMessage = reader.read(null, decoder);
assertThat(receivedMessage.getId().toString()).isEqualTo(message.getId());
assertThat(receivedMessage.getContent().toString()).isEqualTo(message.getContent());
assertThat(receivedMessage.getTimestamp()).isEqualTo(message.getTimestamp());
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize message", e);
}
});
producer.close();
consumer.close();
}
}
Также добавим отдельно тесты для продюссера:
package com.app.petr;
import app.petr.Message;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.io.ByteArrayOutputStream;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@Testcontainers
public class ProducerApplicationTest {
private static final String TOPIC = "test_data";
@Container
private static final KafkaContainer kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));
@BeforeAll
static void setup() {
kafkaContainer.start();
}
private KafkaProducer createProducer(Properties props) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(props);
}
private byte[] serializeMessage(Message message) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
SpecificDatumWriter writer = new SpecificDatumWriter<>(Message.class);
writer.write(message, encoder);
encoder.flush();
return out.toByteArray();
}
@Test
public void testProducerWithDefaultConfig() throws Exception {
Properties props = new Properties();
KafkaProducer producer = createProducer(props);
Message message = Message.newBuilder()
.setId("test-id-1")
.setContent("Default config test")
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage = serializeMessage(message);
ProducerRecord record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
Future future = producer.send(record);
producer.flush();
assertThat(future.get()).isNotNull(); // Проверяем успешную отправку
producer.close();
}
@Test
public void testProducerWithIdempotenceEnabled() throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // Включаем идемпотентность
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Требуется для идемпотентности
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // Ограничение для идемпотентности
KafkaProducer producer = createProducer(props);
Message message = Message.newBuilder()
.setId("test-id-2")
.setContent("Idempotent test")
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage = serializeMessage(message);
ProducerRecord record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
Future future = producer.send(record);
producer.send(record); // Отправляем тот же ключ ещё раз
producer.flush();
assertThat(future.get()).isNotNull(); // Успешная отправка с идемпотентностью
producer.close();
}
@Test
public void testProducerWithAcksZero() throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.ACKS_CONFIG, "0"); // Без подтверждений
KafkaProducer producer = createProducer(props);
Message message = Message.newBuilder()
.setId("test-id-3")
.setContent("Acks=0 test")
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage = serializeMessage(message);
ProducerRecord record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
Future future = producer.send(record);
producer.flush();
assertThat(future.get()).isNotNull(); // Отправка без ожидания подтверждения
producer.close();
}
@Test
public void testProducerWithRetries() throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.RETRIES_CONFIG, "3"); // 3 попытки
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100"); // Задержка между попытками 100 мс
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "500"); // Уменьшаем таймаут запроса до 500 мс
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1000"); // Таймаут доставки 1000 мс (должен быть >= linger.ms + request.timeout.ms)
props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); // Явно задаём linger.ms для ясности
KafkaProducer producer = createProducer(props);
Message message = Message.newBuilder()
.setId("test-id-4")
.setContent("Retries test")
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage = serializeMessage(message);
ProducerRecord record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
Future future = producer.send(record);
producer.flush();
assertThat(future.get()).isNotNull(); // Успешная отправка с ретраями
producer.close();
}
@Test
public void testProducerWithShortTimeout() {
Properties props = new Properties();
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1"); // Очень короткий таймаут
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1"); // Очень короткий таймаут доставки
KafkaProducer producer = createProducer(props);
Message message = Message.newBuilder()
.setId("test-id-5")
.setContent("Short timeout test")
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage;
try {
serializedMessage = serializeMessage(message);
} catch (Exception e) {
throw new RuntimeException("Serialization failed", e);
}
ProducerRecord record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
Future future = producer.send(record);
// Ожидаем исключение из-за короткого таймаута
assertThrows(ExecutionException.class, future::get, "Expected timeout exception due to short timeout");
assertThat(future.isDone()).isTrue();
producer.close();
}
}
И конcюмера:
package com.app.petr;
import app.petr.Message;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.io.ByteArrayOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
public class ConsumerApplicationTest {
private static final String TOPIC = "consumer_test_data";
@Container
private static final KafkaContainer kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));
@BeforeAll
static void setup() {
kafkaContainer.start();
}
@BeforeEach
void clearTopic() {
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
try (AdminClient adminClient = AdminClient.create(adminProps)) {
adminClient.deleteTopics(Collections.singleton(TOPIC)).all().get();
adminClient.createTopics(Collections.singleton(new NewTopic(TOPIC, 1, (short) 1))).all().get();
} catch (Exception e) {
// Игнорируем ошибки, если топик не существовал
}
}
private KafkaProducer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(props);
}
private KafkaConsumer createConsumer(Properties props) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
return new KafkaConsumer<>(props);
}
private byte[] serializeMessage(Message message) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
SpecificDatumWriter writer = new SpecificDatumWriter<>(Message.class);
writer.write(message, encoder);
encoder.flush();
return out.toByteArray();
}
private Message deserializeMessage(byte[] data) throws Exception {
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
SpecificDatumReader reader = new SpecificDatumReader<>(Message.class);
return reader.read(null, decoder);
}
private void sendMessages(int count) throws Exception {
KafkaProducer producer = createProducer();
for (int i = 0; i < count; i++) {
Message message = Message.newBuilder()
.setId("id-" + i)
.setContent("Message " + i)
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage = serializeMessage(message);
producer.send(new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage)).get();
}
producer.flush();
producer.close();
}
@Test
public void testConsumerWithDefaultConfig() throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "default-group-" + System.nanoTime());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer consumer = createConsumer(props);
consumer.subscribe(Collections.singleton(TOPIC));
sendMessages(5);
ConsumerRecords records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.count()).isEqualTo(5);
consumer.close();
}
@Test
public void testConsumerWithMaxPollRecords() throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "max-poll-group-" + System.nanoTime());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");
KafkaConsumer consumer = createConsumer(props);
consumer.subscribe(Collections.singleton(TOPIC));
sendMessages(5);
ConsumerRecords records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.count()).isLessThanOrEqualTo(2);
consumer.close();
}
@Test
public void testConsumerWithAutoOffsetResetLatest() throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "latest-group-" + System.nanoTime());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
KafkaConsumer consumer = createConsumer(props);
// Подписываемся и вызываем poll для завершения регистрации
consumer.subscribe(Collections.singleton(TOPIC));
consumer.poll(Duration.ofSeconds(20)); // Даём Kafka время зарегистрировать консюмера
// Убеждаемся, что до отправки сообщений ничего не читается
ConsumerRecords recordsBefore = consumer.poll(Duration.ofSeconds(1));
assertThat(recordsBefore.isEmpty()).isTrue();
// Отправляем 3 сообщения после полной регистрации
sendMessages(3);
// Читаем новые сообщения
ConsumerRecords recordsAfter = consumer.poll(Duration.ofSeconds(5));
assertThat(recordsAfter.count()).isEqualTo(3);
consumer.close();
}
@Test
public void testConsumerWithDisableAutoCommit() throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "no-auto-commit-group-" + System.nanoTime());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer consumer = createConsumer(props);
consumer.subscribe(Collections.singleton(TOPIC));
sendMessages(5);
ConsumerRecords records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.count()).isEqualTo(5);
consumer.close();
consumer = createConsumer(props);
consumer.subscribe(Collections.singleton(TOPIC));
ConsumerRecords recordsAgain = consumer.poll(Duration.ofSeconds(5));
assertThat(recordsAgain.count()).isEqualTo(5);
consumer.close();
}
@Test
public void testConsumerWithShortSessionTimeout() throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "short-session-group-" + System.nanoTime());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000");
KafkaConsumer consumer = createConsumer(props);
consumer.subscribe(Collections.singleton(TOPIC));
sendMessages(3);
ConsumerRecords records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.count()).isEqualTo(3);
consumer.close();
}
}
Как работает тест?
Testcontainers запускает Kafka-контейнер.
Сообщение отправляется через продюсер и читается консюмером.
Проверяется соответствие отправленного и полученного сообщения.
Преимущества стартера
Простота подключения: Достаточно добавить зависимость и настроить application.yml.
Гибкость: Возможность переопределить бины или добавить кастомные настройки.
Типобезопасность: Avro обеспечивает строгую структуру данных.
Возможные ошибки и их решение
Проблема 1: Генерация Avro-классов внутри стартера
Ситуация: Изначально я использовал avro-maven-plugin для генерации классов из message.avdl в стартере. Это ограничивало гибкость, так как схема была фиксированной.
Решение:
Убрал генерацию из стартера.
Добавил параметр avro-schema-path в KafkaProperties, чтобы пользователь указывал путь к схеме.
Переложил ответственность за генерацию на проект пользователя.
Пример конфигурации:
apppetr:
kafka:
avro-schema-path: ${project.basedir}/src/main/resources/avro/message.avdl
Проблема 2: Ошибки в тестах с auto.offset.reset=latest
Ситуация: Тест testConsumerWithAutoOffsetResetLatest ожидал 3 сообщения, но получал 1 из-за асинхронности подписки.
Решение:
Добавил consumer.poll (Duration.ofSeconds (2)) после subscribe, чтобы дождаться регистрации консюмера.
Убрал Thread.sleep, сделав тест более надёжным.
consumer.subscribe(Collections.singleton(TOPIC));
consumer.poll(Duration.ofSeconds(20));
sendMessages(3);
assertThat(consumer.poll(Duration.ofSeconds(5)).count()).isEqualTo(3);
Примеры применения стартера
1. Надёжная доставка транзакций
Задача: Гарантировать доставку без дубликатов.
Конфигурация:
apppetr:
kafka:
bootstrap-servers: kafka1:9092,kafka2:9092
producer:
topic: transactions
idempotence-enabled: true
acks: all
retries: 5
retry-backoff-ms: 2000
avro-schema-path: src/main/resources/avro/transaction.avdl
Код:
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendTransaction(Transaction tx) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
SpecificDatumWriter writer = new SpecificDatumWriter<>(Transaction.class);
writer.write(tx, encoder);
encoder.flush();
kafkaTemplate.send("transactions", tx.getId().toString(), out.toByteArray());
}
2. Высокоскоростной сбор логов
Задача: Быстрая отправка логов с допустимой потерей.
Конфигурация:
apppetr:
kafka:
bootstrap-servers: localhost:9092
producer:
topic: logs
acks: 0
max-in-flight-requests: 10
consumer:
topic: logs
group-id: log-collector
max-poll-records: 1000
auto-offset-reset: latest
avro-schema-path: src/main/resources/avro/log.avdl
Код:
@KafkaListener(topics = "logs", groupId = "log-collector")
public void processLog(byte[] data) throws Exception {
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
SpecificDatumReader reader = new SpecificDatumReader<>(Log.class);
Log log = reader.read(null, decoder);
System.out.println("Log: " + log.getMessage());
}
3. Обработка событий с ретраями
Задача: Повторная обработка при сбоях.
Конфигурация:
apppetr:
kafka:
bootstrap-servers: kafka:9092
consumer:
topic: events
group-id: event-processor
retry-backoff-ms: 2000
max-retries: 5
avro-schema-path: src/main/resources/avro/event.avdl
Код:
@KafkaListener(topics = "events", groupId = "event-processor")
public void handleEvent(byte[] data) throws Exception {
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
SpecificDatumReader reader = new SpecificDatumReader<>(Event.class);
Event event = reader.read(null, decoder);
processEvent(event); // Логика с возможными исключениями
}
Бонус дочитавшим: памятка — шпаргалка по основным параметрам конфигурации kafka
Общие параметры
Параметр | Описание | Тип | По умолчанию |
---|---|---|---|
bootstrap.servers | Список адресов брокеров Kafka | String | localhost:9092 |
Параметры продюсера
Параметр | Описание | Тип | По умолчанию |
---|---|---|---|
key.serializer | Сериализатор ключа | Class | - |
value.serializer | Сериализатор значения | Class | - |
acks | Уровень подтверждений (0, 1, all) | String | 1 |
retries | Количество ретраев при сбоях | int | 0 |
retry.backoff.ms | Задержка между ретраями (мс) | int | 100 |
enable.idempotence | Включение идемпотентности | boolean | false |
max.in.flight.requests.per.connection | Макс. неподтверждённых запросов | int | 5 |
delivery.timeout.ms | Общий таймаут доставки (мс) | int | 120000 (2 мин) |
request.timeout.ms | Таймаут запроса к брокеру (мс) | int | 30000 (30 сек) |
buffer.memory | Размер буфера для отправки (байт) | long | 33554432 (32MB) |
batch.size | Размер батча для отправки (байт) | int | 16384 (16KB) |
linger.ms | Задержка перед отправкой батча (мс) | int | 0 |
compression.type | Тип сжатия (none, gzip, snappy, lz4, zstd) | String | none |
Параметры консюмера
Параметр | Описание | Тип | По умолчанию |
---|---|---|---|
key.deserializer | Десериализатор ключа | Class | - |
value.deserializer | Десериализатор значения | Class | - |
group.id | Идентификатор группы потребителей | String | null |
auto.offset.reset | Политика сброса оффсета (earliest, latest, none) | String | latest |
enable.auto.commit | Включение автокоммита оффсетов | boolean | true |
auto.commit.interval.ms | Интервал автокоммита (мс) | int | 5000 (5 сек) |
max.poll.records | Макс. записей за один poll | int | 500 |
max.poll.interval.ms | Макс. интервал между poll (мс) | int | 300000 (5 мин) |
session.timeout.ms | Таймаут сессии группы (мс) | int | 10000 (10 сек) |
heartbeat.interval.ms | Интервал heartbeat (мс) | int | 3000 (3 сек) |
fetch.max.bytes | Макс. размер выборки (байт) | int | 52428800 (50MB) |
fetch.min.bytes | Мин. размер выборки (байт) | int | 1 |
fetch.max.wait.ms | Макс. ожидание данных (мс) | int | 500 |
Полезные заметки
Идемпотентность: Для enable.idempotence=true требуется acks=all и retries > 0.
Производительность: Увеличьте batch.size и linger.ms для продюсера или max.poll.records для консюмера, чтобы повысить пропускную способность.
Надёжность: Используйте acks=all и высокий retries для продюсера, отключите enable.auto.commit для точного контроля оффсетов в консюмере.
Отладка: Логируйте group.id и проверяйте auto.offset.reset, если данные не читаются.
Заключение
Создание kafka-spring-boot-starter позволило мне упростить интеграцию Kafka в проекты на Spring Boot. Проблемы с Avro и тестами научили меня гибкости и важности синхронизации в асинхронных системах. Надеюсь, этот опыт вдохновит вас на создание собственных библиотек!
Вопрос к читателям: Какой функционал вы бы добавили в такой стартер? Делитесь идеями в комментариях!
Попробуйте внедрить стартер в свой проект! Какие задачи вы решаете с Kafka? Делитесь опытом в комментариях — обсудим, как улучшить этот подход!
Исходный код доступен на GitHub https://github.com/bigbox89/kafka-spring-boot-starter. Если у вас возникнут вопросы по настройке или тестированию, пишите — разберемся вместе!