Airflow Workshop: сложные DAG’и без костылей

2ixc2gvhhqkngqgdyplnxmkuq-g.jpeg


Привет, Хабр! Меня зовут Дина, и я занимаюсь разработкой игрового хранилища данных для решения задач аналитики в Mail.Ru Group. Наша команда для разработки batch-процессов обработки данных использует Apache Airflow (далее Airflow), об этом yuryemeliyanov писал в недавней статье. Airflow — это opensource-библиотека для разработки ETL/ELT-процессов. Отдельные задачи объединяются в периодически выполняемые цепочки задач — даги (DAG — Directed Acyclic Graph).


Как правило, 80% проекта на Airflow — это стандартные DAG«и. В моей статье речь пойдёт об оставшихся 20%, которые требуют сложных ветвлений, коммуникации между задачами — словом, о DAG«ах, нуждающихся в нетривиальных алгоритмах.


Управление потоком


Условие перехода


Представьте, что перед нами стоит задача ежедневно забирать данные с нескольких шардов. Мы параллельно записываем их в стейджинговую область, а потом строим на них целевую таблицу в хранилище. Если в процессе работы по какой-то причине произошла ошибка — например, часть шардов оказалась недоступна, — DAG будет выглядеть так:


dsz_drkcnx1rbrjnqlomoym_c2w.png


Для того чтобы перейти к выполнению следующей задачи, нужно обработать ошибки в предшествующих. За это отвечает один из параметров оператора — trigger_rule. Его значение по умолчанию — all_success — говорит о том, что задача запустится тогда и только тогда, когда успешно завершены все предыдущие.


Также trigger_rule может принимать следующие значения:


  • all_failed — если все предыдущие задачи закончились неуспешно;
  • all_done — если все предыдущие задачи завершились, неважно, успешно или нет;
  • one_failed — если любая из предыдущих задач упала, завершения остальных не требуется;
  • one_success — если любая из предыдущих задач закончилась успешно, завершения остальных не требуется.


Ветвление


Для реализации логики if-then-else можно использовать оператор ветвления BranchPythonOperator. Вызываемая функция должна реализовывать алгоритм выбора задачи, который запустится следующим. Можно ничего не возвращать, тогда все последующие задачи будут помечены как не нуждающиеся в исполнении.


В нашем примере выяснилось, что недоступность шардов связана с периодическим отключением игровых серверов, соответственно, при их отключении никаких данных за нужный нам период мы не теряем. Правда, и витрины нужно строить с учётом количества включённых серверов.


Вот как выглядит этот же DAG со связкой из двух задач с параметром trigger_rule, принимающим значения one_success (хотя бы одна из предыдущих задач успешна) и all_done (все предыдущие задачи завершились), и оператором ветвления select_next_task вместо единого PythonOperator«а.


# Запускается, когда все предыдущие задачи завершены
all_done = DummyOperator(task_id='all_done', trigger_rule='all_done', dag=dag)
# Запускается, как только любая из предыдущих задач успешно отработал
one_success = DummyOperator(task_id='one_success', trigger_rule='one_success', dag=dag)

# Возвращает название одной из трёх последующих задач
def select_next_task():
   success_shard_count = get_success_shard_count()
   if success_shard_count == 0:
       return 'no_data_action'
   elif success_shard_count == 6:
       return 'all_shards_action'
   else:
       return 'several_shards_action'

select_next_task = BranchPythonOperator(task_id='select_next_task',
                                        python_callable=select_next_task,
                                        dag=dag)


t62eutcqg6z7muorh_k_zwpve_y.png


Документация по параметру оператора trigger_rule
Документация по оператору BranchPythonOperator


Макросы Airflow


Операторы Airflow также поддерживают рендеринг передаваемых параметров с помощью Jinja. Это мощный шаблонизатор, подробно о нём можно почитать в документации, я же расскажу только о тех его аспектах, которые мы применяем в работе с Airflow.


Шаблонизатор обрабатывает:


  • строковые параметры оператора, указанные в кортеже template_field;
  • файлы, переданные в параметрах оператора, с расширением, указанным в template_ext;
  • любые строки, обработанные функцией task.render_template сущности task, переданной через контекст. Пример функции PythonOperator«а с переданным контекстом (provide_context=True):


def index_finder(conn_id, task, **kwargs):
   sql = "SELECT MAX(idtransaction) FROM {{ params.billing }}"
   max_id_sql = task.render_template("", sql, kwargs)
   ...


Вот как мы применяем Jinja в Airflow:


  1. Конечно же, это работа с датами. {{ ds }}, {{ yesterday_ds }}, {{ tomorrow_ds }} — после препроцессинга эти шаблоны заменяются датой запуска, днём до него и следующим днём в формате YYYY-MM-DD. То же самое, но только цифры, без дефисов: {{ ds_nodash }}, {{ yesterday_ds_nodash }}, {{ tomorrow_ds_nodash }}
  2. Использование встроенных функций. Например, {{ macros.ds_add (ds, -5) }} — это способ отнять или добавить несколько дней; {{ macros.ds_format (ds,»%Y-%m-%d»,»%Y») }} — форматирование даты.
  3. Передача параметров. Они передаются в виде словаря в аргументе params, а получаются так: {{ params.name_of_our_param }}
  4. Использование пользовательских функций, точно так же переданных в параметрах. {{ params.some_func (ds) }}
  5. Использование встроенных библиотек Python:
    {{ (macros.dateutil.relativedelta.relativedelta (day=1, months=-params.retention_shift)).strftime (»%Y-%m-%d») }}
  6. Использование конструкции if-else:
    {{ dag_run.conf[«message»] if dag_run else » }}
  7. Организация циклов:
    {% for idx in range (params.days_to_load,-1,-1) %}
    {{ macros.ds_add (ds, -idx) }}
    {% endfor %}


Приведу несколько примеров рендеринга параметров в интерфейсе Airflow. В первом мы удаляем записи старше количества дней, передаваемого параметром cut_days. Так выглядит sql c использованием шаблонов jinja в Airflow:


lqet_z0mqqilu_dst05yefzx4dw.png


В обработанном sql вместо выражения уже подставляется конкретная дата:


k9_ghe-h9vmjv5__psisqdsbi44.png


Второй пример посложнее. В нём используется преобразование даты в unixtime для упрощения фильтрации данных на источнике. Конструкция »{:.0f}» используется, чтобы избавиться от вывода знаков после запятой:


6xxvtuep2lxxgnhsm9xytnpo7gu.png


Jinja заменяет выражения между двойными фигурными скобками на unixtime, соответствующий дате исполнения DAG«а и следующей за ней дате:


n_lokfakckmcm6uwtcsj8wsawxe.png


Ну и в последнем примере мы используем функцию truncshift, переданную в виде параметра:


weg-rhc5agqu6lrkzgqsuxpmrs8.png


Вместо этого выражения шаблонизатор подставляет результат работы функции:


ep2glm1kfmmuyyc2ijnefwrcftg.png


Документация по шаблонизатору jinja


Коммуникация между задачами


В одном из наших источников интересная система хранения логов. Каждые пять дней источник создаёт новую таблицу такого вида: squads_02122017. В её названии присутствует дата, поэтому возник вопрос, как именно её высчитывать. Какое-то время мы использовали таблицы с названиями из всех пяти дней. Четыре запроса падали, но trigger_rule=«one_success» спасал нас (как раз тот случай, когда выполнение всех пяти задач необязательно).


Спустя какое-то время мы стали использовать вместо trigger_rule встроенную в Airflow технологию для обмена сообщениями между задачами в одном DAG«е — XCom (сокращение от cross-communication). XCom«ы определяются парой ключ-значение и названием задачи, из которой его отправили.


2ipfx04lolsphz7oaj7ftu3vnmw.png


XCom создаётся в PythonOperator«е на основании возвращаемого им значения. Можно создать XCom вручную с помощью функции xcom_push. После выполнения задачи значение сохраняется в контексте, и любая последующая задача может принять XCom функцией xcom_pull в другом PythonOperator«е или из шаблона jinja внутри любой предобработанной строки.


Вот как выглядит получение названия таблицы сейчас:


def get_table_from_mysql(**kwargs):
  """
  Выбирает существующую из пяти таблиц и пушит значение
  """
  hook = MySqlHook(conn_name)
  cursor = hook.get_conn().cursor()
  cursor.execute(kwargs['templates_dict']['sql'])
  table_name = cursor.fetchall()
  # Посылаем XCom с названием ‘table_name’
  kwargs['ti'].xcom_push(key='table_name', value=table_name[0][1])
  # Второй вариант отправления XCom’а:
  # return table_name[0][1]
  # Можно получить по названию задачи-отправителя без ключа

# Запрос, вынимающий из метаданных PostgreSQL название нужной таблицы
select_table_from_mysql_sql = '''
SELECT table_name
  FROM information_schema.TABLES
 WHERE table_schema = 'jungle_logs'
   AND table_name IN
    ('squads_{{ macros.ds_format(ds, "%Y-%m-%d", "%d%m%Y") }}',
     'squads_{{ macros.ds_format( macros.ds_add(ds, -1), "%Y-%m-%d", "%d%m%Y") }}',
     'squads_{{ macros.ds_format( macros.ds_add(ds, -2), "%Y-%m-%d", "%d%m%Y") }}',
     'squads_{{ macros.ds_format( macros.ds_add(ds, -3), "%Y-%m-%d", "%d%m%Y") }}',
     'squads_{{ macros.ds_format( macros.ds_add(ds, -4), "%Y-%m-%d", "%d%m%Y") }}')
'''

select_table_from_mysql = PythonOperator(
   task_id='select_table_from_mysql',
   python_callable=get_table_from_mysql,
   provide_context=True,
   templates_dict={'sql': select_table_from_mysql_sql},
   dag=dag
)

# Получаем XCom из задачи 'select_table_from_mysql' по ключу 'table_name'
sensor_jh_squad_sql = '''
SELECT 1
  FROM jungle_logs.{{ task_instance.xcom_pull(task_ids='select_table_from_mysql',
                                              key='table_name') }}
 LIMIT 1
'''


Ещё один пример использования технологии XCom — рассылка email-уведомлений с текстом, отправленным из PythonOperator«а:


kwargs['ti'].xcom_push(key='mail_body', value=mail_body)


А вот получение текста письма внутри оператора EmailOperator:


email_notification_lost_keys = EmailOperator(
   task_id='email_notification_lost_keys',
   to=alert_mails,
   subject='[airflow] Lost keys',
   html_content='''{{ task_instance.xcom_pull(task_ids='find_lost_keys',
                                              key='mail_body') }}''',
   dag=dag
)


Документация по технологии XCom


Заключение


Я рассказала о способах ветвления, коммуникации между задачами и шаблонах подстановки. С помощью встроенных механизмов Airflow можно решать самые разные задачи, не отходя от общей концепции реализации DAG«ов. На этом интересные нюансы Airflow не заканчиваются. У нас с коллегами есть идеи для следующих статей на эту тему. Если вас заинтересовал этот инструмент, пишите, о чём именно вам хотелось бы прочитать в следующий раз.

© Habrahabr.ru