Динамическая генерация DAG в Airflow

Всем привет! Меня зовут Антон, в Ростелекоме я занимаюсь разработкой центрального хранилища данных. Наше хранилище состоит из модулей, в качестве оркестратора которых используются несколько инстансов Informatica, часть из которых мы хотим перевести на Airflow в рамках перехода на open-source решения. Поскольку Informatica и Airflow принципиально разные инструменты, взять и повторить существующую реализацию не так уж и просто. Нам хотелось получить workflow, с одной стороны, максимально похожий на текущую реализацию и, с другой стороны, использующий самый интересный первый принцип Airflow — динамичность, которая даёт гибкость.

В этой небольшой статье я хочу рассказать о по-настоящему динамической генерации ДАГов в Airflow. По этой теме в интернете в основном находится много статей от разработчиков из Индии, представляющих собой материалы вида «в Airflow можно генерировать даги динамически, вот пример: <пример по генерации 10 HelloWorld-тасков/дагов>». Нам же была интересна именно генерация дагов, которые будут изменяться во времени с переменным количеством и названиями тасков.

zlyrskhcw1q8wswrizaqcep2-pa.png

На текущий момент Airflow внедрён для запуска модуля, формирующего пакеты данных на удалённых серверах источников для дальнейшей загрузки в хранилище. Он запускается по простому расписанию, рассматривать его детально не очень интересно. Также в скором времени будет внедрена оркестрация через Airflow модуля, доставляющего пакеты данных для дальнейшей загрузки по слоям в промежуточный стейджинг. Здесь нас поджидает ряд граблей, описания которых я нигде не нашел и хочу поделится опытом.

По Airflow на Хабре есть пара статей от разработчиков из Mail.ru, в которых неплохо описаны базовые вещи:

Общее описание Airflow
Ветвления, параметризация через jinja и коммуникации в рамках ДАГа через Xcom


Небольшой глоссарий:

DAG/ДАГ — направленный ациклический граф. В данном случае имеется в виду последовательность действий, которые зависят друг от друга и не образуют циклов.
SubDAG/Сабдаг — то же, что и ДАГ, но находящийся внутри другого ДАГа, запускающийся в рамках родительского ДАГа (т.е. являющийся таском) и не имеющий отдельного расписания.
Operator/Оператор — конкретный шаг в даге, исполняющий определённое действие. Например, PythonOperator.
Task/Таск — конкретный инстанс оператора при запуске ДАГа, визуализируется в виде квадратика в веб-интерфейсе. Например, PythonOperator, который называется run_task и запускается в ДАГе check_dag.


Идея динамической генерации тасков в даге, проблемы и недостатки

Входные данные:

В репозитории оркестратора есть таблица, назовём её PKG_TABLE.
Есть механизм, который добавляет в таблицу PKG_TABLE записи о том, что пакет данных готов к загрузке.

Что мы хотели:

ДАГ, который будет генерироваться для готовых к загрузке пакетов и запускать их загрузку (спойлер: в итоге всё получилось).

С помощью кода, приведенного ниже, мы генерируем даг, состоящий из таска LatestOnlyOperator и зависящего от него сабдага-таска, который создаётся при запуске функции pkg_subdag_factory, получающей список пакетов из таблицы PKG_TABLE и генерирующей несколько PythonOperator’ов. Если пакетов на загрузку нет — генерируется DummyOperator.

Первую версию решили сделать одним PythonOperator’ом, в дальнейшем переделав на детальный workflow средствами Airflow.

# -*- coding: utf-8 -*-
"""
Основной DAG для запуска доставки
"""

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.hooks.oracle_hook import OracleHook

from datetime import datetime, timedelta
import logging

from scripts.lib import run_load, select_pkg_data

def pkg_subdag_factory(
        oracle_hook, parent_dag_name, child_dag_name, start_date,
        schedule_interval, param_dict):

    """
    функция, возвращающая DAG с переменным количеством PythonOperator\`ов
        (1 пакет - 1 PythonOperator)

    входные параметры:
    oracle_hook - airflow.hooks.oracle_hook.OracleHook
    parent_dag_name - имя "родительского" дага
    child_dag_name - имя создаваемого дага
    start_date - дата начала запуска расписания созданного дага
    schedule_interval - интервал запуска дага для расписания
    param_dict - словарь со входными параметрами
    """

    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
        catchup=False
        )

    logging.info('selecting pkg data...')
    pkg_set = select_pkg_data(oracle_hook)

    if len(pkg_set):
        logging.info('pkg_set:')
        logging.info(pkg_set)

        for pkg in pkg_set:
            pkg_id = pkg[1]
            pkg_dict = {'pkg_data_' + str(pkg_id): pkg}
            param_dict.update(pkg_dict)
            task_name = 'pkg_' + str(pkg_id)
            PythonOperator(
                task_id=task_name,
                python_callable=run_load,
                op_kwargs={
                    'oracle_hook': oracle_hook,
                    'param_dict': param_dict,
                    'pkg_id': pkg_id
                    },
                retries=0,
                dag=dag
            )

    else:
        logging.info('Undelivered packages not found')
        DummyOperator(task_id='no_packages_dummy', retries=0, dag=dag)

    return dag

interval = '*/10 * * * *'

args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 11, 12)
    }

oracle_hook = OracleHook('ora_meta')

main_dag_name = 'load'
load_dag_name = 'load_packages'

param_dict = {
    # здесь находится длинный словарь с параметрами
    }

main_dag = DAG(
    dag_id=main_dag_name,
    default_args=args,
    schedule_interval=interval,
    catchup=False
    )

subdag = SubDagOperator(
    subdag=pkg_subdag_factory(
        oracle_hook, main_dag_name, load_dag_name,
        args['start_date'], interval, param_dict
        ),
    task_id=load_dag_name,
    dag=main_dag
    )

# создаёт оператор, отраюатывающий только для последнего интервала расписания
latest_only = LatestOnlyOperator(task_id='latest_only', dag=main_dag)

subdag.set_upstream(latest_only)

На следующих скриншотах видно, как это выглядит в результате.
Внешний вид ДАГа:
_xtlsduwcwvecmuwk2qfd2rlpyq.png

Внешний вид cабдага при отсутствии пакетов для доставки:
z7vngyipby78gaksu29i0km1y04.png

Внешний вид cабдага при наличии пакетов для доставки:
-3ig9fawopeyqupws_ya7pp3ve0.png


Проблемы и нюансы


  • Catchup не работал так, как мы ожидали: после включения выключенного дага происходили множественные запуски (не за весь период расписания, но 2–3 одновременно были). Из-за этого пришлось добавить LatestOnlyOperator, чтобы все запуски, кроме последнего, происходили вхолостую.
  • Если создать сабдаг — его нужно явно включить через командную строку командой «airflow unpause <имя_сабдага>», иначе он не запускается, причём делать это нужно при создании каждого нового сабдага (сабдаг с новым именем), из-за чего динамически генерировать будет очень неудобно. Если в конфигурации airflow ($airflow_home/airflow.cfg) установить параметр «dags_are_paused_at_creation»=false, это будет не нужно, но это может привести к неприятным последствиям со случайным автоматическим запуском нового дага — мне кажется, что запускать новые даги надо явно вручную.

Как написано в документации, «A key capability of Airflow is that these DAG Runs are atomic, idempotent items, <...>», что значит: «Подразумевается, что даг генерируется в неизменном виде». Из-за того, что мы нарушили это «key capability», мы узнали некоторые вещи:


  • Пустой даг (без тасков) запускается и не может закончиться, забивая все возможные параллели. Происходило это, если не было пакетов на загрузку в момент запуска дага. Для обхода этого создаётся DummyOperator.
  • Если во время работы таска даг перегенерируется и в обновившемся даге этого таска уже не будет — он остановится с прерыванием запущенного процесса. А происходит это при каждом такте шедулера, но не чаще, чем указано в параметре min_file_process_interval в конфигурации airflow ($airflow_home/airflow.cfg). Для обхода этого мы сделали генерацию тасков по пакетам не только по статусу «готов к загрузке», но и по статусу «загрузка в процессе», чтобы он продолжал генерироваться, пока загрузка идет.
  • Если в текущей версии дага нет какого-то таска, который был раньше — например, был таск с именем «pkg_123», который был прогружен раньше и он не создается в текущей версии дага, в веб-интерфейсе нельзя увидеть статистику по этому таску. Хотя в базе airflow вся информация сохраняетcя и на её основе можно построить внешними средствами красивый дашборд по старым запускам. Когда возникнет вопрос про частоту обновления ДАГов и возможность это отключить, про это можно почитать здесь.
  • Из-за динамической генерации task_id приходится в каждый такой таск прокидывать словарь с данными по всем текущим пакетам, а также id текущего пакета, чтобы при работе самой функции выбирать из этого же словаря нужные данные по id пакета. Иначе все таски запускались за один и тот же пакет.


Execution_date в логах и фактическое время запуска

Закончу еще одним нюансом Airflow, который поначалу путает и не описан простыми словами в других статьях — execution_date (которое отображается во всех логах, в интерфейсе и т.д.) и фактическое время запуска. В принципе описание есть в документации airflow и FAQ, но результат неочевиден, поэтому мне кажется, что требуется пояснение.

Документация: «Шедулер запускает ваш джоб в конце периода»
Результат: Если создать даг с расписанием, например, @daily, то запуск с execution_date »2018–01–01 00:00:00» фактически будет запущен »2018–02–01 00:00:00».


Полезные ссылки:

Документация по catchup
Документация по LatestOnlyOperator
Ещё документация по LatestOnlyOperator
Пример использования LatestOnlyOperator
Некоторые нюансы
Вопрос про зависимости от предыдущего запуска
Небольшой пример про динамическую генерацию
Вопрос про динамическую генерацию с небольшим описанием

© Habrahabr.ru