[Из песочницы] Apache Kafka и тестирование с Kafka Server

?v=1

Существуют различные способы для написания тестов с использованием Apache Kafka. К примеру, можно использовать TestContainers и EmbeddedKafka. Об этом можно почитать, к примеру, вот здесь: Подводные камни тестирования Kafka Streams. Но существует и вариант для написания тестов с использованием KafkaServer.

Предположим, необходимо разработать сервис отправки сообщений по различным каналам: email, telegram и т.п.

Пусть имя сервиса будет: SenderService.

Сервис должен: слушать заданный канал, выделять из канала нужные ему сообщения, разбирать сообщения и отправлять по нужному каналу для конечной доставки сообщений.

Для проверки сервиса необходимо сформировать сообщение для отправки с использованием канала отправки почты и убедиться в том, что сообщение было передано в конечный канал.
Конечно, в реальных приложениях тесты будут сложнее. Но для иллюстрации выбранного подхода, такого теста будет достаточно.

Сервис и тест реализованы с использованием: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1.

Сервис будет иметь возможность начать работу и остановить свою работу.

void start()

void stop()

При старте необходимо задать, как минимум, следующие параметры:

String bootstrapServers
String senderTopic
EmailService emailService

bootstrapServers — адрес kafka.
senderTopic — топик, из которого будут считываться сообщения.
emailService — сервис для конечной отправки сообщений по почте.

В реальном сервисе таких конечных сервисов будет столько же сколько и конечных каналов отправки сообщений.

Теперь необходим «потребитель», который слушает канал, фильтрует и отправляет сообщения в конечные каналы. Количество таких «потребителей» можно выбирать. Подход для написания «потребителя» описан вот здесь: Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.

Collection closeables = new ArrayList<>();
ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);
ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);
for (int i = 0; i < senderTasksN; i++) {
    SenderConsumerLoop senderConsumerLoop =
            new SenderConsumerLoop(
                    bootstrapServers,
                    senderTopic,
                    "sender",
                    "sender",
                    tasksExecutorService,
                    emailService
            );
    closeables.add(senderConsumerLoop);
    senderTasksExecutor.submit(senderConsumerLoop);
}

В цикле создается экземпляр «потребителя», запоминается в коллекции и запускается через сервис запуска задач.

При выполнении этого кода «потребители» начинают работать. Сервис ждет их завершения или сигнала для остановки.

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    for (AutoCloseable autoCloseable : closeables) {
        try {
            autoCloseable.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    senderTasksExecutor.shutdown();
    tasksExecutorService.shutdown();
    stop();
    try {
        senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));

При завершении необходимо освободить ресурсы.

«Потребитель» имеет следующие публичные методы:

void run()

void close()

Основной метод: run.

@Override
public void run() {
    kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);
    kafkaConsumer.subscribe(Collections.singleton(topic));
    while (true) {
        calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));
    }
}

По входным параметрам создается экземпляр «kafka-потребителя». «kafka-потребитель» подписывается на заданный топик. В бесконечном цикле выбираются записи из топика. И отправляются на обработку.

Для иллюстрации json-сообщения будут иметь несколько полей, которые будут задавать и тип сообщения, и данные для отправки.

Пример сообщения:

{
  "subject": {
    "subject_type": "send"
  },
  "body": {
    "method": "email",
    "recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",
    "title": "42",
    "message": "73"
  }
}

subject_type — тип сообщения. Для сервиса нужно значение «send».
method — тип конечного сервиса для отправки. «email» — отправка через почту.
recipients — список получателей.
title — заголовок для сообщения.
message — сообщение.

Обработка всех записей:

void calculate(ConsumerRecords records) {
    for (ConsumerRecord record : records) {
        calculate(record);
    }
}

Обработка одной записи:

void calculate(ConsumerRecord record) {
            JSONParser jsonParser = new JSONParser();
            Object parsedObject = null;
            try {
                parsedObject = jsonParser.parse(record.value());
            } catch (ParseException e) {
                e.printStackTrace();
            }
            if (parsedObject instanceof JSONObject) {
                JSONObject jsonObject = (JSONObject) parsedObject;
                JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);
                String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();
                if (SEND.equals(subjectType)) {
                    JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);
                    calculate(jsonBody);
                }
            }
        }

Распределение сообщений по типу:

void calculate(JSONObject jsonBody) {
    String method = jsonBody.get(METHOD).toString();
    if (EMAIL_METHOD.equals(method)) {
        String recipients = jsonBody.get(RECIPIENTS).toString();
        String title = jsonBody.get(TITLE).toString();
        String message = jsonBody.get(MESSAGE).toString();
        sendEmail(recipients, title, message);
    }
}

Отправка в конечную систему:

void sendEmail(String recipients, String title, String message) {
    tasksExecutorService.submit(() -> emailService.send(recipients, title, message));
}

Отправка сообщений происходит через сервис исполнения задач.

Ожидания завершения отправки не происходит.

Создание «kafka-потребителя»:

static KafkaConsumer createKafkaConsumerStringString(
        String bootstrapServers,
        String clientId,
        String groupId
) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(properties);
}

Интерфейс для писем:

interface EmailService {
    void send(String recipients, String title, String message);
}

Для теста понадобиться следующее.
Адрес «kafka-сервера».
Порт для «kafka-сервера».
Имя топика.

Сервис для управления «kafka-сервером». Будет описан ниже.

public class SenderServiceTest {
    @Test
    void consumeEmail() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String senderTopic = "sender_data";
        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
            kafkaServerService.start();
            kafkaServerService.createTopic(senderTopic);

        }
    }
}

Задаются параметры. Создается сервис для управления «kafka-сервером». «kafka-сервером» стартует. Создается необходимый топик.

Создается «mock» конечного сервиса для отправки сообщений:

SenderService.EmailService emailService = mock(SenderService.EmailService.class);

Создается сам сервис и стартует:

SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();

Задаются параметры для сообщения:

String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";

Отправляется сообщение в канал:

kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));

Ожидание:

Thread.sleep(6000);

Проверка, что сообщение дошло до конечного сервиса:

verify(emailService).send(recipients, title, message);

Остановка:

senderService.stop();

Все вместе:

public class SenderServiceTest {
    @Test
    void consumeEmail() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String senderTopic = "sender_data";
        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
            kafkaServerService.start();
            kafkaServerService.createTopic(senderTopic);
            SenderService.EmailService emailService = mock(SenderService.EmailService.class);
            SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
            senderService.start();
            String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
            String title = "42";
            String message = "73";
            kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
            Thread.sleep(6000);
            verify(emailService).send(recipients, title, message);
            senderService.stop();
        }
    }
}

Вспомогательный код:

public class SenderFactory {
    public static final String SUBJECT = "subject";
    public static final String SUBJECT_TYPE = "subject_type";
    public static final String BODY = "body";
    public static final String METHOD = "method";
    public static final String EMAIL_METHOD = "email";
    public static final String RECIPIENTS = "recipients";
    public static final String TITLE = "title";
    public static final String MESSAGE = "message";
    public static final String SEND = "send";

    public static String key() {
        return UUID.randomUUID().toString();
    }

    public static String createMessage(String method, String recipients, String title, String message) {
        Map map = new HashMap<>();
        Map subject = new HashMap<>();
        Map body = new HashMap<>();
        map.put(SUBJECT, subject);
        subject.put(SUBJECT_TYPE, SEND);
        map.put(BODY, body);
        body.put(METHOD, method);
        body.put(RECIPIENTS, recipients);
        body.put(TITLE, title);
        body.put(MESSAGE, message);
        return JSONObject.toJSONString(map);
    }
}

Основные методы:

void start()

void close()

void createTopic(String topic)

В методе «start» происходит создание сервера и вспомогательных объектов.

Создание «zookeeper» и сохранение его адреса:

zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();

Создание клиента «zookeeper»:

zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);

Задание свойств для сервера:

Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
    brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
    throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);

Создание сервера:

kafkaServer = TestUtils.createServer(config, new MockTime());

Все вместе:

public void start() {
    zkServer = new EmbeddedZookeeper();
    String zkConnect = zkHost + ":" + zkServer.port();
    zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
    zkUtils = ZkUtils.apply(zkClient, false);
    Properties brokerProps = new Properties();
    brokerProps.setProperty("zookeeper.connect", zkConnect);
    brokerProps.setProperty("broker.id", "0");
    try {
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
    brokerProps.setProperty("offsets.topic.replication.factor", "1");
    KafkaConfig config = new KafkaConfig(brokerProps);
    kafkaServer = TestUtils.createServer(config, new MockTime());
}

Остановка сервиса:

@Override
public void close() {
    kafkaServer.shutdown();
    zkClient.close();
    zkServer.shutdown();
}

Создание топика:

public void createTopic(String topic) {
    AdminUtils.createTopic(
            zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
}

В заключении нужно отметить, что приведенный здесь код лишь иллюстрирует выбранный способ.

Для создания и тестирования сервисов с использованием «kafka» можно обратиться к следующему ресурсу:
kafka-streams-examples

Исходный код

Код для тестирования с «kafka-сервером»

© Habrahabr.ru