Как использовать Spring в качестве фреймворка для Flink-приложений
Всем привет, меня зовут Александр Бобряков. Я техлид в команде МТС Аналитики, занимаюсь Real-Time обработкой данных. Недавно мы начали использовать фреймворк Apache Flink, и я решил поделиться на Хабре своим опытом внедрения этой технологии в наши продукты в цикле статей. В предыдущей части я рассказывал про основные концепции потоковой обработки данных. А ещё затронул архитектуру и главные механизмы Apache Flink.
В этой статье начнем разработку Flink-приложения с использованием фреймворка Spring. Изучим структуру приложения, основные плагины и полезные настройки. Развернем Flink-кластер в Docker и попробуем запустить первое Flink-задание. Структура приложения будет постепенно развиваться в последующих статьях.
Это вторая статья из серии моих материалов про Apache Flink. По мере выхода новых постов, ссылки на них будут появляться ниже.
Список моих постов про Flink
Введение в Apache Flink: осваиваем фреймворк на реальных примерах
Приложение под Apache Flink: с чего начать?
TBD
Весь разбираемый исходный код можно найти в репозитории. В master-ветке представлен итоговый проект по всей серии статей. Этот пост соответствует релизной ветке с названием release/1_Flink_with_Spring_template.
Flink + Spring. Первые шаги
Дисклеймер: использование Spring — это не best-practice, так как проще прибегнуть к более легковесным DI-фреймворкам.
Но Spring привычнее любому java-разработчику, а внедрять что-то новое как правило дорого по ресурсам. Все же при его использовании рекомендуется исключить все ненужные вам зависимости для уменьшения размера запускаемого толстого jar-файла (в этой статье мы на этом не останавливаемся, но если будут вопросы — пишите).
Стек технологий
В проекте я буду использовать следующие инструменты и технологии:
Понятное дело, вы можете использовать более свежие версии.
Создаем проект
Первым делом создаем базовый шаблон приложения в Idea:
Теперь нам нужно определить плагины, зависимости и другие параметры сборки приложения. Блок с плагинами будет выглядеть следующим образом:
plugins {
id 'java'
id 'application'
id 'org.springframework.boot' version '2.7.7'
id 'io.spring.dependency-management' version '1.0.15.RELEASE'
id 'jacoco'
id 'com.github.johnrengelman.shadow' version '7.1.2'
id 'pmd'
}
Теперь переходим к плагину shadow. Для запуска приложения в Flink-кластере нам нужно собрать исполняемый jar-файл. Стандартный плагин Spring Framework не подойдет. Он использует специфический макет загрузки классов, который не поддерживается Apache Flink.
Нам нужен другой способ создания толстого jar-файла со всеми добавленными зависимостями, и им может стать Gradle Shadow. Подробнее про упаковку программ для Flink написано в документации.
Использовать этот плагин достаточно просто, его настройка выглядит следующим образом:
shadowJar {
dependsOn 'check'
zip64 true
mergeServiceFiles()
append 'META-INF/spring.handlers'
append 'META-INF/spring.schemas'
append 'META-INF/spring.tooling'
transform(PropertiesFileTransformer) {
paths = ['META-INF/spring.factories']
mergeStrategy = "append"
}
}
Список зависимостей можно определить вот так, но они будут расширяться по мере развития приложения:
ext {
set('flinkVersion', '1.17.0')
set('jacksonVersion', '2.14.1')
set('awaitilityVersion', '4.2.0')
set('testcontainersVersion', "1.18.0")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${jacksonVersion}"
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
annotationProcessor 'org.projectlombok:lombok'
compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}"
compileOnly 'org.projectlombok:lombok'
testImplementation 'ch.qos.logback:logback-classic:1.2.9'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"
testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
testImplementation "org.apache.flink:flink-statebackend-rocksdb:${flinkVersion}"
testImplementation 'org.testcontainers:junit-jupiter'
testImplementation "org.awaitility:awaitility:${awaitilityVersion}"
testImplementation "commons-io:commons-io:2.11.0"
testAnnotationProcessor 'org.projectlombok:lombok'
testCompileOnly 'org.projectlombok:lombok'
}
Мы используем стандартные зависимости, поэтому подробно останавливаться на них не будем.
Следом укажем Main-класс приложения в результирующем манифесте. Это можно сделать путем установки в build.gradle параметра:
mainClassName = 'com.asbobryakov.flink_spring.Main'
В итоге для сборки исполняемого jar-файла нужно будет воспользоваться командой:
./gradlew shadowJar test
Xlint: serial
Дополнительно в build.gradle рекомендую добавить флаг для javac-компилятора –Xlint:serial.
tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:serial" << "-Werror"
}
С этой настройкой будет выдаваться предупреждение, если есть класс, реализующий интерфейс Serializable, в котором не определено поле serialVersionUID.
В мире распределенного вычисления это важно: при выполнении программы наши классы будут сериализовываться для передачи между компонентами кластера.
Структура приложения
Проект будет выглядеть так:
В качестве настроек application.yml исходим из определения имени приложения и флага для старта пайплайнов обработки данных, который заиспользуем чуть дальше:
flink:
app-name: flink-spring
submit-jobs-on-app-start: true
Так как мы имеем дело со Spring, наше приложение содержит стандартный основной класс, являющийся фактической точкой входа в программу. Именно его мы указывали в gradle-настройке «mainClassName»:
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
В момент запуска приложения произойдет поднятие контекста, после чего можно начать основную работу. Определяем листенер, который будет выступать логической точкой входа в программу:
@Component
@RequiredArgsConstructor
@ConditionalOnProperty("flink.submit-jobs-on-app-start")
public class AppListener {
private final JobStarter jobStarter;
@EventListener(ApplicationStartedEvent.class)
@SneakyThrows
public void onApplicationStart() {
jobStarter.startJobs();
}
}
Как раз в этом месте удобно иметь флаг «flink.submit-jobs-on-app-start» для управления созданием этого компонента. Он пригодится в тестах (сразу выставим в false) и при первых деплоях приложения, когда ничего не понятно и хочется попробовать просто запустить пустой jar без непосредственной логики.
Задача листенера — передать управление по запуску задач нашему кастомному компоненту JobStarter. Для этого определим абстрактный класс FlinkJob, который будут реализовывать все наши бизнес-джобы, запускаемые в Flink:
public abstract class FlinkJob {
public abstract void registerJob(StreamExecutionEnvironment env);
}
Метод registerJob должен создать и зарегистрировать весь пайплайн обработки данных и привязать его в Flink-контексте StreamExecutionEnvironment. Об этом поговорим далее. Возвращаясь к JobStarter, напишем код для поиска всех задач и их запуска:
@Service
@Slf4j
@RequiredArgsConstructor
public class JobStarter {
private final StreamExecutionEnvironment environment;
private final List jobs;
@SneakyThrows
public JobClient startJobs() {
if (jobs.isEmpty()) {
log.info("No Jobs found for start");
return null;
}
for (FlinkJob job : jobs) {
log.info("Register job '{}'", job.getClass().getSimpleName());
job.registerJob(environment);
}
return environment.executeAsync();
}
}
В коде все достаточно просто: Spring находит бины, реализующие FlinkJob, после чего инжектит их в бин текущего класса. Он по ним пробегается и регистрирует. Для асинхронного запуска задач в Flink используется метод executeAsync ().
Поговорим подробнее о классе StreamExecutionEnvironment. Это отправная точка для конфигурации Flink-приложения и создания пайплайнов потоковой обработки данных.
Чтобы получить объект этого класса, можно воспользоваться статическим методом StreamExecutionEnvironment.getExecutionEnvironment (). Flink сам поймет, как ему обработать его вызов: если запуск произведен с локального компьютера без предварительно развернутого Flink-кластера, то будет автоматически создан мини-кластер под текущие ресурсы компьютера. А при запуске на ноде реального Flink-кластера будет найден именно этот кластер.
По сути это точка входа, на основе которой мы построим все операции. Позже они исполнятся на найденном (или созданном) окружении. Для регистрации такого бина напишем Spring класс-конфигурацию:
@Configuration
public class FlinkConfig {
@Bean
public StreamExecutionEnvironment streamExecutionEnvironment() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
Общие шаги создания любой задачи можно описать как:
Получение ExecutionEnvironment
Определение источников данных (Source)
Определение преобразований данных (Operator)
Определение получателей данных (Sink)
Запуск
Осталось определить базовую Flink-задачу в одной из нашей реализации FlinkJob:
@Component
public class SimpleJob extends FlinkJob {
@Override
public void registerJob(StreamExecutionEnvironment env) {
env.fromElements("value_1", "value_2", "value_3")
.map(value -> "after_map_" + value)
.print();
}
}
Как можно увидеть, сначала определяем источник данных — список конкретных String-объектов. Существует API для указания более сложных источников, таких как Kafka, но эти примеры мы рассмотрим в следующих статьях.
Далее идет преобразование каждого элемента с помощью метода map, а в качестве приемника данных используется STDOUT. В целом процесс построения пайплайна очень похож на работу с Java Stream API. В Flink существует другой API для построения программ (например Table API), но сейчас сосредоточимся на DataStream API.
На этом реализация задачи закончена. Мы ожидаем, что Flink обработает три элемента-строки, преобразует их и напечатает в stdout результат. Теперь это нужно проверить, но обойдемся пока без тестов. Мы запустим свой Flink Cluster через docker-compose.
Запуск Flink-приложения в локальном кластере
Для пробного запуска нам понадобится локально поднять Flink Cluster. Для этого можно воспользоваться docker-compose.
Flink-кластер в docker-compose
В docker-compose определяем JobManager и хотя бы один TaskManager. Можем указать несколько TaskManager для увеличения уровня параллельности вычислений. Роли этих компонентов мы рассмотрели в предыдущей статье.
version: '3.9'
services:
jobmanager:
image: flink:1.17.0-scala_2.12-java11
ports:
- '8081:8081'
environment:
FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
command:
- jobmanager
taskmanager:
image: flink:1.17.0-scala_2.12-java11
environment:
FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
command:
- taskmanager
Обратите внимание на порт 8081 — он используется для предоставления Flink UI (http://localhost:8081)
Flink UI и запуск приложения
После запуска docker-compose и перехода на http://localhost:8081 мы попадем во Flink UI:
На первой вкладке Overview отображается основная информация. Также видно количество доступных TaskManager и слотов (1 в нашем случае согласно определению docker-compose файла).
Для ручного запуска jar-файла переходим во вкладку «Submit New Job» и запускаем предварительно собранный jar. Он будет создан в каталоге ./build/libs/) командой:
./gradlew shadowJar
Main-класс автоматически подтянулся из манифеста Jar. При необходимости тут можно указать дополнительные параметры запуска — например, уровень параллелизма или точку сохранения. Когда приложение заработает, увидим следующую картину:
Важная оговорка: для более красивой визуализации графа задачи (разбиение каждого оператора пайплайна на отдельные блоки) я использую настройку streamExecutionEnvironment.disableOperatorChaining()
. Подробнее про это можно почитать в документации Flink. Без неё вы увидите на графе только один блок, совмещающий сразу три операции: source, map. sink.
Это происходит, потому что Flink старается объединить операторы в один, чтобы не тратить ресурсы на сериализацию данных между операторами. Такое возможно, если последовательные операторы имеют одинаковую параллельность, а данные между их параллельными задачами (Tasks) не перемешиваются (forward-передача). Для боевого решения выключать эту настройку не рекомендуется.
В итоге видно, что наша задача завершилась корректно. Это происходит, т.к. источник данных имеет конечное число элементов, а Flink автоматически останавливает задание по окончанию их обработки. Подробнее об этом расскажу в следующих статьях, когда будем работать с «unbounded» потоками. Будь источником данных Kafka, наша задача продержалась бы в состоянии Running условно вечно.
Результаты
По умолчанию Sink в виде print () печатает результат в стандартный поток вывода машины, на которой запущено приложение. Открываем логи контейнеров docker:
1) docker-compose-jobmanager-1 (flink:1.17.0-scala_2.12-java11)
Видим лог о поиске и запуске заданий классом JobStarter.
2) docker-compose-taskmanager-1 (flink:1.17.0-scala_2.12-java11)
Видим лог событий в потоке данных, , а также информацию о завершении задачи.
Собственно, вот так это и работает. В следующих постах рассмотренная джоба SimpleJob нам не потребуется — она была нужна для наглядности. В третьей статье мы рассмотрим реальную бизнес-задачу с созданием пайплайна данных Kafka-to-Kafka с дедупликацией сообщений.
Спасибо, что прочитали! Если возникнут вопросы, задавайте их в комментариях.