Подводные камни тестирования Kafka Streams

u3ke2r_udshhsnworvg5tkpuvxu.jpeg

Kafka, в отличие от реляционных баз данных, является молодой технологией, и потому инструментарий для автоматического тестирования приложений, созданных на базе этой платформы, был доступен разработчикам с самого начала. Хотя на первый взгляд с этим инструментарием всё обстоит очень хорошо — бери и пиши тесты! — на практике приходится сталкиваться с трудностями, о которых хочу поведать в этом посте.


TopologyTestDriver

Основным инструментом тестирования Kafka Streams является TopologyTestDriver. Его API подвергался значительным усовершенствованиям и к версии 2.4 стал очень удобным и приятным в использовании. Тест с его использованием выглядит словно обычный модульный тест для функции:


  • определяем входные данные;
  • подаём их в моки входных топиков (TestInputTopic);
  • читаем то, что было отправлено топологией, из моков выходных топиков (TestOutputTopic);
  • выполняем проверку полученного результата, сравнивая его с ожидаемым.

Для создания TopologyTestDriver нужен экземпляр топологии (это то, что мы тестируем) и конфигурация в виде экземпляра класса Properties:

TopologyTestDriver topologyTestDriver = 
       new TopologyTestDriver(topology, config.asProperties());

В наборе этих свойств учитываются вещи типа сериализаторов по умолчанию, и не учитываются, за ненадобностью, параметры подключения к Kafka-кластеру. Для создания «входных» тестовых топиков нужны сериализаторы ключа и значения, а для «выходных» — десериализаторы:

TestInputTopic inputTopic =
   topologyTestDriver.createInputTopic(
         INPUT_TOPIC, new StringSerializer(), new StringSerializer());
TestOutputTopic outputTopic =
   topologyTestDriver.createOutputTopic(
         OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());

Классы TestInputTopic и TestOutputTopic содержат наборы методов, позволяющих удобно отправлять / читать по одиночке и пачками данные в виде, наиболее релевантном для решаемой задачи.

Отправить или прочитать можно пару ключ/значение или одно лишь значение, с явным указанием штампа времени или без него, а если существенной для тестируемого приложения является информация в заголовках, то можно работать с Record-ами. Возможность задавать штамп времени позволяет писать моментально срабатывающие тесты на логику работы time windows, не используя паузы для ожидания.

При этом TopologyTestDriver предоставляет немаловажную для написания проверок возможность гарантированной вычитки всех данных, произведённых топологией при помощи readKeyValuesToList() или readRecordsToList(). Результат, возвращаемый этими методами, можно интерпретировать как полный результат обработки входных данных, чего не хватает в других методах тестирования и о чём речь ещё впереди.

Другая важная возможность TopologyTestDriver — доступ к key-value-хранилищам:

KeyValueStore store = topologyTestDriver.getKeyValueStore(STORE_NAME);

Это даёт возможность прочитать состояние локальных хранилищ и выполнить не только «black box»-, но и «white box»-тестирование топологии.

При своей работе TopologyTestDriver сохраняет состояние на жёстком диске, поэтому во избежание конфликтов с другими тестами и проблем с повторным запуском тестов его необходимо не забывать закрывать вызовом метода .close() в tearDown-методе тестового класса. (Но если вы запускаете тесты в Windows, то приготовьтесь к тому, что файлы иногда не будут стираться из-за KAFKA-6647 от марта 2018 года, который наконец-то обещают исправить в версии 2.5.1. До этого исправления под Windows зачастую требуется зачистка файлов в папке C:\tmp\kafka-streams\ перед запуском тестов.)

Тесты на TopologyTestDriver пишутся легко и выполняются стремительно. Так как с его помощью тестируется контур от десериализации входного сообщения до сериализации выходного включительно, то проблемы, связанные с сериализацией/десериализацией, обнаруживаются в этих тестах автоматически, помимо любых проверок выходных данных. На мой взгляд, разработку любой функциональности на Kafka Streams API следует начинать с теста на TopologyTestDriver. Но, к сожалению, TopologyTestDriver имеет свои ограничения, и за ними скрываются те самые подводные камни.


TopologyTestDriver может быть слеп к дефектам

На практике мне доводилось встречаться со случаями, когда поведение топологии, протестированное с помощью TopologyTestDriver, отличалось от поведения в настоящем кластере.

Рассмотрим пример, который можно просто воспроизвести.

Возьмём задачу дедупликации, т. е. выделения уникальных (в некотором смысле) записей в потоке. Иными словами, нам необходимо реализовать аналог метода distinct() в Java Streams API.

Задача эта довольно распространённая, и я убеждён, что сам метод distinct() для временных окон должен появиться в интерфейсе KStream со временем, но, пока его там нет, приходится реализовывать самостоятельно, наступая на грабли.

Первое желание — обойтись без сложностей типа использования key-value-хранилищ и Processor API, построив нужное поведение на высокоуровневом DSL. Вот попытка (сразу скажу, что неверная, не пытайтесь использовать этот код в production):

final String STOP_VALUE = "";
KStream input =
    streamsBuilder.stream(INPUT_TOPIC_WRONG, Consumed.with(Serdes.String(), Serdes.String()));

input.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
  .reduce((first, second) -> STOP_VALUE)
  .toStream()
  .filter((key, value) -> !STOP_VALUE.equals(value))
  .to(OUTPUT_TOPIC_WRONG, Produced.with(Serdes.String(), Serdes.String()));

Наша задача — получить на выходе поток «дедуплицированных» ключей: ключ, попадающийся в первый раз, должен со своим значением передаваться в результирующий топик и отфильтровываться во все последующие разы.

Остроумный (хотя и неверный) подход здесь заключается в том, что мы можем методом reduce свести всякое повторное появление ключа к значению STOP_VALUE. При этом ключ, появившийся в самый первый раз, редьюсить не с чем, и он пройдёт дальше по конвейеру без изменений. В принципе, ход мыслей правильный, и в Java Streams API подобный код будет вполне рабочим. А Kafka Streams API — это почти как Java Streams API, верно?

Напишем тест для данного кода:

inputTopic.pipeKeyValueList(Arrays.asList(
     KeyValue.pair("A", "A"),
     KeyValue.pair("B", "B"),
     KeyValue.pair("B", "B"),
     KeyValue.pair("A", "A"),
     KeyValue.pair("C", "C")
));
List expected = Arrays.asList("A", "B", "C");
List actual = outputTopic.readValuesToList();
assertEquals(expected, actual);

Тест зелёный! Можно в прод?!

К сожалению, в проде мы обнаружим, что многие (если не все) значения будут отсутствовать в выходном топике. Причиной этому — механизмы кэширования, описанные, например, здесь.

Хотя reduce в конце концов (eventually) должен передавать в таблицу результат агрегации, он вовсе не обязан делать это для всякого промежуточного результата. В итоге, первое появление записи с уникальным ключом (без значения STOP_VALUE) будет «проглочено» кэшем. Когда появление следующих записей приведёт к передаче сообщений далее по конвейеру, все они будут уже иметь значение STOP_VALUE и не будут пропущены фильтром.

Так как кэширование не эмулируется TopologyTestDriver-ом, показанный выше тест будет выполняться всегда и создавать ложную иллюзию корректности кода.

Правильное решение задачи дедупликации с помощью Kafka Streams несколько сложнее и описано на developer.confluent.io.

Оба решения (и верное, и неверное) проходят тест на базе TopologyTestDriver, и, чтобы отделить одно от другого, мы уже вынуждены задействовать в тестах настоящую Kafka.


EmbeddedKafka и TestContainers

Для использования «настоящей» Кафки в тестах существуют две распространённые альтернативы: EmbeddedKafka и TestContainers.

EmbeddedKafka, в полном соответствии с названием, запускает Kafka-брокера в том же Java-процессе, где выполняются тесты.

TestContainers — это Java-библиотека, запускающая сервисы баз данных, браузеров и вообще всего что угодно внутри Docker-контейнеров.

C обеими этими альтернативами работа производится по схожим принципам. Необходимо создать объект, связанный с Kafka-брокером (embedded или контейнеризованным), получить из него адрес для подключения и передать этот адрес в параметры приложения. В Spring Boot за подключение к Kafka отвечает параметр spring.kafka.bootstrap-servers. В дальнейшем систему можно протестировать, передавая в Kafka-топики данные и забирая данные из выходных топиков.

Таким образом, функционально EmbeddedKafka и TestContainers ничем не отличаются. Однако в среде разработчиков бытует мнение, что EmbeddedKafka ведёт себя нестабильно и требует затаскивания слишком тяжелых зависимостей (включая Scala) в проект. Я на своих проектах использую TestContainers потому, что так «исторически сложилось»: у меня есть положительный опыт использования TestContainers не только для тестирования Kafka, но и в других проектах, с реляционными базами данных. От использования TestContainers за прошедшие несколько лет у меня сложились самые позитивные впечатления. Практики использования EmbeddedKafka в серьёзных тестах у меня нет, и поэтому я не могу ни подтвердить, ни опровергнуть оценку её стабильности (буду рад комментариям к посту на эту тему).

В примере на GitHub к этой статье я использую обе этих технологии, и обе они показывают одинаковые результаты.

«Магия Spring» позволяет подключить Kafka-брокер к тестируемой системе довольно изящно:

@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class TestTopologyKafkaEmbedded {
   /*В момент запуска теста объект KafkaStreams
     будет сконфигурирован на использование подключения к EmbeddedKafka*/

Но не столь изящно будет выглядеть наш тест!

Дело в том, что всё, что мы имеем теперь — это возможность отправлять и принимать из Kafka сообщения, наблюдая за результатом работы тестируемой системы. Но сама тестируемая система теперь работает полностью асинхронно с нашим тестом. Иными словами, мы не можем различить ситуацию, при которой мы не имеем сообщений в топике потому, что система работает долго и ещё не выдала их, и ситуацию, при которой система уже завершила обработку и новых сообщений просто не предвидится. Отсюда происходит необходимость задавать таймауты в консьюмерах, что приводит к увеличению времени работы теста. И как всякий таймаут, он не может нам гаранировать ничего.

Вот тест, которым мы проверяем реализацию алгоритма дедупликации на Kafka-брокере:

try (Consumer consumer = 
          configureConsumer(bootstrapServers, outputTopicName);
      Producer producer = 
          configureProducer(bootstrapServers)) {
   producer.send(new ProducerRecord<>(inputTopicName, "A", "A"));
   producer.send(new ProducerRecord<>(inputTopicName, "B", "B"));
   producer.send(new ProducerRecord<>(inputTopicName, "B", "B"));
   producer.send(new ProducerRecord<>(inputTopicName, "A", "A"));
   producer.send(new ProducerRecord<>(inputTopicName, "C", "C"));
   producer.flush();
   List actual = new ArrayList<>();

   while (true) {
       ConsumerRecords records = KafkaTestUtils.getRecords(consumer, 5000);
       if (records.isEmpty()) break;
          for (ConsumerRecord rec : records) {
             actual.add(rec.value());
          }
       }
       List expected = Arrays.asList("A", "B", "C");
       Collections.sort(actual);
       assertEquals(expected, actual);
   }

Запустив все тесты в проекте, получаем вот такую картинку: TopologyTestDriver, радостно и быстро принимающий как правильную, так и неправильную реализации метода дедупликации, и EmbeddedKafka и TestContainers, работающие долго (из-за таймаута в консьюмере), но оба показывающие корректный результат:

4mgf7v4vmzxgqrqyjshi4ronglg.png

Следует добавить, что рассмотренный пример является не единственным встретившимся мне на практике случаем несовпадения результатов тестирования на TopologyTestDriver и поведения системы в реальном кластере.


Выводы


  • TopologyTestDriver — удобный и обязательный к использованию инструмент для создания приложений на KafkaStreams. Тем не менее, важно учитывать, что «зелёные» тесты на TopologyTestDriver являются, говоря языком математики, необходимым, но не достаточным условием корректности программы.
  • При написании тестов для EmbeddedKafka или TestContainers открытой остаётся проблема неразличимости случаев, когда данные не выводятся и когда данные продуцируются слишком долго. Необходимость задания таймаутов увеличивает время тестов и порождает их flakiness.

Общий итог — инструментарий тестирования Kafka Streams-приложений нам ещё предстоит улучшать.

Исходный код примеров для статьи находится здесь.


Про тестирование Kafka хорошо рассказал Сергей Махетов, который будет делать воркшоп на Heisenbug 2020 Piter. Сам я буду выступать с докладом «Apache Kafka: Что это и как она изменит архитектуру вашего приложения» на HolyJS 2020 Piter.

© Habrahabr.ru