Кастомные email-оповещения в Apache Airflow
Пролог
Как ИИ представляет себе «Этакое желание»
Каждый разработчик сталкивался, или непременно столкнется, с ситуацией, когда в бизнес-требованиях видишь «этакое желание».
Этакое желание (каламбурное определение) — достичь чего-то невозможного или близкого к невозможному с помощью программирования.
Дорогой читатель, наверное, задаст риторический вопрос:
— Как это?! Разве чего-то нельзя достичь с помощью программирования?!
Вопрос, конечно, к месту, и ответ в большинстве случаев очевиден:
— Нет ничего невозможного… главное грамотно спроектировать техническое решение.
Но сейчас немного не об этом, а о целесообразности расхода ресурсов: трудозатрат, количество привлеченных специалистов и т.п.
Стоит ли овчинка выделки?
Увидев «этакое желание» хочется сразу пресечь его на корню и больше не возвращаться к обсуждению безумной идеи.
Но что если все-таки дать шанс и извлечь из этого выгоду в будущем?
И тут без ИИ не обошлось
Конкретный случай
Спойлер: стоит отметить, что дальше не будет rocket science и метапрограммирования, а будет описан подход к решению задачи.
Мне, как разработчику команды «Разработка и автоматизация загрузок данных» Газпромбанка, посчастливилось встретить в одном из технических заданий «этакое желание» — реализовать в загрузочном DAG кастомизированные email-оповещения.
Они должны собирать необходимую информацию на всех стадиях пайплайна, то есть в каждой из задач DAG’a, а в зависимости от успешного выполнения или при отклонении алгоритма, формировать тело email-сообщения. Например: в задаче происходит ошибка валидации данных и в этом случае должна сообщаться конкретная причина падения DAG. Информация должна быть полезной и понятной для бизнеса.
Для загрузок в нашей кодовой базе уже был реализован инструмент оповещения,
но оповещения выполнялись только при удачном сценарии отработки DAG, например,
с количеством обработанных файлов и записанных строк в базу данных.
Формат был стандартным и не конфигурировался.
Первое впечатление от увиденного требования у меня было крайне скептическое,
но на одной из ежедневных встреч с командой, обсудив все за и против, было принято решение реализовать данную задачу. Один из основных аргументов — создание универсального инструмента нотификации, который можно будет переиспользовать в других наших DAG«ах. Это переиспользование даст возможность получить другим Заказчикам более четкое представление о загрузках.
Чем подробнее алерты, тем меньше вопросов к нам.
Ближе к коду
Собрав команду на встрече «побрейнштормить» задачку, мы наметили итеративный подход к реализации: в первой итерации — основной механизм, во второй — доработка и исправление багов.
Умный и находчивый читатель, наверное, сразу догадался, что можно использовать встроенные в библиотеку Airflow Callbacks. Но это не наш случай, так как нужно обрабатывать ошибки по месту, конкретные случаи. Наиболее предпочтительный вариант — использовать отдельный Task Airflow (далее dag_email_notification
) с определенным TriggerRule = NONE_SKIPPED, так как необходимо обработать все задачи DAG’а со статусами success и failed.
С обработчиком определились, а как передавать информацию между задачами? — XCom, скажете вы, и будете правы. Но что и как передавать? Хотелось бы использовать лаконичный и понятный для эксплуатации объект, который сам бы отправился в XCom.
Мы решили, что для этого хорошо подходит — dataclass (далее EmailNotification
) с несколькими методами. Dataclass хороший вариант, так как в XCom следует отправлять сериализуемые в JSON данные.
Рассмотрим, как выглядит наш объект:
@dataclass
class EmailNotification:
task_description: str | None = None
lines: list[dict[str, t.Any] | str | BaseException] = field(default_factory=list)
XCOM_KEY: str = "email_notification"
@classmethod
def set_task_description(cls, task_description: str) -> None:
"""
Добавление описания таски в тело сообщения.
:param task_description: Описание таски - дополнительно
раскрывает смысл/задачу.
:return: None
"""
instance = cls.get_from_xcom()
instance.task_description = task_description
cls.push_into_xcom(instance)
@classmethod
def add_line(
cls,
message: str | BaseException | dict[str, t.Any],
) -> None:
"""
Добавление дополнительной строки в инстанс EmailNotification
и пуш в XCom.
:param message: Содержание строки с сообщением
:return: None
"""
instance = cls.get_from_xcom()
instance.lines.append(message)
cls.push_into_xcom(instance)
@classmethod
def push_into_xcom(cls, instance: EmailNotification) -> None:
# util,который из context возвращает текущий TaskInstance
ti: TaskInstance = get_current_ti()
ti.xcom_push(key=cls.XCOM_KEY, value=instance)
@classmethod
def get_from_xcom(cls) -> EmailNotification:
"""
Возвращает инстанс EmailNotification из XCOM или создает новый.
:return: EmailNotification
"""
# util, который из context возвращает текущий TaskInstance
ti: TaskInstance = get_current_ti()
instance = ti.xcom_pull(key=cls.XCOM_KEY, task_ids=ti.task_id)
return instance or cls()
Итак, на борту имеется — основной метод add_line
, который будет добавлять необходимую строку в тело сообщения, как мы и хотели. Classmethod выбран не случайным образом, а для лаконичного использования, исключается необходимость дополнительно создавать инстанс, инициализируя класс.
Основная работа скрыта в методе get_from_xcom
: при добавлении строки в тело сообщения инстанс класса будет создан в XCom, а если объект уже есть, то добавится строка в существующий инстанс.
Как это выглядит на практике:
@task
def first_task() -> None:
EmailNotification.set_task_description("Описание для task 1")
EmailNotification.add_line("Первая строка в task 1")
EmailNotification.add_line("Вторая строка в task 1")
По результатам выполнения задачи first_task
, в XCom получаем следующий объект:
EmailNotification(
task_description="Описание для task 1",
lines=["Первая строка в task 1", "Вторая строка в task 1"],
XCOM_KEY="email_notification"
)
Остается только получить все эти объекты из XCom задач со статусами success/failed с помощью обработчика dag_email_notification
, собрать все это в одно тело сообщения, разделяя на блоки для наглядности, и разослать всем заинтересованным лицам.
Ну мёд, медятина!
Вот как выглядит наше сообщение:
Конечно, для красоты добавлены всякие «рюшечки» и «бантики», например, если статус failed, фон блока задачи в HTML-разметке меняется на красный.
Автоматически определяются адресаты и тема сообщения в зависимости от статуса выполнения DAG.
Параметр message
в методе add_line
неспроста может быть: строкой, exception или словарем — обработчик dag_email_notification
учитывает это и, в зависимости от типа данных, по-разному формируется содержимое блока с описанием процесса выполнения задачи в теле сообщения.
И что, всё?
Учим ИИ шутить
Как сказал Тимлид:
«Попробуй протестить с Dynamic Task Mapping (далее Mapped Tasks), там могут быть нюансы…».
И действительно, нюансы есть…
Первое, что идет не по плану — в EmailNotification.get_from_xcom
после успешного выполнения первой Mapped Task остальные падают, и ничего непонятно.
В ходе деббагинга выявился фигурант дела — LazyXComAccess.
Кратко говоря, это последовательность XCom«ов с Mapped Tasks.
В get_from_xcom
, чтобы получить инстанс класса из XCom, одного параметра task_id
становится мало — у всех Mapped Tasks один и тот же task_id
.
К счастью, XCom’ы внутри LazyXComAccess хранятся с использованием индексов.
Чтобы решить вопрос, добавим map_indexes=ti.map_index
в вызов xcom_pull
.
Также теперь стоит учитывать индексы при получении данных из XCom в обработчике dag_email_notification
.
С Mapped Tasks сообщение может выглядеть так:
Напоследок
Хотелось бы показать больше кода, особенно обработчик dag_email_notification
,
но цель статьи — не перегружать вас техническими деталями, а главное — продемонстрировать, как можно совместно с командой предоставить бизнесу эффективный инструмент для мониторинга и анализа процессов.