Кастомные email-оповещения в Apache Airflow

Пролог

 Как ИИ представляет себе

 Как ИИ представляет себе «Этакое желание»

Каждый разработчик сталкивался, или непременно столкнется, с ситуацией, когда в бизнес-требованиях видишь «этакое желание».

Этакое желание (каламбурное определение) — достичь чего-то невозможного или близкого к невозможному с помощью программирования.

Дорогой читатель, наверное, задаст риторический вопрос:
— Как это?! Разве чего-то нельзя достичь с помощью программирования?!
Вопрос, конечно, к месту, и ответ в большинстве случаев очевиден:
— Нет ничего невозможного… главное грамотно спроектировать техническое решение.
Но сейчас немного не об этом, а о целесообразности расхода ресурсов: трудозатрат, количество привлеченных специалистов и т.п.

Стоит ли овчинка выделки?

Увидев «этакое желание» хочется сразу пресечь его на корню и больше не возвращаться к обсуждению безумной идеи.
Но что если все-таки дать шанс и извлечь из этого выгоду в будущем?

И тут без ИИ не обошлось

И тут без ИИ не обошлось

Конкретный случай

Спойлер: стоит отметить, что дальше не будет rocket science и метапрограммирования, а будет описан подход к решению задачи.

Мне, как разработчику команды «Разработка и автоматизация загрузок данных» Газпромбанка, посчастливилось встретить в одном из технических заданий «этакое желание» — реализовать в загрузочном DAG кастомизированные email-оповещения.
Они должны собирать необходимую информацию на всех стадиях пайплайна, то есть в каждой из задач DAG’a, а в зависимости от успешного выполнения или при отклонении алгоритма, формировать тело email-сообщения. Например: в задаче происходит ошибка валидации данных и в этом случае должна сообщаться конкретная причина падения DAG. Информация должна быть полезной и понятной для бизнеса.

Для загрузок в нашей кодовой базе уже был реализован инструмент оповещения,
но оповещения выполнялись только при удачном сценарии отработки DAG, например,
с количеством обработанных файлов и записанных строк в базу данных.
Формат был стандартным и не конфигурировался.

Первое впечатление от увиденного требования у меня было крайне скептическое,
но на одной из ежедневных встреч с командой, обсудив все за и против, было принято решение реализовать данную задачу. Один из основных аргументов — создание универсального инструмента нотификации, который можно будет переиспользовать в других наших DAG«ах. Это переиспользование даст возможность получить другим Заказчикам более четкое представление о загрузках.
Чем подробнее алерты, тем меньше вопросов к нам.

dbaaa67d37a222fce063cc66757352de.png

Ближе к коду

Собрав команду на встрече «побрейнштормить» задачку, мы наметили итеративный подход к реализации: в первой итерации — основной механизм, во второй — доработка и исправление багов.

Умный и находчивый читатель, наверное, сразу догадался, что можно использовать встроенные в библиотеку Airflow Callbacks. Но это не наш случай, так как нужно обрабатывать ошибки по месту, конкретные случаи. Наиболее предпочтительный вариант — использовать  отдельный Task Airflow (далее dag_email_notification) с определенным TriggerRule = NONE_SKIPPED, так как необходимо обработать все задачи DAG’а со статусами success и failed.

С обработчиком определились, а как передавать информацию между задачами? — XCom, скажете вы, и будете правы. Но что и как передавать? Хотелось бы использовать лаконичный и понятный для эксплуатации объект, который сам бы отправился в XCom.
Мы решили, что для этого хорошо подходит — dataclass (далее EmailNotification) с несколькими методами. Dataclass хороший вариант, так как в XCom следует отправлять сериализуемые в JSON данные.

Рассмотрим, как выглядит наш объект:

@dataclass
class EmailNotification:
    task_description: str | None = None
    lines: list[dict[str, t.Any] | str | BaseException] = field(default_factory=list)
 
    XCOM_KEY: str = "email_notification"


    @classmethod
    def set_task_description(cls, task_description: str) -> None:
        """
        Добавление описания таски в тело сообщения. 
 
        :param task_description: Описание таски - дополнительно 
         раскрывает смысл/задачу.
 
        :return: None
 
        """
        instance = cls.get_from_xcom()
 
        instance.task_description = task_description
 
        cls.push_into_xcom(instance)


    @classmethod
    def add_line(
        cls,
        message: str | BaseException | dict[str, t.Any],
    ) -> None:
        """
        Добавление дополнительной строки в инстанс EmailNotification 
        и пуш в XCom.
 
        :param message: Содержание строки с сообщением
        :return: None
 
        """
        instance = cls.get_from_xcom()
 
        instance.lines.append(message)
 
        cls.push_into_xcom(instance)


    @classmethod
    def push_into_xcom(cls, instance: EmailNotification) -> None:
        # util,который из context возвращает текущий TaskInstance
        ti: TaskInstance = get_current_ti()
 
        ti.xcom_push(key=cls.XCOM_KEY, value=instance)

    
    @classmethod
    def get_from_xcom(cls) -> EmailNotification:
        """
        Возвращает инстанс EmailNotification из XCOM или создает новый.
 
        :return: EmailNotification
 
        """
        # util, который из context возвращает текущий TaskInstance
        ti: TaskInstance = get_current_ti()
 
        instance = ti.xcom_pull(key=cls.XCOM_KEY, task_ids=ti.task_id)
 
        return instance or cls()

Итак, на борту имеется — основной метод add_line, который будет добавлять необходимую строку в тело сообщения, как мы и хотели. Classmethod выбран не случайным образом, а для лаконичного использования, исключается необходимость дополнительно создавать инстанс, инициализируя класс.
Основная работа скрыта в методе get_from_xcom: при добавлении строки в тело сообщения инстанс класса будет создан в XCom, а если объект уже есть, то добавится строка в существующий инстанс.

Как это выглядит на практике:

@task
def first_task() -> None:
    EmailNotification.set_task_description("Описание для task 1")
 
    EmailNotification.add_line("Первая строка в task 1")
 
    EmailNotification.add_line("Вторая строка в task 1")

По результатам выполнения задачи first_task, в XCom получаем следующий объект:

EmailNotification(
    task_description="Описание для task 1",
    lines=["Первая строка в task 1", "Вторая строка в task 1"],
    XCOM_KEY="email_notification"
)

Остается только получить все эти объекты из XCom задач со статусами success/failed с помощью обработчика dag_email_notification, собрать все это в одно тело сообщения, разделяя на блоки для наглядности, и разослать всем заинтересованным лицам.

Ну мёд, медятина!

Вот как выглядит наше сообщение:

d25325f946cf331421e084d077c6ab8f.png

Конечно, для красоты добавлены всякие «рюшечки» и «бантики», например, если статус failed, фон блока задачи в HTML-разметке меняется на красный.
Автоматически определяются адресаты и тема сообщения в зависимости от статуса выполнения DAG.
Параметр message в методе add_line неспроста может быть: строкой, exception или словарем — обработчик dag_email_notification учитывает это и, в зависимости от типа данных, по-разному формируется содержимое блока с описанием процесса выполнения задачи в теле сообщения.

И что, всё?

 Учим ИИ шутить

 Учим ИИ шутить

Как сказал Тимлид:

«Попробуй протестить с Dynamic Task Mapping (далее Mapped Tasks), там могут быть нюансы…».

И действительно, нюансы есть…

Первое, что идет не по плану — в EmailNotification.get_from_xcom после успешного выполнения первой Mapped Task остальные падают, и ничего непонятно.
В ходе деббагинга выявился фигурант дела — LazyXComAccess.

Кратко говоря, это последовательность XCom«ов с Mapped Tasks.
В get_from_xcom, чтобы получить инстанс класса из XCom, одного параметра task_id становится мало — у всех Mapped Tasks один и тот же task_id.
К счастью, XCom’ы внутри LazyXComAccess хранятся с использованием индексов.
Чтобы решить вопрос, добавим map_indexes=ti.map_index в вызов xcom_pull.
Также теперь стоит учитывать индексы при получении данных из XCom в обработчике dag_email_notification.

С Mapped Tasks сообщение может выглядеть так:

b6180513513f1f25266d580b7a333527.png

Напоследок

Хотелось бы показать больше кода, особенно обработчик dag_email_notification,
но цель статьи — не перегружать вас техническими деталями, а главное — продемонстрировать, как можно совместно с командой предоставить бизнесу эффективный инструмент для мониторинга и анализа процессов.

802df8e6a24619027edce0d13c2e4078.png

© Habrahabr.ru