Динамическая генерация DAG в Airflow
Всем привет! Меня зовут Антон, в Ростелекоме я занимаюсь разработкой центрального хранилища данных. Наше хранилище состоит из модулей, в качестве оркестратора которых используются несколько инстансов Informatica, часть из которых мы хотим перевести на Airflow в рамках перехода на open-source решения. Поскольку Informatica и Airflow принципиально разные инструменты, взять и повторить существующую реализацию не так уж и просто. Нам хотелось получить workflow, с одной стороны, максимально похожий на текущую реализацию и, с другой стороны, использующий самый интересный первый принцип Airflow — динамичность, которая даёт гибкость.
В этой небольшой статье я хочу рассказать о по-настоящему динамической генерации ДАГов в Airflow. По этой теме в интернете в основном находится много статей от разработчиков из Индии, представляющих собой материалы вида «в Airflow можно генерировать даги динамически, вот пример: <пример по генерации 10 HelloWorld-тасков/дагов>». Нам же была интересна именно генерация дагов, которые будут изменяться во времени с переменным количеством и названиями тасков.
На текущий момент Airflow внедрён для запуска модуля, формирующего пакеты данных на удалённых серверах источников для дальнейшей загрузки в хранилище. Он запускается по простому расписанию, рассматривать его детально не очень интересно. Также в скором времени будет внедрена оркестрация через Airflow модуля, доставляющего пакеты данных для дальнейшей загрузки по слоям в промежуточный стейджинг. Здесь нас поджидает ряд граблей, описания которых я нигде не нашел и хочу поделится опытом.
По Airflow на Хабре есть пара статей от разработчиков из Mail.ru, в которых неплохо описаны базовые вещи:
Общее описание Airflow
Ветвления, параметризация через jinja и коммуникации в рамках ДАГа через Xcom
Небольшой глоссарий:
DAG/ДАГ — направленный ациклический граф. В данном случае имеется в виду последовательность действий, которые зависят друг от друга и не образуют циклов.
SubDAG/Сабдаг — то же, что и ДАГ, но находящийся внутри другого ДАГа, запускающийся в рамках родительского ДАГа (т.е. являющийся таском) и не имеющий отдельного расписания.
Operator/Оператор — конкретный шаг в даге, исполняющий определённое действие. Например, PythonOperator.
Task/Таск — конкретный инстанс оператора при запуске ДАГа, визуализируется в виде квадратика в веб-интерфейсе. Например, PythonOperator, который называется run_task и запускается в ДАГе check_dag.
Идея динамической генерации тасков в даге, проблемы и недостатки
Входные данные:
В репозитории оркестратора есть таблица, назовём её PKG_TABLE.
Есть механизм, который добавляет в таблицу PKG_TABLE записи о том, что пакет данных готов к загрузке.
Что мы хотели:
ДАГ, который будет генерироваться для готовых к загрузке пакетов и запускать их загрузку (спойлер: в итоге всё получилось).
С помощью кода, приведенного ниже, мы генерируем даг, состоящий из таска LatestOnlyOperator и зависящего от него сабдага-таска, который создаётся при запуске функции pkg_subdag_factory, получающей список пакетов из таблицы PKG_TABLE и генерирующей несколько PythonOperator’ов. Если пакетов на загрузку нет — генерируется DummyOperator.
Первую версию решили сделать одним PythonOperator’ом, в дальнейшем переделав на детальный workflow средствами Airflow.
# -*- coding: utf-8 -*-
"""
Основной DAG для запуска доставки
"""
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.hooks.oracle_hook import OracleHook
from datetime import datetime, timedelta
import logging
from scripts.lib import run_load, select_pkg_data
def pkg_subdag_factory(
oracle_hook, parent_dag_name, child_dag_name, start_date,
schedule_interval, param_dict):
"""
функция, возвращающая DAG с переменным количеством PythonOperator\`ов
(1 пакет - 1 PythonOperator)
входные параметры:
oracle_hook - airflow.hooks.oracle_hook.OracleHook
parent_dag_name - имя "родительского" дага
child_dag_name - имя создаваемого дага
start_date - дата начала запуска расписания созданного дага
schedule_interval - интервал запуска дага для расписания
param_dict - словарь со входными параметрами
"""
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date,
catchup=False
)
logging.info('selecting pkg data...')
pkg_set = select_pkg_data(oracle_hook)
if len(pkg_set):
logging.info('pkg_set:')
logging.info(pkg_set)
for pkg in pkg_set:
pkg_id = pkg[1]
pkg_dict = {'pkg_data_' + str(pkg_id): pkg}
param_dict.update(pkg_dict)
task_name = 'pkg_' + str(pkg_id)
PythonOperator(
task_id=task_name,
python_callable=run_load,
op_kwargs={
'oracle_hook': oracle_hook,
'param_dict': param_dict,
'pkg_id': pkg_id
},
retries=0,
dag=dag
)
else:
logging.info('Undelivered packages not found')
DummyOperator(task_id='no_packages_dummy', retries=0, dag=dag)
return dag
interval = '*/10 * * * *'
args = {
'owner': 'airflow',
'start_date': datetime(2018, 11, 12)
}
oracle_hook = OracleHook('ora_meta')
main_dag_name = 'load'
load_dag_name = 'load_packages'
param_dict = {
# здесь находится длинный словарь с параметрами
}
main_dag = DAG(
dag_id=main_dag_name,
default_args=args,
schedule_interval=interval,
catchup=False
)
subdag = SubDagOperator(
subdag=pkg_subdag_factory(
oracle_hook, main_dag_name, load_dag_name,
args['start_date'], interval, param_dict
),
task_id=load_dag_name,
dag=main_dag
)
# создаёт оператор, отраюатывающий только для последнего интервала расписания
latest_only = LatestOnlyOperator(task_id='latest_only', dag=main_dag)
subdag.set_upstream(latest_only)
На следующих скриншотах видно, как это выглядит в результате.
Внешний вид ДАГа:
Внешний вид cабдага при отсутствии пакетов для доставки:
Внешний вид cабдага при наличии пакетов для доставки:
Проблемы и нюансы
- Catchup не работал так, как мы ожидали: после включения выключенного дага происходили множественные запуски (не за весь период расписания, но 2–3 одновременно были). Из-за этого пришлось добавить LatestOnlyOperator, чтобы все запуски, кроме последнего, происходили вхолостую.
- Если создать сабдаг — его нужно явно включить через командную строку командой «airflow unpause <имя_сабдага>», иначе он не запускается, причём делать это нужно при создании каждого нового сабдага (сабдаг с новым именем), из-за чего динамически генерировать будет очень неудобно. Если в конфигурации airflow ($airflow_home/airflow.cfg) установить параметр «dags_are_paused_at_creation»=false, это будет не нужно, но это может привести к неприятным последствиям со случайным автоматическим запуском нового дага — мне кажется, что запускать новые даги надо явно вручную.
Как написано в документации, «A key capability of Airflow is that these DAG Runs are atomic, idempotent items, <...>», что значит: «Подразумевается, что даг генерируется в неизменном виде». Из-за того, что мы нарушили это «key capability», мы узнали некоторые вещи:
- Пустой даг (без тасков) запускается и не может закончиться, забивая все возможные параллели. Происходило это, если не было пакетов на загрузку в момент запуска дага. Для обхода этого создаётся DummyOperator.
- Если во время работы таска даг перегенерируется и в обновившемся даге этого таска уже не будет — он остановится с прерыванием запущенного процесса. А происходит это при каждом такте шедулера, но не чаще, чем указано в параметре min_file_process_interval в конфигурации airflow ($airflow_home/airflow.cfg). Для обхода этого мы сделали генерацию тасков по пакетам не только по статусу «готов к загрузке», но и по статусу «загрузка в процессе», чтобы он продолжал генерироваться, пока загрузка идет.
- Если в текущей версии дага нет какого-то таска, который был раньше — например, был таск с именем «pkg_123», который был прогружен раньше и он не создается в текущей версии дага, в веб-интерфейсе нельзя увидеть статистику по этому таску. Хотя в базе airflow вся информация сохраняетcя и на её основе можно построить внешними средствами красивый дашборд по старым запускам. Когда возникнет вопрос про частоту обновления ДАГов и возможность это отключить, про это можно почитать здесь.
- Из-за динамической генерации task_id приходится в каждый такой таск прокидывать словарь с данными по всем текущим пакетам, а также id текущего пакета, чтобы при работе самой функции выбирать из этого же словаря нужные данные по id пакета. Иначе все таски запускались за один и тот же пакет.
Execution_date в логах и фактическое время запуска
Закончу еще одним нюансом Airflow, который поначалу путает и не описан простыми словами в других статьях — execution_date (которое отображается во всех логах, в интерфейсе и т.д.) и фактическое время запуска. В принципе описание есть в документации airflow и FAQ, но результат неочевиден, поэтому мне кажется, что требуется пояснение.
Документация: «Шедулер запускает ваш джоб в конце периода»
Результат: Если создать даг с расписанием, например, @daily, то запуск с execution_date »2018–01–01 00:00:00» фактически будет запущен »2018–02–01 00:00:00».
Полезные ссылки:
Документация по catchup
Документация по LatestOnlyOperator
Ещё документация по LatestOnlyOperator
Пример использования LatestOnlyOperator
Некоторые нюансы
Вопрос про зависимости от предыдущего запуска
Небольшой пример про динамическую генерацию
Вопрос про динамическую генерацию с небольшим описанием