Как провести unit-тестирование Flink-операторов: TestHarness20.03.2024 14:30
Привет всем, на связи снова Александр Бобряков, техлид в команде МТС Аналитики. Продолжаем цикл статей про фреймворк Apache Flink.
Напомню, в предыдущих частях я рассказывал про построение пайплайна Kafka-to-Kafka с промежуточным разделением потока и дедупликацией событий. Также в предыдущей статье я рассказал, как можно динамически определить выходной Kafka-топик для каждого отправляемого события.
Начиная с этой статьи начнём разбирать, как тестировать всё наше приложение Flink + Spring. Многие описанные подходы вполне применимы и в любом другом обычном Spring-приложении, поэтому, надеюсь, вы найдёте для себя что-то новое.
В данной статье мы рассмотрим, как протестировать stateless- и stateful-операторы Flink с помощью абстракций TestHarness.
Список моих постов про Flink
Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии статей. Данная статья соответствует релизной ветке с названием release/4_testharness_deduplicator_test.
Оглавление статьи
Структура тестов
Небольшой спойлер: к концу статьи мы получим вот такую структуру тестов:
Мы добавим первые тесты на stateless-фильтр, stateful-дедупликатор и на stateless-сплитератор (учитывающий внутренние Flink-абстракции).
Абстракция EntityTestBuilder
В нашем приложении уже есть несколько бизнес-DTO, например ClickMessage и ProductMessage, и будут добавляться новые. В самих тестах мы должны их создавать для воспроизведения отдельных сценариев. Но подумаем об удобстве: определим единое место, в котором будем создавать каждое DTO. Ведь в сценариях много полей, а определённые значения могут иметь бизнес-ценность.
Для этого создадим в тестовой директории следующий интерфейс:
public interface EntityTestBuilder {
T build();
}
Для тестов у него также будут реализации под каждую нашу DTO. Давайте рассмотрим пример для ClickMessage:
В этой реализации мы в одном месте определяем все дефолтные значения полей, а также статический метод-билдер, который будем вызывать в тестах для построения объекта ClickMessage. При этом аннотации lombok позволяют переопределять дефолтные значения с помощью методов withXXX (…), например:
aClickMessage().withPlatform(APP).build()
Тестирование stateless Flink-операторов
Начнём с самых простых тестов на stateless-операторы. Напомню, что они не используют внутренние состояния. Им неважно, какие события они обрабатывали ранее — каждое событие полностью независимо от других. Такой оператор выглядит как обычный Java-класс, и в его тестировании нет ничего сложного.
Давайте рассмотрим наш фильтр, пропускающий события с определённым значением поля platform:
public class ClickMessageWithPlatformFilter implements FilterFunction {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(ClickMessage message) {
return Optional.ofNullable(message.getPlatform())
.map(platform -> WEB.equals(platform) || APP.equals(platform))
.orElse(false);
}
}
Протестировать его можно с помощью JUnit 5 с использованием аннотации @ParameterizedTest. Эта аннотация позволяет указывать тесту аргументы, которые поступают посредством второй Source-аннотации (@MethodSource, @ValueSource, @CsvSource…):
class ClickMessageWithPlatformFilterUnitTest {
@ParameterizedTest
@MethodSource("clickMessagesByPlatform")
void shouldFilterMessageByPlatform(boolean expectedFilter, ClickMessage message) {
final var filter = new ClickMessageWithPlatformFilter();
final var filterResult = filter.filter(message);
assertEquals(expectedFilter, filterResult, format("Unexpected filter result for message: %s", message));
}
private static Stream clickMessagesByPlatform() {
return Stream.of(
arguments(true, aClickMessage().withPlatform(APP).build()),
arguments(true, aClickMessage().withPlatform(WEB).build()),
arguments(false, aClickMessage().withPlatform(Platform.of("unknown")).build()),
arguments(false, aClickMessage().withPlatform(Platform.of("")).build()),
arguments(false, aClickMessage().withPlatform(null).build())
);
}
}
в статическом методе clickMessagesByPlatform определяем перечень всех комбинаций входных параметров для теста .boolean expectedFilter — ожидаемый результат вызова фильтра true/false
ClickMessage message — само сообщение для фильтрации (обратите внимание на удобство создания объектов этого класса)
Тест будет запускаться 5 раз. В самом тесте создаём объект фильтра и вызываем метод filter для дальнейшего сравнения с ожидаемым результатом.
Так мы можем протестировать классы для методов map, flatMap, process и т. д. Конечно, иногда может понадобиться замокать какой-нибудь объект с помощью Mockito, но это ничем не отличается от стандартного тестирования.
Тестирование stateful Flink-операторов
Теперь перейдём к тестированию stateful-операторов. Они тесно связаны со средой выполнения Flink, потому что могут содержать внутреннее состояние, управляемое Flink, а также какие-нибудь таймеры и т. д. В этом случае подход, который я описал выше, не подойдёт. На этот раз нам нужны более умные абстракции, которые могут выполнить логику управления Flink. Такие абстракции есть в документации к тестированию.
Также в документации предлагается использовать так называемые TestHarness-классы. Для этого мы настроим дополнительные зависимости:
Первая зависимость даёт нам доступ ко всем описанным в документации TestHarness-классам, а вторая — ко многим удобным абстракциям в виде Flink MiniCluster, TestEnvironment и т. д.
Тестирование с помощью TestHarness-классов
TestHarness — это классы для тестирования операторов, основанные на использовании внутренней логики Flink: watermark, состояния, таймеры и т. д.
Сейчас объясню на примере обычного stateless-оператора. Рассмотрим тест над нашим написанным ранее сплитератором входных событий по нескольким побочным выходным потокам (по OutputTag) на основе предикатов:
@RequiredArgsConstructor
public class StreamSpliterator extends ProcessFunction {
private static final long serialVersionUID = 1L;
private final Map, SerializablePredicate> predicatesByTags;
private final OutputTag defaultTag;
@Override
public void processElement(I value, ProcessFunction.Context ctx, Collector