Написание ETL пайплайна при помощи airflow, rabbitmq и postgres

Всем привет! В данной статье мы рассмотрим как можно локально развернуть airflow с помощью docker desktop’a и PyCharm’a. Кроме него развернём ещё и следующее: rabbitMQ, postgreSQL, redis и MongoDB.

Первый делом, нужно установить себе на компьютер Docker desktop, PyCharm и PgAdmin4. Теперь нам нужен yaml файл, в котором будет прописано все образы, которые мы хотим поставить. Готовый файл для нашей задачи можете скачать по ссылке. Когда всё необходимое было скачано, можно приступить к развёртыванию airflow.

Создаём проект в PyCharm, и в корневую папку загружаем наш yaml файл. В терминале выполняем следующий код: docker compose up -d. Когда нажмёте enter, то у вас начнётся скачивание необходимых образов в docker desktop и сразу из них запустятся контейнеры, кроме этого, в проекте у вас создастся необходимая структура папок.

Когда всё скачается, у вас docker desktop должен выглядеть примерно так:

На данный момент, у нас уже локально развернулось всё, что я обещал, теперь будем устанавливать необходимые соединения.

Объединение airflow и postgres

Начнём с Postgres; зайдём в PgAdmin и подключимся к нашему серверу

7bb41b194db15e56622bca71b009f88a.png8f6760e71c83d15370daab7b3ca864d2.png

Логин и пароль: airflow, airflow соответственно

0d04a28e2aaa53caa376676987f90cc1.png

После подключения у вас должно появиться в сервере две базы данных:

b0b8674b4114169be814bbdaed179a37.png

Для работы с airflow используется airflow. Для будущего написания ETL давайте сразу создадим таблицу:

475ff9ea2c5b8d3599a5aeff846e9dd3.png12ef803870f5eaa5406b141494f1b479.pngefb56e6be27e80f3f41640a9f443c0dd.png

Теперь займёмся установкой соединения между airflow и postgres. Для того, чтобы перейти в airflow, зайдём в docker desktop и нажмём на порт у airflow-webserver:

692356d610e2d77962a0a3c1f2562dc3.png

Нужно будет ввести логин и пароль: airflow и airflow

После этого, вы окажетесь на основной странице airflow:

b66b891dbdfbf1b3f3996d333a546f10.png

Теперь, чтобы наш airflow мог взаимодействовать с postgres, необходимо создать соединение между ними в airflow:

96685121f8b8f1240c47b5ae9ad657c1.png4a5664197ff3a5e17ae97c43f6e53de7.png

Пароль указываем от нашей базы данных, в нашем случае airflow

0688b86e0a955ce641b8917e8c6a7e85.png

Сохраняем соединение и теперь у нас налажена работа airflow и postgres.

Напишем самый простой dag, чтобы проверить, что всё работает. Для начала нужно установить дополнительные библиотеки в PyCharm: apache-airflow и apache-airflow-providers-postgres.

Напишем код:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


hook = PostgresHook(postgres_conn_id='Postgres')

request = "insert into public.data (name, birth_date, age) values ('Anton', '2004.11.14', 19)"


def insert():
    hook.run(request)


with DAG(
    dag_id="base_dag",
    default_args={
        "owner": "Kirill",
    }
) as dag:

    t = PythonOperator(
        task_id='insert_postgres',
        python_callable=insert
        )

Как только мы закончим его писать, в airflow появится наш dag:

50feb50c9d9fa8197d213359f12b3fd0.png

Запустим его и посмотрим, что получилось.

b486e4beceedd14e835cb721d18c4b41.png4292ed1e6a26474d5b3ede1f7362f5f9.png

Как видим, dag отработал успешно и добавил новую строку в нашу таблицу.

Объединение airflow и rabbitmq

Начнём с настройки rabbitMQ. Для того, чтобы перейти в rabbitMQ, нужно нажать на его порт в docker desktop’е:

34c8bf419effe8bdc95f9b562ff8c3a8.png

Логин и пароль: guest, guest. Перейдём в раздел «Queues» и создадим там новую очередь с названием queue.airflow:

73f4d97e184af9e64277dff6dfa3abeb.png

Вернёмся в PyCharm и установим библиотеку airflow-provider-rabbitmq.

Перейдём в airflow в раздел с соединениями и добавим соединение с брокером, в поле с хостом нужно указать имя нашего контейнера с rabbit, то есть, rabbitmq_main:

837366fb7c60580da1d826d0a903b5c9.png

После всего этого, настройка брокера закончена и мы можем приступать к взаимодействию питона, airflow и брокера. Напишем базовую программу:

Для этого в папке dags создадим папку tasks_broker. В ней создадим sensor.py с следующим кодом:

from rabbitmq_provider.sensors.rabbitmq import RabbitMQSensor


sensor = RabbitMQSensor(
    task_id="sensor",
    queue_name="queue.airflow",
    rabbitmq_conn_id="RabbitMQ",
)

Данный код является таской, которая принимает данные из брокера и передаёт в другую таску.

Также в папке tasks_broker создадим get_data.py с следующим содержанием:

import json
from airflow.decorators import task


@task(task_id="get_data")
def get_data(**kwargs) -> None:
    message = json.loads(kwargs['task_instance'].xcom_pull(task_ids='sensor'))

    print("#########################################################################################")
    print(message)
    print("#########################################################################################")

После этого, в папке dags создадим python_broker.py:

from airflow import DAG
from tasks_broker.sensor import sensor
from tasks_broker.get_data import get_data


with DAG(
        dag_id="python_broker",
        schedule_interval=None,
) as dag:
    sensor >> get_data()

И у нас в airflow появится новый dag python_broker. Запустим его и он будет ждать до тех пор, пока из брокера не поступит сообщение. Чтобы отправить сообщение из брокера, нужно перейти в раздел «queues» и выбрать необходимую очередь, в нашем случае queue.airflow. Там выбрать «publish message», ввести нужное сообщение и отправить.

fd380232f6797a30561ee70faaa756c1.png

Как мы отправим сообщение, наш dag выполнится и мы можем убедиться в том, что всё прошло успешно, зайдя в логи второй таски:

2ccea2a038008a5a48db580926b23ac1.png

Решение задачи ETL. Объединение airflow, rabbitmq и postgres

Давайте теперь решим задачу ETL, будет три таски:

  1. Получаем данные о человеке в формате json из брокера (имя, пол, дату рождения, возраст, город проживания) и передаём его во вторую таску для обработки

  2. Вторая таска отвечает за обработку полученных полей и дальнейшую передачу результата в третью таску (будем из пяти полей передавать только три (имя, дату рождения и возраст))

  3. Третья таска будет загружать данные в postgres

По сути, все соединения у нас установлены, необходимость лишь заключается в правильно написанном коде. В папке dags создадим папку tasks_broker_postgres. Там создадим три файла extract.py, transformation.py и load.py.

extract.py:

from rabbitmq_provider.sensors.rabbitmq import RabbitMQSensor


extract = RabbitMQSensor(
    task_id="extract",
    queue_name="queue.airflow",
    rabbitmq_conn_id="RabbitMQ",
)

transformation.py:

import json
from airflow.decorators import task


@task(task_id="transformation")
def transformation(**kwargs) -> dict:
    message = json.loads(kwargs['task_instance'].xcom_pull(task_ids='extract'))

    new_message = {"name": message["name"],
                   "birth_date": message["birth_date"],
                   "age": message["age"]}

    return new_message

load.py:

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task(task_id="load")
def load(**kwargs) -> None:
    message = kwargs['task_instance'].xcom_pull(task_ids='transformation')

    hook = PostgresHook(postgres_conn_id='Postgres')

    request = ("insert into public.data (name, birth_date, age) values ('" + message["name"] + "','" +
                                                                            message["birth_date"] + "'," +
                                                                            str(message["age"]) + ")")

    hook.run(request)

Теперь в папке dags создадим python_broker_postgres.py:

from airflow import DAG
from tasks_broker_postgres.extract import extract
from tasks_broker_postgres.transformation import transformation
from tasks_broker_postgres.load import load


with DAG(
        dag_id="python_broker_postgres",
        schedule_interval=None,
) as dag:
    extract >> transformation() >> load()

Запустим dag в airflow, зайдём в брокер и отправим сообщение:

51706eae8ed5cb80dfda74b096a9b1b8.png

По логам смотрим, что все таски прошли успешно:

f873736061ee13a642fd92a4568dbbdc.png

Зайдём теперь в базу данных:

35db73fdc38d182a8151f23e05a0b22c.png

Как видим, наше сообщение успешно обработалось и добавилось в базу данных.

Заключение

В данной статье мы рассмотрели взаимодействие python, airflow, postgres и rabbitmq.

Если вы не забыли, то в yaml файл входит ещё и MongoDB, работа с ней осуществляется аналогично работе с postgres. Для удобной работы с MongoDB можно скачать эту программу.

До новых публикаций!

© Habrahabr.ru