Apache Flink: Сериализация и JacksonStateSerializer

4525793e2666945f668ada3220b6cbc1.png

Привет, Хабр! На связи Александр Бобряков, техлид в команде МТС Аналитики. Это мой десятый материал про Apache Flink. В предыдущей части мы закончили разбирать оператор с Flink-таймерами, использующими внутреннее состояние. Также я показал, как их можно тестировать с помощью классов TestHarness или Flink MiniCluster. В дополнение тестами была покрыта вся Flink-джоба, включая E2E-тесты.

В этой части мы посмотрим сериализацию данных и состояний в операторах. Также напишем свой сериализатор, поддерживающий эволюцию схемы. В следующих частях протестируем его и внедрим в наше приложение.

Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии статей. Эта часть соответствует релизной ветке с названием release/9_JacksonStateSerializer.

По мере выхода новых материалов на Хабре ссылки на них будут появляться ниже.

Список моих статей про Flink:

Оглавление:

Сериализация во Flink

 Во время выполнения Flink-задания потоки данных проходят через различные операторы в слотах Task Manager кластера. При этом происходит передача событий по сети внутри кластера и между сторонними ресурсами (бэкенды состояний, асинхронные вызовы и т. д). Поэтому все эти данные нужно оптимально сериализовать и десериализовать. Если сериализация настроена неправильно, производительность вашего Flink-задания заметно снизится.

Для эффективной работы Flink нуждается в конкретных знаниях о типах событий. Эта информация используется для создания оптимальных сериализаторов, десериализаторов и компараторов данных для каждого обрабатываемого типа. Чем больше Flink знает о типах данных, тем лучше схемы сериализации и размещение данных в памяти.

Часто проблемы возникают в ситуациях, когда используются лямбда-функции или присутствует эффект стирания типов в Java. В первом случае лямбда-выражения Java 8 не связаны с реализующим классом, который расширяет интерфейс функции, поэтому извлечение типов для них работает иначе, чем для других выражений. Во втором случае компилятор Java отбрасывает большую часть информации об универсальном типе после компиляции. Все это означает, что во время выполнения экземпляр объекта больше не знает своего типа. Например, экземпляры DataStream и DataStream могут выглядеть одинаково. В таких случаях нужно явно указать Flink дополнительную информацию, чтобы улучшить быстродействие.

​​Flink поддерживает следующие типы данных:

  1. Java Tuples и Scala Case Classes: составные типы данных, которые состоят из фиксированного числа типизированных полей: Tuple1… Tuple25.

  2. Java POJOs. Требования к нему выглядят так:

  • Класс должен быть общедоступным.

  • Ему нужен публичный конструктор без аргументов (конструктор по умолчанию).

  • Все поля либо общедоступны, либо доступны через функции получения и установки. Для поля с именем Foo методы получения и установки должны быть названы getFoo () и setFoo ().

  • Тип поля поддерживается зарегистрированным сериализатором.

Вы можете проверить, соответствует ли ваш класс требованиям POJO, вызвав из файла flink-test-utils метод:

@Test
void shouldPojoSerializer() {
   org.apache.flink.types.PojoTestUtils.assertSerializedAsPojo(ClickMessage.class);
}

В случае отрицательного результата вы увидите следующее сообщение:

  1. Primitive Types. Поддерживаются все стандартные примитивы Java/Scala.

  2. Regular Classes.

  3. Values: специальные классы, реализующие org.apache.flink.types.Value (IntValue, LongValue, FloatValue и другие). Используйте их, когда применение сериализации общего назначения будет крайне неэффективным. Примером может служить тип данных, реализующий разреженный вектор элементов в виде массива.

  4. Hadoop Writables. Можно использовать типы, которые реализуют org.apache.hadoop.Writable интерфейс.

  5. Special Types.

Чтобы напрямую указать Flink тип данных, существует метод — TypeInformation.of (MyClass.class). Эту подсказку нужно применять, когда Flink не может восстановить стертую информацию о типе. Например, при использовании метода returns и в других случаях:

environment.fromElements(...)
       .map(...)
       .returns(MyClass.class)
       .sinkTo(...);

Все такие подсказки о типе Flink использует для выбора эффективного сериализатора. Если нужно использовать свой сериализатор при передаче объекта между операторами, создайте свой экземпляр TypeInformation, переопределив необходимые методы, в том числе createSerializer ().

Выбор сериализатора

Под все типы создаются свои сериализаторы:

  1. Стандартные сериализаторы Flink используются для базовых (примитивов) и вспомогательных типов (Optional, List, Map…), массивов, кортежей и так далее.

Пример: TupleSerializer, ByteSerializer, MapSerializer, RowSerializer.

  1. PojoSerializer применяется для POJO-классов. Eсли ваш тип данных не охватывается специализированным сериализатором, но соответствует правилам POJO, то будет использоваться именно он.

  2. Kryo — универсальный вариант для любых остальных типов.

  3. Пользовательские сериализаторы, которые при необходимости можно создать самим под любой тип. Например, для интеграции с другими вариантами сериализаторов. Таких как Google Protobuf, Apache Thrift и прочие, хотя многие такие связки уже существуют.

Чаще всего используется Kryo, так как это универсальный вариант. Важно уметь его настраивать. Для этого зарегистрируем классы перед началом выполнения задания для повышения производительности сериализации: env.getConfig ().registerKryoType (MyCustomType.class).

private void registerKryoTypes(StreamExecutionEnvironment env) {
   env.getConfig().registerKryoType(ClickMessage.class);
   env.getConfig().registerKryoType(TriggerMessage.class);
   env.getConfig().registerKryoType(AlertMessage.class);
}

Разработчики Flink также предоставляют результаты производительности этих сериализаторов:

74d238301deea8b719e753a4b71fe28e.png

Так как стандартный Kryo сериализатор показывает среднюю производительность, то лучше выбрать другой вариант. Но как проверить, использует ли Flink Kryo для каких-то ваших схем? Достаточно отключить использование универсальных типов в настройке Flink StreamExecutionEnvironment:

final var env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableGenericTypes();

Этот метод запрещает использование типов, которые могут быть сериализованы только через Kryo. То есть Flink будет генерировать UnsupportedOperationException всякий раз, когда встретит тип данных, проходящий через Kryo для сериализации, например:

da5c441f0d5738379c64f28b40e889ef.png

Метод disableGenericTypes () крайне не рекомендуется оставлять в production-коде, так как в есть вероятность, что в программе появится ранее неизвестный тип (при динамической загрузке классов или по другой причине), который повлечет падение Flink-задания.

Эволюция схемы

В процессе работы приложения могут измениться бизнес-требования и нам придется поменять формат данных. Поэтому схемы данных должны эволюционировать: уметь развиваться вместе с приложениями.

Во время потоковой обработки событий, например при чтении из Kafka, мы используем кастомные десериализаторы (как Jackson, показанный в предыдущих частях на практике). Но дела обстоят сложнее в случае хранения промежуточных данных в операторах с состоянием (ValueState и т. д.), использующих дескрипторы состояния при инициализации:

// alert state
final var alertDescriptor = new ValueStateDescriptor<>("alertState", AlertState.class);
alertDescriptor.enableTimeToLive(defaultTtlConfig);
alertState = getRuntimeContext().getState(alertDescriptor);

Представьте, что наше приложение работает, а какой-то оператор использует состояние с RocksDB в качестве бэкенда состояний, куда записывает объекты AlertState. Затем мы обновили приложение и добавили новое поле в класс AlertState. При перезапуске приложения из savepoint нужно иметь возможность корректной десериализации из RocksDB старых сообщений AlertState, у которых при сериализации в самом определении класса AlertState отсутствовало новое поле как таковое.

Возможность развития схемы состояния зависит от сериализатора, используемого для чтения/записи байтов сохраняемого состояния. Другими словами, схема зарегистрированного состояния может развиваться только в том случае, если ее сериализатор должным образом поддерживает ее. Поэтому задача сводится к реализации сериализатора с возможностью сериализовать и десериализовать объект, у которого изменился состав класса между перезапусками приложения в кластере.

Сейчас из коробки Fink 1.17 позволяет эволюционировать схему только для типов POJO и Avro. Заметьте, у Kryo этой возможности нет. Если вы используете какие-то уникальные особенности приложений или классов, нужно позаботиться о реализации нового сериализатора, который поддерживает необходимое изменение схемы.

Для понимания внутренних процессов покажу, как написать такой сериализатор самостоятельно.

Реализация кастомного Jackson-сериализатора, поддерживающего эволюцию схемы

Задача сериализатора

Допустим, мы хотим создать сериализатор, использующий Jackson-аннотации для класса, который описывает состояние в операторе с поддержкой эволюции схемы этого класса. Например, у нас есть следующая эволюция класса для состояния:

4a4290885ec0cee3107765228766309c.png

Изначально в классе версии v1 есть два поля, которые стандартно сериализуются. Сразу проверим через Flink использование PojoSerializer под такой класс:

PojoTestUtils.assertSerializedAsPojo(TestEvolvedClass.class)
Instances of the class 'TestEvolvedClass' cannot be serialized as a POJO, but would use a 'KryoSerializer' instead. 

При эволюции схемы до версии v2 удалилось старое поле intField, так как оно больше не нужно бизнесу, но добавилось поле newField. Его дефолтное значение выставляется в конструкторе класса. С помощью аннотации @JsonIgnoreProperties (ignoreUnknown = true) мы понимаем, что десериализация в новую версию класса должна пройти успешно при удалении intField. Иначе получится ошибка десериализации, так как в сериализованном объекте присутствует поле, которого нет в новой схеме.

Под такое условие задачи начнем реализовывать собственный JacksonStateSerializer, чтобы не использовать Kryo. Подробное описание, как создать десериализатор, есть в документации.

Дисклеймер: дальнейшая реализация JacksonStateSerializer является только примером, так как основное внимание будет уделено тестам на эволюцию схем.

Реализация JacksonStateSerializer

В общих чертах сериализатор должен быть универсальным и работать с любым переданным ему классом, описанным с Jackson-аннотациями. То есть сериализатор необходимо параметризовать: JacksonStateSerializer. Также следует использовать под капотом объект ObjectMapper.

Нам понадобится реализовать два взаимосвязанных абстрактных класса: непосредственный сериализатор TypeSerializer и TypeSerializerSnapshot. Стоит упомянуть, что взаимодействие Flink с этими абстракциями немного меняется в зависимости от используемого бэкенда состояний (JVM Heap, RocksDB, File System…). Подробнее об этом есть в документации. Рассмотрим эти абстракции.

TypeSerializerSnapshot

Представляет собой конфигурацию для TypeSerializer на определенный момент времени. Этот снапшот сохраняется в контрольных точках как единый источник метаинформации о схеме сериализованных данных. Этот класс нужен для проверки совместимости между новым и старым сериализаторами, так как их можно версионировать аналогично эволюции схем. Для этого они имеют свои версии и типы совместимости:

@PublicEvolving
public class TypeSerializerSchemaCompatibility {

   /** Enum for the type of the compatibility. */
   enum Type {

       /** This indicates that the new serializer continued to be used as is. */
       COMPATIBLE_AS_IS,

       /**
        * This indicates that it is possible to use the new serializer after performing a full-scan
        * migration over all state, by reading bytes with the previous serializer and then writing
        * it again with the new serializer, effectively converting the serialization schema to
        * correspond to the new serializer.
        */
       COMPATIBLE_AFTER_MIGRATION,

       /**
        * This indicates that a reconfigured version of the new serializer is compatible, and
        * should be used instead of the original new serializer.
        */
       COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,

       /**
        * This indicates that the new serializer is incompatible, even with migration. This
        * normally implies that the deserialized Java class can not be commonly recognized by the
        * previous and new serializer.
        */
       INCOMPATIBLE
   }

  // ...
}

TypeSerializerSnapshot сохраняет информацию о параметрах самого сериализатора и всех вложенных при их наличии, а также записываемую схему данных.

Flink предоставляет несколько вспомогательных классов для реализации собственного TypeSerializerSnapshot:

  1. SimpleTypeSerializerSnapshot для совсем простых типов.

  2. CompositeTypeSerializerSnapshot для сериализаторов на основе других вложенных сериализаторов.

  3. GenericTypeSerializerSnapshot — базовая минимальная реализация для параметризированных сериализаторов.

В нашем случае удобно использовать GenericTypeSerializerSnapshot. Реализация будет выглядеть так:

@SuppressWarnings("unused")
public static final class JacksonSerializerSnapshot extends GenericTypeSerializerSnapshot> {
   private static final long serialVersionUID = 1;

   private Class typeClass;

   public JacksonSerializerSnapshot() {
       // used for flink reflection
   }

   public JacksonSerializerSnapshot(Class typeClass) {
       super(typeClass);
       this.typeClass = typeClass;
   }

   @Override
   protected TypeSerializer createSerializer(Class typeClass) {
       return new JacksonStateSerializer<>(typeClass);
   }

   @Override
   protected Class getTypeClass(JacksonStateSerializer serializer) {
       return typeClass != null ? typeClass : serializer.typeClass;
   }

   @Override
   protected Class serializerClass() {
       return JacksonStateSerializer.class;
   }
}

Описание этого фрагмента кода:

  1. Единственным полем является тип сериализуемого объекта, который нужно описать через Jackson-аннотации при желании сериализовать этим сериализатором.

  2. Конструктор по умолчанию должен быть обязательно. Это необходимо для десериализации моментального снимка конфигурации из его двоичной формы внутренними Flink-процессами.

  3. Основной конструктор, который принимает тип сериализуемого класса.

  4. Методы createSerializer, getTypeClass, serializerClass достаточно примитивны и вряд ли требуют подробного пояснения.

TypeSerializer

Представляет собой непосредственно сериализатор объектов. Нам нужно реализовать абстрактный класс TypeSerializer следующим образом:

public class JacksonStateSerializer extends TypeSerializer {
   private static final long serialVersionUID = 1;

   private final Class typeClass;
   private final transient ObjectMapper mapper;

   public JacksonStateSerializer(Class typeClass) {
       this.typeClass = typeClass;
       this.mapper = createObjectMapper();
   }

   @Override
   public boolean isImmutableType() {
       return false;
   }

   @Override
   public TypeSerializer duplicate() {
       return new JacksonStateSerializer<>(typeClass);
   }

   @Override
   public T createInstance() {
       return null;
   }

   @Override
   public T copy(T from) {
       return mapper.convertValue(from, typeClass);
   }

   @Override
   public T copy(T from, T reuse) {
       return copy(from);
   }

   @Override
   public void copy(DataInputView source, DataOutputView target) throws IOException {
       serialize(deserialize(source), target);
   }

   @Override
   public int getLength() {
       return -1;
   }

   @Override
   public void serialize(T record, DataOutputView target) throws IOException {
       final var bytes = mapper.writeValueAsBytes(record);
       target.writeInt(bytes.length);
       target.write(bytes);
   }

   @Override
   public T deserialize(DataInputView source) throws IOException {
       final var byteLength = source.readInt();
       final var recordBytes = new byte[byteLength];
       source.readFully(recordBytes);
       return mapper.readValue(recordBytes, typeClass);
   }

   @Override
   public T deserialize(T reuse, DataInputView source) throws IOException {
       return deserialize(source);
   }

   @Override
   @SuppressWarnings("unchecked")
   public boolean equals(Object obj) {
       if (obj == this) {
           return true;
       } else if (obj != null && obj.getClass() == JacksonStateSerializer.class) {
           final var that = (JacksonStateSerializer) obj;
           return this.typeClass == that.typeClass;
       } else {
           return false;
       }
   }

   @Override
   public int hashCode() {
       return typeClass.hashCode();
   }

   @Override
   public TypeSerializerSnapshot snapshotConfiguration() {
       return new JacksonSerializerSnapshot<>(typeClass);
   }

   @SuppressWarnings("unused")
   public static final class JacksonSerializerSnapshot extends GenericTypeSerializerSnapshot> {
      // ...
   }
}

Пройдемся по реализации. В качестве полей класса используются JacksonSerializerSnapshot тип сериализуемого класса, а также объект ObjectMapper для его использования при непосредственной сериализации и десериализации объектов. Настроенный объект ObjectMapper создается с помощью единственного статического метода в нашем коде.

Затем необходимо переопределить ряд стандартных методов. Большинство из них достаточно очевидны, поэтому остановимся на основных. Serialize используется для сериализации текущего record-объекта класса typeClass в рантайме: сначала записываем информацию о количестве байт для последующей вычитки, а потом и сам объект. Как раз в этом месте используем objectMapper.writeValueAsBytes для сериализации с автоматической обработкой всех Jackson-аннотаций. В метод deserialize обратно десериализуем объект с помощью objectMapper.readValue.

Важным методом является snapshotConfiguration (). Он возвращает экземпляр JacksonSerializerSnapshot. Этот экземпляр записывает в точку сохранения информацию об объекте TypeSerializerSnapshot вместе со всеми сериализованными объектами для его восстановления и поддержки совместимости самих JacksonSerializerSnapshot между собой.

На этом реализация JacksonStateSerializer закончена. Осталось проверить его работу. Для этого напишем тест со следующим сценарием:

  1. Берем Jackson-схему v1 для использования в качестве состояния в stateful-операторе.

  2. Запускаем Flink MiniCluster, подложив в classpath схему v1.

  3. Прогоняем одно сообщение в потоке данных для сериализации объекта в состояния со схемой версии v1.

  4. Снимаем точку сохранения savepoint.

  5. Останавливаем кластер.

  6. Берем Jackson-схему v2 для использования в качестве состояния в предыдущем stateful-операторе.

  7. Запускаем Flink MiniCluster из снятой точки сохранения, подложив в classpath схему v2

  8. Прогоняем одно сообщение в потоке данных для десериализации объекта схемы версии v1 (сохраненного ранее в состоянии) уже в схему версии v2

  9. Проверяем, что объект успешно десериализовался в схему версии v2

Такой тест написать достаточно сложно, поэтому мы рассмотрим его в следующей части.

В данной материале основной темой была эволюция схемы состояния. Мы сначала изучили сериализацию данных в Flink и несколько абстракций, а также затронули тему сериализаторов для состояний операторов. Затем с помощью Jackson сериализатора, поддерживающего эволюцию, мы создали сериализатор, переопределив классы TypeSerializer и GenericTypeSerializerSnapshot. В конце определили план тестирования нашего JacksonStateSerializer, которое рассмотрим в следующей части.

© Habrahabr.ru