Apache Flink: тестирование собственного сериализатора состояния

8fefee3e16ee0fa6e7a605e50c6fa93a.png

Привет, Хабр! На связи Александр Бобряков, техлид команды МТС Аналитика. Это мой одиннадцатый пост про Apache Flink. В предыдущей части мы рассмотрели сериализацию данных во Flink, написали сериализатор, поддерживающий эволюцию схемы для Flink-состояния в операторе на основе Jackson.

В этой части мы научимся писать тесты на эволюцию схемы состояния при использовании своего сериализатора.

Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии. Этот материал соответствует релизной ветке с названием release/10_test_JacksonStateSerializer.

По мере выхода новых материалов на Хабре ссылки на них будут появляться ниже.

Список моих постов про Flink:

Оглавление:

Вводные условия сериализатора

В прошлой части мы создали свой JacksonStateSerializer, имплементирующий интерфейс TypeSerializer. Он должен уметь сериализовать и десериализовать объекты в состояние ValueState. Его использование для stateful-оператора с состоянием будет выглядеть так:

private ValueState state;

@Override
public void open(Configuration parameters) {
   final var descriptor = new ValueStateDescriptor<>("state", new JacksonStateSerializer<>(AlertState.class));
   state = getRuntimeContext().getState(descriptor);
}

Одна из функций сериализатора — поддерживать эволюцию схемы. Предположим, что:

  1. Мы определили схему состояния.

  2. Наше приложение уже долго работает.

  3. Объекты сохраняются в бэкенд состояния согласно схеме.

  4. После всего этого мы обновляем схему по бизнес-требованию (например, удаляем и добавляем поля).

В этом случае нужно, чтобы старые объекты без проблем десериализовались в новую схему. Для примера возьмем эволюцию схемы класса TestEvolvedClass из предыдущего текста:

3a66da3d284939a674913a98f6246793.png

Во второй версии удалилось старое поле intField, но добавилось новое — newField. Его дефолтное значение выставляется в конструкторе класса. С помощью аннотации @JsonIgnoreProperties (ignoreUnknown = true) мы ожидаем, что десериализация объекта класса v1 в новую версию v2 должна пройти успешно при удалении intField, а новое поле инициализируется в дефолтное значение default_value.

Тест эволюции схемы состояния

Сценарий теста

Первый этап: создание точки сохранения

3a687543ee5cd89157fc6393de92c33f.png

  1. Берем Jackson-схему v1 для использования в качестве состояния в stateful-операторе.

  2. Запускаем Flink MiniCluster, подложив в classpath схему v1.

  3. Прогоняем одно сообщение в потоке данных для сохранения (сериализации) объекта внутри состояния.

  4. Снимаем точку сохранения savepoint.

  5. Останавливаем кластер.

Второй этап: тестирование обратной совместимости

ca6649cc49edf74cd748c4b131e829ca.png

  1. Берем Jackson-схему v2 для использования в качестве состояния в предыдущем stateful-операторе.

  2. Запускаем Flink MiniCluster из снятой точки сохранения, подложив в classpath схему v2.

  3. Прогоняем одно сообщение в потоке данных для десериализации объекта схемы версии v1 (сохраненного ранее в состоянии) уже в схему версии v2.

  4. Проверяем, что объект успешно десериализовался в схему версии v2.

Вспомогательные компоненты теста

Для реализации логики теста по схеме выше нам нужно выделить дополнительные абстракции:

MiniClusterUtils

В тесте мы два раза запустим Flink MiniCluster, поэтому вынесем его создание в отдельный метод createMiniCluster (). В нем мы определим все основные настройки: параллелизм, количество TaskManager и т. д.

@UtilityClass
public class MiniClusterUtils {

   @NotNull
   public static MiniClusterWithClientResource createMiniCluster() {
       final var configuration = new Configuration();
       configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2);
       return new MiniClusterWithClientResource(
               new MiniClusterResourceConfiguration.Builder()
                       .setConfiguration(configuration)
                       .setNumberTaskManagers(2)
                       .setNumberSlotsPerTaskManager(1)
                       .build());
   }

   @SneakyThrows
   public static void startMiniClusterWithClasspath(MiniClusterWithClientResource cluster, Collection classpath) {
       cluster.before();
       TestStreamEnvironment.setAsContext(cluster.getMiniCluster(), 1, emptyList(), classpath);
   }

   @NotNull
   public static StreamExecutionEnvironment createEnvironmentWithRockDb(File rocksDbStoragePath) {
       final var env = StreamExecutionEnvironment.getExecutionEnvironment();
       final var rocksDb = new EmbeddedRocksDBStateBackend(false);
       rocksDb.setDbStoragePath(rocksDbStoragePath.toString());
       env.setStateBackend(rocksDb);
       env.getConfig().disableForceKryo();
       env.setMaxParallelism(2);
       env.setParallelism(2);
       return env;
   }
}

Одна из проблем здесь — наличие двух одновременных версий класса TestEvolvedClass. Вспоминаем, что в Java уникальность класса задается по связи полного класса с загружаемым его ClassLoader. Это позволяет создать в рамках теста две версии одного TestEvolvedClass, загрузив их разными ClassLoader. Далее эти классы можно научиться подкидывать во Flink MiniCluster. Как раз это делает метод startMiniClusterWithClasspath (), который запускает MiniCluster, указав ему дополнительный classpath для поиска классов.

TestStreamEnvironment.setAsContext — достаточно интересный метод. В нем можно использовать не только путь для classpaths, но и отдельные jar-пакеты:

50167cc142d1dc42b221d24d581d5748.png

Также в нашем MiniClusterUtils есть метод createEnvironmentWithRockDb (). Он нужен для создания TestStreamEnvironment с RocksDB, который мы используем на проде. Так мы удостоверимся в работоспособности тестируемого сериализатора в реальных условиях. На вход метод принимает директорию, где будут храниться все файлы для RocksDB, включая состояние.

TestStatefulMapCounter

Далее по схеме нам требуется тестовый stateful-оператор, у которого мы с помощью тестируемого сериализатора будем проверять успешность десериализации. В качестве такого оператора напишем простейший маппер:

public class TestStatefulMapCounter extends RichFlatMapFunction {
   private static final long serialVersionUID = 1L;
   private static final AtomicInteger statefulCounter = new AtomicInteger(0);
   private TypeSerializer stateSerializer;
   private Class classType;

   private ValueState state;

   public TestStatefulMapCounter(TypeSerializer stateSerializer) {
       this.stateSerializer = stateSerializer;
   }

   public TestStatefulMapCounter(Class classType) {
       this.classType = classType;
   }

   @Override
   public void open(Configuration parameters) {
       final var descriptor = stateSerializer != null
               ? new ValueStateDescriptor<>("state", stateSerializer)
               : new ValueStateDescriptor<>("state", classType);
       state = getRuntimeContext().getState(descriptor);
   }

   @Override
   public void flatMap(T value, Collector out) throws Exception {
       final var stateElement = state.value();
       if (stateElement != null) {
           statefulCounter.incrementAndGet();
       } else {
           state.update(value);
       }
       out.collect(value);
   }

   @Override
   public void close() {
       statefulCounter.set(0);
   }

   public int getStatefulCounter() {
       return statefulCounter.get();
   }
}

В конструкторе запрашиваем сам сериализатор, который используется для инициализации дескриптора состояния. Внутри flatMap-метода происходит самое интересное. Если мы получили событие из состояния, инкрементируя счетчик, то событие с таким же ключом уже было раньше. Если нет, то сохраняем его в состояние. Также есть метод getStatefulCounter для использования при проверке в самих тестах. Этот маппер можно расширить для получения полноценного объекта с дальнейшей проверкой конкретных полей.

ObjectTypeClassLoaderDto

Теперь перейдем к самому сложному — научимся загружать классы разными classLoaders. На старте теста классы не должны быть загружены, поэтому просто сохраним две наших версии TestEvolvedClass в виде текстовых файлов в ресурсы:

afa94d38d776066b45d2d01215f5b8c1.png

Важно: папку будем называть по названию класса, а внутренние версии эволюции схемы — по номеру версии.

Затем удобно работать со следующей абстракцией:

@Getter
@RequiredArgsConstructor
public final class ObjectTypeClassLoaderDto {
   private final T object;
   private final Class type;
   private final URLClassLoader classLoader;
}

Ее можно использовать для хранения объекта определенной версии в поле object. Его будем передавать на вход тесту в source. Класс объекта type будет равен TestEvolvedClass, но загрузится разными ClassLoaders в зависимости от первой или второй версии класса. Сам ClassLoader можно будет получить из поля classLoader. В качестве реализации ClassLoader удобно использовать URLClassLoader, потому что текстовые классы TestEvolvedClass нужно куда-нибудь скомпилировать, а доступ к ним сразу сможем получить по их URL.

StateClassLoadingUtil

Этот класс создает объекты нашей вспомогательной дто ObjectTypeClassLoaderDto. Его код выглядит так:

@UtilityClass
public class StateClassLoadingUtil {

   @NotNull
   @SneakyThrows
   @SuppressWarnings("unchecked")
   public static  ObjectTypeClassLoaderDto compileStateClass(String className, int version) {
       final var tmpPath = Path.of(System.getProperty("java.io.tmpdir"));
       final var baseGeneratedClassesPath = tmpPath.resolve("generatedClasses").resolve(UUID.randomUUID().toString()).toFile();
       if (!baseGeneratedClassesPath.mkdirs()) {
           throw new IllegalStateException(format("Failed to create temporary directory: [%s]", List.of(baseGeneratedClassesPath)));
       }
       final var generatedClassPath = baseGeneratedClassesPath.toPath().resolve(className).resolve(UUID.randomUUID().toString());
       @Cleanup final var classLoader = ClassLoaderUtils.writeAndCompile(
               generatedClassPath.toFile(),
               className,
               loadStateClassContentFromResources(className, version));

       final var aClass = (Class) classLoader.loadClass(className);
       final var obj = (E) createEasyRandom().nextObject(aClass);
       return new ObjectTypeClassLoaderDto<>(obj, aClass, classLoader);
   }

   @SneakyThrows
   public static String loadStateClassContentFromResources(String className, int version) {
       final var stateClassFile = getResourceByName("state/" + className + "/v" + version + ".txt");
       return FileUtils.readFileToString(stateClassFile, StandardCharsets.UTF_8);
   }

   @SneakyThrows
   public static File getResourceByName(String name) {
       final var fileUrl = StateClassLoadingUtil.class.getClassLoader().getResource(name);
       if (fileUrl == null) {
           throw new FileNotFoundException(format("Resource file '%s' not found", name));
       }
       return new File(fileUrl.getFile());
   }

   private static EasyRandom createEasyRandom() {
       // Important: create new EasyRandomParameters, because it caches classes
       final var easyRandomParameters = new EasyRandomParameters();
       easyRandomParameters.setRandomizationDepth(1);
       easyRandomParameters.setStringLengthRange(new EasyRandomParameters.Range<>(1, 5));
       easyRandomParameters.setCollectionSizeRange(new EasyRandomParameters.Range<>(1, 2));
       easyRandomParameters.randomize(Integer.class, new IntegerRangeRandomizer(0, 1000));
       return new EasyRandom(easyRandomParameters);
   }
}

Первым публичным методом является compileStateClass, возвращающий ObjectTypeClassLoaderDto по переданному названию класса TestEvolvedClass и его версии. Это как раз те данные, которые заложены в структуре наших текстовых классов в папке ресурсов. Для начала метод compileStateClass создает временную директорию baseGeneratedClassesPath вида /generatedClasses/, где мы производим все наши действия.

Далее по коду мы определяем внутреннюю директорию generatedClassPath, в которой будут лежать скомпилированные файлы. Основным действием идет вызов метода ClassLoaderUtils.writeAndCompile — это наш собственный класс, о котором поговорим далее. Важно понимать, что он на вход получает рабочую директорию, имя класса и его содержимое в виде строки — как раз то, что лежит в наших txt-файлах. На выходе из метода мы получаем новый объект classloader, который используем для непосредственной загрузки искомого класса и формирования нашего дто ObjectTypeClassLoaderDto.

Интересный момент — создание объекта класса. Для этого используется EasyRandom — достаточно простая библиотека, позволяющая инициализировать любой объект по его классу со сложной вложенной структурой. Как раз это мы делаем в методе createEasyRandom () и в дальнейшем вызове метода nextObject (aClass). Добавить библиотеку можно через зависимость:

testImplementation "org.jeasy:easy-random-core:5.0.0"

Важно помнить, что EasyRandom кэширует наши классы. Нужно каждый раз создавать новый объект EasyRandom, чтобы он увидел изменение схемы TestEvolvedClass с аналогичным именем, но версии v2.

Остальные методы нашего класса loadStateClassContentFromResources () и getResourceByName () являются достаточно очевидными и используются для корректного получения наших txt-файлов из директории ресурсов и их внутреннего содержимого.

После вызова входного метода compileStateClass () над TestEvolvedClass с версиями v1 и v2 можно наблюдать следующую созданную структуру папок во временной директории:

0000NBB04T3MD6M:~ asbobryako$ find /var/folders/_y/gd8sxnq91z9glrxjlkrj98tnbsxn37/T/generatedClasses | sed -e "s/[^-][^\/]*\// |/g" -e "s/|\([^ ]\)/|-\1/"
 | | | | |-generatedClasses
 | | | | | |-e79fb148-e12a-44fd-985b-171dff3d9b8d
 | | | | | | |-TestEvolvedClass
 | | | | | | | |-adef541d-b155-4f36-807a-66cc8269a49d
 | | | | | | | | |-source
 | | | | | | | | | |-TestEvolvedClass.java
 | | | | | | | | | |-TestEvolvedClass.class
 | | | | | |-b064ff0c-8f86-473b-8ba6-262c4114a816
 | | | | | | |-TestEvolvedClass
 | | | | | | | |-44410981-1662-47ca-8f49-c3c22b67618c
 | | | | | | | | |-source
 | | | | | | | | | |-TestEvolvedClass.java
 | | | | | | | | | |-TestEvolvedClass.class
0000NBB04T3MD6M:~ asbobryako$ 

ClassLoaderUtils

Класс предназначен для вспомогательных методов компиляции классов из Java-кода:

@UtilityClass
public class ClassLoaderUtils {

   @SneakyThrows
   public static URLClassLoader writeAndCompile(File root, String className, String source) {
       final var file = writeSourceToFile(root.toPath().resolve("source").toFile(), createJavaFileName(className), source);
       compileClass(file);
       return createClassLoader(file.getParentFile(), Thread.currentThread().getContextClassLoader());
   }

   private static String createJavaFileName(String className) {
       return className + ".java";
   }

   @SneakyThrows
   private static File writeSourceToFile(File root, String filename, String source) {
       final var file = new File(root, filename);
       if (!file.getParentFile().mkdirs()) {
           throw new IllegalStateException(format("Failed to create directory: [%s]", file));
       }
       try (var fileWriter = new FileWriter(file)) {
           fileWriter.write(source);
       }
       return file;
   }

   @SneakyThrows
   private static void compileClass(File sourceFile) {
       JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
       compiler.run(
               null,
               null,
               null,
               "-proc:none",
               "-classpath",
               sourceFile.getParent() + ":" + System.getProperty("java.class.path"),
               sourceFile.getPath());
   }

   private static URLClassLoader createClassLoader(File root, ClassLoader parent)
           throws MalformedURLException {
       return new URLClassLoader(new URL[]{root.toURI().toURL()}, parent);
   }
}

Основной метод у нас — writeAndCompile. На вход он принимает директорию, в которой будем компилировать классы, их названия и содержимое в виде строк (по сути все, что внутри нашего src/test/resources/state/TestEvolvedClass/v1.txt).

Для начала метод writeAndCompile создает TestEvolvedClass.java в уникальной подпапке /source переданной директории. Затем компилирует этот файл в методе compileClass (file) с помощью объекта JavaCompiler. Это происходит вызовом метода compiler.run () с передачей всех необходимых настроек. Подробнее про это можно почитать в документации. Важный момент — передача java.class.path-параметра для доступа компилятора ко всем классам, используемым в нашем коде схем.

Последний шаг — это создание URLClassLoader, на вход которому как раз передается директория со скомпилированным TestEvolvedClass.class-классом. Таким образом, у нас будет две разные версии одного и того же класса TestEvolvedClass.class в виде двух текстовых файлов v1.txt и v2.txt. Мы сохраняем их как .java-файлы в отдельные директории, затем компилируем и загружаем разными classloader.

Реализация теста проверки эволюции схемы

Теперь у нас есть все нужные компоненты. Можно проверить, что эволюция схемы работает в обе стороны: из v1 в v2 и наоборот.

Для теста определим следующие аргументы:

private static Stream evolveClasses() {
   final var className = "TestEvolvedClass";
   return Stream.of(
       arguments(className, 1, 2),
       arguments(className, 2, 1)
   );
}

Перед началом теста готовим ресурсы и назначаем временные директории под RocksDB и savepoint:

class JacksonStateSerializerUnitTest {
   @TempDir
   private Path tmpFolder;
   private File savepointDir;
   private File rocksDbDir;

   @BeforeEach
   @SneakyThrows
   void init() {
       savepointDir = new File(tmpFolder.toFile(), "savepoints");
       rocksDbDir = new File(tmpFolder.toFile(), "rocksDb");
       if (!savepointDir.mkdirs() || !rocksDbDir.mkdirs()) {
           fail(format("Test setup failed: failed to create temporary directories: %s",
                   List.of(savepointDir.toString(), rocksDbDir.toString())));
       }
   }
// ...
}

Переходим к тесту, который полностью отражает схему из начала текста:

@ParameterizedTest
@MethodSource("evolveClasses")
@SneakyThrows
@Timeout(value = 15, unit = SECONDS)
void shouldRestoreEvolvedState(String className, int baseVersion, int evolvedVersion) {
   // job to store base state in savepoint
   final var objectTypeClassLoaderDto = compileStateClass(className, baseVersion);

   var cluster = createMiniCluster();
   startMiniClusterWithClasspath(cluster, Arrays.asList(objectTypeClassLoaderDto.getClassLoader().getURLs()));
   final var jacksonStateSerializer = new JacksonStateSerializer<>(objectTypeClassLoaderDto.getType());
   final var statefulCounter = new TestStatefulMapCounter<>(jacksonStateSerializer);
   final var fullObjectSink = new TestListSink<>();
   @Cleanup final var env = createEnvironmentWithRockDb(rocksDbDir);

   final var object = objectTypeClassLoaderDto.getObject();
   env.addSource(TestUnboundedSource.fromElements(List.of(object)))
           .returns(objectTypeClassLoaderDto.getType())
           .keyBy(value -> "static_key")
           .flatMap(statefulCounter).uid("id_stateful")
           .sinkTo(fullObjectSink);
   final var jobClient = env.executeAsync(env.getStreamGraph());
   await()
           .atMost(ofSeconds(5))
           .until(() -> fullObjectSink.getHandledEvents().size() == 1);
   assertEquals(0, statefulCounter.getStatefulCounter(), "Stateful counter has state for current object key");
   final var savepoint = jobClient.triggerSavepoint(savepointDir.toString(), NATIVE).get(2, SECONDS);
   jobClient.cancel().get(2, SECONDS);
   cluster.after();

   // job with restored state from savepoint for evolved schema
   final var evolvedObjectTypeClassLoaderDto = compileStateClass(className, evolvedVersion);

   cluster = createMiniCluster();
   startMiniClusterWithClasspath(cluster, Arrays.asList(evolvedObjectTypeClassLoaderDto.getClassLoader().getURLs()));
   final var evolvedJacksonStateSerializer = new JacksonStateSerializer<>(evolvedObjectTypeClassLoaderDto.getType());
   final var evolvedStatefulCounter = new TestStatefulMapCounter<>(evolvedJacksonStateSerializer);
   final var evolvedSink = new TestListSink<>();
   @Cleanup final var evolvedEnv = createEnvironmentWithRockDb(rocksDbDir);
   final var evolvedObject = evolvedObjectTypeClassLoaderDto.getObject();
   evolvedEnv.addSource(TestUnboundedSource.fromElements(List.of(evolvedObject)))
           .returns(evolvedObjectTypeClassLoaderDto.getType())
           .keyBy(value -> "static_key")
           .flatMap(evolvedStatefulCounter).uid("id_stateful")
           .sinkTo(evolvedSink);
   final var streamGraph = evolvedEnv.getStreamGraph();
   streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepoint));
   final var evolvedJobClient = evolvedEnv.executeAsync(streamGraph);
   await()
           .atMost(ofSeconds(5))
           .until(() -> evolvedSink.getHandledEvents().size() == 1);
   assertEquals(1, evolvedStatefulCounter.getStatefulCounter(),
           "Stateful counter has no state for the current object key, which must be restored from a savepoint");
   evolvedJobClient.cancel().get(2, SECONDS);
   cluster.after();
}

В нем все предельно просто. В первый раз он выполнится с параметрами (TestEvolvedClass, 1, 2), поэтому на старте мы создадим объект-дто objectTypeClassLoaderDto для класса версии v1 с помощью представленных ранее вспомогательных абстракций. Затем запустим кластер с джобой и передачей classpath нашей схемы v1. В джобе создаем stateful-маппер, использующий наш JacksonStateSerializer.

Тут важно понимать несколько моментов:

  • В .keyBy определена лямбда-функция, которая должна возвращать один и тот же ключ «static_key», чтобы следующий запуск джобы нашел состояние для события со схемой v2. 

  • Перед остановкой джобы проверяем, что событие поступило на sink, и синхронно снимаем savepoint выражением triggerSavepoint (savepointDir.toString (), NATIVE).get (2, SECONDS). Это возможно благодаря тому, что запуск джобы происходит в асинхронном режиме с помощью нашего бесконечного тестового источника TestUnboundedSource, который писал в прошлых статьях. 

  • Указываем uid для stateful-оператора. Это нужно для корректного восстановления состояния из savepoint в нужный нам оператор при следующем запуске джобы. Это необязательно, если наша джоба никак не меняется по структуре операторов.

На данном этапе (середине листинга выше) имеем снятый savepoint, где лежит состояние для stateful-оператора. Мы можем успешно завершить работу Flink MiniCluster вызовом cluster.after (). В дополнение проверкой statefulCounter.getStatefulCounter () = 0 проверяем, что в процессе работы джобы не было найдено объектов внутри состояния, так как получаемый каунтер при их наличии инкрементится с текущим одинаковым для всех ключом static_key.

Вторая часть теста очень похожа на первую, но мы загружаем в classpath джобы класс TestEvolvedClass структуры v2. При запуске джобы обязательно указываем ссылку на savepoint. Это делается через вызов streamGraph:

final var streamGraph = evolvedEnv.getStreamGraph();
streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepoint));
final var evolvedJobClient = evolvedEnv.executeAsync(streamGraph);

Внимание! После получения объекта StreamGraph вы теряете все вызванные трансформации над объектом environment. Повторный вызов env.execute вернет исключение, что не найдено операторов, будто вы не делали env.fromSource ().map () и т. д. Учитывайте это:

0d9d0cf93d95bf03d12d2d0837615717.png

После асинхронного запуска джобы мы проверяем, что в sink попало сообщение версии v2 и что каунтер из состояния инкрементился: assertEquals (1, evolvedStatefulCounter.getStatefulCounter (). Это гарантирует, что никаких ошибок при десериализации не произошло. В дополнение, конечно, можно добавить проверки на конкретные поля и дефолтное значение для новых полей в схеме v2.

В итоге тест успешно выполняется:

615015c5c0fbd2dd76cd19d397fdb54b.png

Когда у нас есть протестированный сериализатор, мы можем использовать его в нашем приложении. Этим мы займемся в следующей части. Я покажу, как можно применить свой сериализатор и автоматизировать тесты, чтобы убедиться в должном покрытии тестами любого бездумного изменения схемы.

© Habrahabr.ru