DAG’и без напрягов: наш опыт использования метаданных при работе с Apache Airflow

Apache Airflow — простой и удобный batch-ориентированный инструмент для построения, планирования и мониторинга дата-пайплайнов. Ключевой его особенностью является то, что, используя Python-код и встроенные функциональные блоки, можно соединить множество различных технологий, использующихся в современном мире. Основная рабочая сущность Airflow — DAG — направленный ацикличный граф, в котором узлами являются задачи, а зависимости между задачами представлены направленными ребрами. 

Те, кто использует Apache Airflow для оркестрации задач загрузки данных в хранилище, наверняка оценили гибкость, которую он предоставляет для решения шаблонных задач. Когда весь процесс разработки сводится к заполнению конфигурационного файла с описанием параметров DAGа и списком задач, которые должны выполняться. У нас в Леруа Мерлен такой подход успешно используется для создания задач по перекладыванию данных из raw-слоя в ods-слой хранилища. Поэтому было решено распространить его на задачи по заполнению витрин данных.  

Основная сложность состояла в том, что единой методологии разработки витрин данных и процедур по их заполнению у нас пока нет. И каждый разработчик решал задачу, основываясь на своих личных предпочтениях и опыте. Это укладывается в один из основных корпоративных IT принципов — «You build it — you run it», который означает, что разработчик несет ответственность за свое решение и сам его поддерживает. Данный принцип хорош для быстрой проработки гипотез, но для однотипных вещей больше подходит стандартное решение. 

Как было

Тут стоит рассказать, как велась до этого разработка для загрузки витрин данных. Разработчик пишет процедуры загрузки в GreenPlum, разрабатывает DAGи для их запуска, после чего создает по шаблону новый репозиторий на GitHub, загружает код своих DAGов и добавляет свой репозиторий в основной проект Airflow в качестве сабмодуля. При таком подходе возникали следующие трудности:

  1. Нужно погружение в Python и Apache Airflow;

  2. На момент начала разработки релиз основного проекта происходил раз в неделю, поэтому, чтоб увидеть свои DAGи на проде Airflow, нужно было подождать;

  3. Основной проект постепенно разрастался и начал притормаживать при деплое;

  4. Разбросанным по разным репозиториям кодом DAGов, выполняющих однотипные задачи, сложно управлять;

  5. Отсутствие единого подхода также влияло и на качество SQL-кода процедур. Часто можно было встретить сложную логику по управлению параметрами загрузки, которую легко можно было «перевесить» на Airflow.

Все вышеперечисленное привело нас к мысли, что пора брать ситуацию под свой контроль и заняться разработкой стандартного решения. Анализ существующих DAG-ов показал, что большинство из них очень простые, не содержат сложных зависимостей и состоят, в основном, из DummyOperator-ов и PostgresOperator-ов. Это и послужило отправной точкой для разработки нового инструмента, который, в свою очередь, должен был:

  1. Уметь создавать DAG-и на основе конфигурационного файла в формате YAML, в которым бы были указаны основные параметры, как-то: дата старта, расписание, параметры подключения к БД, названия запускаемых процедур, их параметры и т. д. YAML файлы должны храниться внутри корпоративного сервиса по управлению метаданными, получить их содержимое можно через API;

  2. Быть максимально простым, иметь понятную документацию, чтобы погружение не занимало много времени;

  3. В то же время быть максимально гибким, уметь работать с максимально возможным количеством параметров настройки DAG-а в Airflow.

Что есть

Получился примерно вот следующий шаблон для конфигурационного файла:

96e452d8d71eb0052d1266fad8b70764.png

Из которого создается вот такой DAG:

c10827f7a5597582e50212599742cb0c.png

Описание параметров

Общие параметры:

  • module_name — нужен для формирование DAG_ID;

  • pool — пул, в котором будут запущены задачи;

  • queue — очередь для задач;

  • owner — владелец DAGа;

  • postgres_conn_id — строка подключения к БД;

  • email — список емейлов для рассылки алертов;

  • tags — список тэгов для поиска DAGа в UI;

  • access_control: роль для управление DAGом;

  • schedule_interval — расписание для запуска DAGа;

  • start_date и catchup — параметры, управляющие глубиной истории загрузки. Airflow использует интервальный подход. Это означает, что временной период от start_date и до опциональной end_date (мы не используем) разбивается на интервалы, указанные в schedule_interval. Если catchup True, то запуск DAGА начнется от start_date, если False, то с текущего интервала;

  • schema_name — схема БД, в которой находится витрина;

  • task_list — список задач в DAGе.

Основные параметры задач:

  • task_name — соответствует task_id Airflow

  • task_type — тип задачи

  • task_schema_name — схема БД, в которой находится витрина, если схема отличается от общей

  • task_conn_id — строка подключения, если отличается от общей

  • procedure_name — процедура загрузки витрины

  • params — список параметров процедуры и их значений

  • task_depends_on — список задач, от которых зависит запуск данной задачи

  • priority_weight — приоритет данной задачи по отношению к другим задачам

  • task_concurrency — количество одновременно запущенных экземпляров задачи во всех запущенных экземплярах DAGа

Сейчас существует три типа задач (task_type):

1) Dummy — соответствует DummyOperator. Задача, которая ничего не выполняет и обычно служит начальной и конечной задачей, а также для разделения задач на блоки.

0bddc51f1c41b4c917038b281d47cb72.png

2) Обычная загрузка — соответствует PostgresOperator в Airflow 

207d7a71475b8a2f4ea7312210fa33a0.png

Вот так выглядит SQL-код, который генерит эта задача:

afc43a19806efb87e8b831916b56e56a.png

3) Множественная загрузка — много PostgresOperator (если нужно создать кучу однотипных задач, различающихся по одному параметру) 

d0d926450b1a8fe0f282ef349df72495.png

У этого типа есть свои специфические параметры:

  • task_multiply — может иметь 2 значения: «schema» или «params». Если указано   schema», то значения из task_multiply_list добавляются в выражение SEARCH_PATH.                     Если «params», то значения из task_multiply_list добавляются в список параметров процедуры для параметра из списка params, у которого в значении указано 'task_multiply_list»

  • task_multiply_list — список значений для параметра, по которому будут создаваться однотипные задачи

В результате получается такой SQL-код.

Для «schema»:

d31b02c376cfe89eaec946bbd1bc2070.png

Для «params»:

518ffa529798ff4e5032f0ce2760e327.png

А вот так проставляются зависимости между задачами:

38e114ddedd0d67858a2c5bf829e81d0.png

Куда пойдем

Внедрение инструмента позволило существенно сократить время на разработку DAGов. Глубокое погружение в Apache Airflow больше не нужно, хотя почитать про макросы и расписание все-таки придется. Шаблон конфигурационного файла заполняется минут за 10–15. Время, затрачиваемое на ревью и деплой на прод, тоже сильно сократились. Однако здесь же и кроется основная зона для развития: сейчас ревью и деплой происходят в ручном режиме. Хочется обложить все это тестами и предоставить разработчику возможность самому отправлять свои DAGи на прод.

© Habrahabr.ru