Unit-тестирование Flink-операторов, Job: Flink MiniCluster

Привет, Хабр! С вами вновь Александр Бобряков, техлид в команде МТС Аналитики. И я с очередной статьёй из цикла про фреймворк Apache Flink.

В предыдущей части я рассказал, как тестировать stateless- и stateful-операторы Flink с использованием вспомогательных TestHarness-абстракций, предоставляемых Flink.

В этой статье напишем тесты на всю джобу с использованием мини-кластера Flink и при помощи JUnit Extension. Ещё мы начнём выделять удобные вспомогательные абстракции для тестов, которые понадобятся позже.

eafb72b880140877f06a701028232ceb.jpgСписок моих постов про Flink

Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии статей. Эта статья соответствует релизной ветке с названием release/5_flinkcluster_job_deduplicator_test.

Оглавление статьи

Flink MiniCluster

В документации по написанию тестов предлагается использовать абстракцию Flink мини-кластера MiniClusterWithClientResource для локального тестирования полноценных заданий. Это обусловлено тем, что мы не сможем полностью воспроизвести работу Flink на обычных Unit-тестах (даже с помощью TestHarness), учитывая параллельность и другие внутренние процессы. Но мини-кластер такую возможность даёт.

После старта мини-кластера универсальный метод для определения окружения StreamExecutionEnvironment.getExecutionEnvironment () автоматически подключится к мини-кластеру, и все наши задания будут выполняться на нём. Документация предлагает такой сценарий использования:

public class ExampleIntegrationTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    public void someТest() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ...
    }
}

Но в этом случае нужно создавать @ClassRule на каждый тестовый класс либо использовать наследование в тестах. Я предлагаю немного другой, более удобный способ.

Итак, какие у нас условия задачи?

Во-первых, хотелось бы не тянуть лишних зависимостей в тесты с Flink.

Во-вторых, у нас должна быть возможность включать мини-кластер в любом тесте максимально просто и, главное, единожды, чтобы кластер поднимался перед всеми тестовыми классами и убивался после всех тестов.

Какое решение? На ум приходит использование своей аннотации @WithFlinkCluster, которая будет предоставлять Flink мини-кластер для класса-теста, над которым она висит. Давайте посмотрим на саму реализацию:

@Retention(RUNTIME)
@Inherited
@ExtendWith({FlinkClusterExtension.class})
public @interface WithFlinkCluster {
}

Ничего особенного в ней нет. Основная фишка внутри FlinkClusterExtension. Это JUnit Extension. Если кратко, то они нужны для изменения поведения тестов с помощью событий их жизненного цикла. Реализация моего FlinkClusterExtension выглядит вот так:

@Slf4j
@SuppressWarnings({"PMD.AvoidUsingVolatile"})
public class FlinkClusterExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource {
   private static final MiniClusterWithClientResource FLINK_CLUSTER;
   private static final Lock LOCK = new ReentrantLock();
   private static volatile boolean started;

   static {
       final var configuration = new Configuration();
       configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2);
       FLINK_CLUSTER = new MiniClusterWithClientResource(
           new MiniClusterResourceConfiguration.Builder()
               .setConfiguration(configuration)
               .setNumberSlotsPerTaskManager(2)
               .setNumberTaskManagers(1)
               .build());
   }

   @Override
   public void beforeAll(ExtensionContext context) throws Exception {
       LOCK.lock();
       try {
           if (!started) {
               log.info("Start Flink MiniCluster");
               started = true;
               FLINK_CLUSTER.before();
               context.getRoot().getStore(GLOBAL).put("Flink Cluster", this);
           }
       } finally {
           LOCK.unlock();
       }
   }

   @Override
   public void close() {
       log.info("Close Flink MiniCluster");
       FLINK_CLUSTER.after();
       started = false;
   }
}

Обратите внимание на имплементацию двух интерфейсов: BeforeAllCallback, ExtensionContext.Store.CloseableResource. Первый предоставляет метод beforeAll перед стартом всех тестов внутри каждого тестового класса, у которых висит аннотация @ExtendWith ({FlinkClusterExtension.class}). А второй — коллбэк на закрытие ресурсов уже после отработки всех тестовых классов. В статическом блоке инициализируем Flink мини-кластер, передав ему различные настройки:

  • количество слотов

  • TaskManager

  • стандартный параллелизм — его лучше по умолчанию выставить >1, чтобы отловить неочевидные баги в ваших сценариях

В методе beforeAll выполняется непосредственный запуск кластера через его метод before (). В реализации FlinkClusterExtension присутствуют синхронизации в виде блокировки через объект ReentrantLock, чтобы избежать повторного запуска в случае параллельно запускаемых тестов. Метод close завершает работу мини-кластера единожды после выполнения всех тестов, которые используют текущий JUnit Extension.

В итоге каждый тест, где нужен будет кластер Flink, можно писать так:

@WithFlinkCluster
class DeduplicatorUnitTest_byFlinkCluster {
   @Test
    void test()  {
        final var env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ...
    }
}

Абстракция для Sink в тестировании через Flink MiniCluster

Прежде чем переходить непосредственно к написанию теста, нужно подумать:, а как мы будем проверять наши задания? Ожидаемый результат выполнения задания или оператора — наличие событий на его выходе.

Проблема в том, что каждый оператор имеет свою параллельность. Благодаря ей операторы сериализуются на каждый TaskManager в количестве своей параллельности (если в каждом TM существует единственный слот). Каждый из таких параллельных операторов может писать в свой параллельный экземпляр Sink-оператора. А нам было бы удобно собрать их воедино.

Об этом также упоминается в конце документации в блоке замечаний, где авторы предлагают создать свой CollectSink. Они используют статическую коллекцию, нам такой вариант не очень подходит — тесты могут выполняться параллельно и независимо, а создавать отдельный класс со статической коллекцией неудобно, ведь доступ к единственной статической коллекции будет из всех классов одновременно. Это может привести к тому, что тесты будут влиять друг на друга.

В качестве решения в исходниках Flink можно обнаружить синглтон-класс org.apache.flink.test.streaming.runtime.util.TestListWrapper, который предлагает более подходящий вариант:

private List> lists;

Далее в каждом отдельном тесте при инициализации TestListWrapper создаётся внутренний List в объекте lists выше. ID этого листа возвращается пользователю, а дальше можно написать свой Writer, который будет писать именно в этот List, запрашивая его по полученному id у самого TestListWrapper. Звучит непонятно, поэтому предоставлю код идеи:

@SuppressWarnings("PMD.TestClassWithoutTestCases")
public class TestListSink implements Sink {
   private static final long serialVersionUID = 1L;

   private final ListWriter writer = new ListWriter();
   private final int resultListId;

   public TestListSink() {
       this.resultListId = TestListWrapper.getInstance().createList();
   }

   @Override
   public SinkWriter createWriter(InitContext context) {
       return writer;
   }

   public List getHandledEvents() {
       return new ArrayList<>(resultList());
   }

   @SuppressWarnings("unchecked")
   private List resultList() {
       synchronized (TestListWrapper.getInstance()) {
           return (List) TestListWrapper.getInstance().getList(resultListId);
       }
   }

   private class ListWriter implements SinkWriter, Serializable {
       private static final long serialVersionUID = 1L;

       @Override
       public void write(T element, Context context) {
           resultList().add(element);
       }

       @Override
       public void flush(boolean endOfInput) {
           // no op
       }

       @Override
       public void close() {
           // no op
       }
   }
}

При создании моего TestListSink (в каждом отдельном тесте) инициализируется новый List и запоминается его id: TestListWrapper.getInstance ().createList (). Также у нас есть своя реализация SinkWriter, которая при получении события в методе write записывает его в один и тот же List по id листа. Так в случае большой параллельности выходных операторов мы получим единый List, в который будет писать каждый параллельный экземпляр выходного оператора. Также нам полезно определить вспомогательный метод getHandledEvents, который вернёт все записанные события всех параллельных экземпляров sink-оператора после выполнения теста.

Тестирование дедупликатора с помощью Flink MiniCluster

В прошлой статье мы написали тест на дедупликатор с помощью абстракций TestHarness. В качестве дополнительного примера можно переписать тот же тест с использованием мини-кластера. Для этого мы:

  1. На новый тестовый класс повесим нашу новую аннотацию.

  2. Немного перепишем сам тест, определив непосредственно минимальный пайплайн обработки с использованием дедупликатора.

Сделаем это в новом тестовом классе:

@WithFlinkCluster
public class DeduplicatorUnitTest_byFlinkCluster {
   private final Time TTL = Time.milliseconds(100);

   @SneakyThrows
   @ParameterizedTest
   @MethodSource("provideUniqueEvents")
   void shouldDeduplicateMessagesByTtl(List events) {
       final var sourceEvents = new ArrayList();
       sourceEvents.addAll(events);
       sourceEvents.addAll(events);
       final var env = StreamExecutionEnvironment.getExecutionEnvironment();
       final var sink = new TestListSink();
       env.fromCollection(sourceEvents)
           .keyBy(value -> value)
           .flatMap(new Deduplicator<>(TTL))
           .sinkTo(sink);

       env.execute();

       final var outputEvents = sink.getHandledEvents();
       assertEquals(events.size(), outputEvents.size(),
           format("Unexpected number of events after deduplication. Output events: %s", outputEvents));
       assertEquals(events, new ArrayList<>(outputEvents), "Unexpected events after deduplication");
   }

   private static Stream provideUniqueEvents() {
       return Stream.of(arguments(List.of("key_1", "key_2")));
   }
}

По своей сути тест очень похож на предыдущие: на вход получаем несколько String-событий, определяем источник данных fromCollection (), передав в него дважды входные данные. Потом определяем сам дедупликатор и выходной Sink с использованием нашего универсального TestListSink. После запуска пайплайна проверяем, что данных в результирующем Sink столько же, сколько было уникальных сообщений.

Важным моментом является использование MiniCluster. Это происходит под капотом во время вызова метода StreamExecutionEnvironment.getExecutionEnvironment (). Так как наш FlinkClusterExtension отрабатывает до запуска теста, то на момент непосредственного выполнения теста уже будет создан мини-кластер на локальной машине, а метод getExecutionEnvironment () его увидит и подтянет.

Тестирование всей Job с помощью Flink MiniCluster

Теперь можно переходить к первому тесту всего Flink-задания. Напомню, что наша джоба фильтрует входной поток ClickMessage, пропуская события только с типом платформы WEB и APP. Далее события APP проходят дедупликацию, а затем события APP и WEB записываются в выходной топик Kafka, определяющийся динамически из поля ClickMessage.productTopic.

Пайплайн выглядит так:

dd3a50ecd7a89fc4bf94b5f369260358.png

В тесте будут участвовать многие Spring-компоненты, поэтому выделим новую аннотацию для тестирования Flink Job:

@Retention(RUNTIME)
@SpringBootTest(
   webEnvironment = NONE,
   classes = {
       PropertiesConfig.class,
       FlinkConfig.class,
   })
@ActiveProfiles({"test"})
@WithFlinkCluster
public @interface FlinkJobTest {
}

Так как мы не хотим поднимать абсолютно весь контекст по умолчанию, то задаём в аннотации @SpringBootTest лишь две конфигурации:

  1. PropertyConfig, который добавляет в контекстное все наши проперти-классы, которые биндятся с application.yml.

  2. FlinkConfig, в котором регистрируется и настраивается бин StreamExecutionEnvironment.

Дополнительно мы применим созданную аннотацию @FlinkJobTest для поднятия мини-кластера.

Мы тестируем саму логику джобы, а какие именно будут source и sink, неважно. Реальные имплементации (Kafka) мы подставим в Е2Е-тестах.

Первый тест будет проверять, что события APP дедуплицируются:

@FlinkJobTest
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
class ClickToProductJobUnitTest {
   @Autowired
   private StreamExecutionEnvironment environment;
   @Autowired
   private ClickToProductJobProperties properties;

   @ParameterizedTest
   @EnumSource(value = Platform.Enum.class, names = {"APP"})
   @SneakyThrows
   void shouldDeduplicateClickMessages(Platform platform) {
       final var message = aClickMessage().withPlatform(platform).build();
       final var sink = new TestListSink>();
       final var job = new ClickToProductJob(
           properties,
           env -> env.fromElements(message, message, message).uid("test_source"),
           () -> sink
       );

       job.registerJob(environment);
       environment.execute();

       final var out = sink.getHandledEvents();
       assertEquals(1, out.size(), format("Unexpected message count in sink: %s", out));
   }

   // ...
}

В начале кода мы используем нашу аннотацию @FlinkJobTest. В самом тесте можем заинжектить основные бины:

  1. StreamExecutionEnvironment, нацеленный на поднятый в FlinkClusterExtension мини-кластер.

  2. ClickToProductJobProperties, в котором имеем все настроенные в application-test.yml настройки.

В самом тесте создаём ClickMessage — на основе платформы APP, переданной в аргументы параметризованного теста. Далее определяем описанный в предыдущих главах Sink TestListSink и вручную создаём тестируемую джобу ClickToProductJob. В аргументы джобы передаём проперти, источник данных и приёмник.

! Источник данных содержит три одинаковых экземпляра входного сообщения, которые джоба должна дедуплицировать.

После этого происходит регистрация джобы в environment мини-кластера и синхронный запуск. Так как наш источник данных fromCollection имеет три события, то по мере их обработки джоба завершится автоматически. Это достаточно важный момент, потому что в случае бесконечных источников данных (например, Kafka) джоба будет выполняться вечно, пока мы её не завершим. Для вечного выполнения понадобится асинхронный запуск, который рассмотрим в следующих статьях. На выходе проверяем, что Sink содержит лишь одно событие, а остальные дедуплицировались.

Так можно написать тест, который проверяет, что события WEB не дедуплицируются, но обрабатываются, а события с произвольным типом платформы не обрабатываются вообще. Пример теста можно посмотреть в репозитории проекта, указанном в начале статьи.

В итоге структура тестов выглядит следующим образом:

cda48ceee25cd450d1c9fa9808155a9e.png

Вывод

Мы рассмотрели создание Unit-теста на полноценную джобу Flink и отдельные stateful-операторы с использованием мини-кластера. Также мы научились запускать мини-кластер один раз перед всеми тестовыми классами, нуждающимися в нём. В дополнение мы создали вспомогательные абстракции и аннотации, существенно разделяя ответственность в тестах и упрощая логику написания новых тестов.

За кадром осталась Kafka, ведь наша джоба читает и пишет данные в её топики. Как написать E2E-тест на полную интеграцию Kafka + Flink Job, я расскажу в следующей части.

Habrahabr.ru прочитано 2528 раз