Airflow. Основы airflow. Работа с дагами. Dags example

cdb1a7b829b59744585a0cbcf5132c0b

Apache Airflow — открытое программное обеспечение для создания, выполнения, мониторинга и оркестровки потоков операций по обработке данных. 1

Изначально разработан в Airbnb в октябре 2014 года. В марте 2016 года стал проектом Apache Incubator, в январе 2019 года — проектом верхнего уровня Apache Software Foundation. 1

Airflow подходит не только для ETL-процессов, но и для автоматизации других задач, например, создания и отправки отчётов, управления инфраструктурой. 2

Некоторые области применения Airflow:

  • Инженерам данных — для проектирования, разработки и обслуживания систем обработки данных. 2

  • Аналитикам и специалистам по Data Science— для построения витрин данных, отчётов и подготовки данных для машинного обучения. 2

  • Разработчикам — для автоматизации загрузки данных для тестирования приложения, настройки обмена информацией между базами данных или с внешними системами. 2

  • Менеджерам проектов — для планирования и мониторинга процессов обработки данных. 

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago, timedelta
import logging
import pymongo
from pymongo import MongoClient

# Конфигурация MongoDB
MONGO_URI = 'mongodb://username:password@localhost:27017/mydatabase'  # Заменить на актуальные данные
DATABASE_NAME = 'mydatabase'
COLLECTION_NAME = 'mycollection'

def collect_data_from_db():
    try:
        # Подключение к MongoDB
        client = MongoClient(MONGO_URI)
        db = client[DATABASE_NAME]
        collection = db[COLLECTION_NAME]
        
        # Пример запроса - получаем все записи из коллекции
        data = collection.find()
        records = list(data)
        
        if records:
            logging.info(f"Получено {len(records)} записей из MongoDB.")
        else:
            logging.info("Записей не найдено.")
        
        client.close()  # Закрываем соединение с MongoDB
    except Exception as e:
        logging.error(f"Ошибка при подключении к MongoDB: {e}")
        raise

def preprocess_data():
    try:
        # Логика препроцессинга данных
        logging.info("Предобработка данных выполнена успешно.")
    except Exception as e:
        logging.error(f"Ошибка при препроцессинге данных: {e}")
        raise

def load_data_to_storage():
    try:
        # Логика загрузки данных в хранилище
        logging.info("Загрузка данных в хранилище выполнена успешно.")
    except Exception as e:
        logging.error(f"Ошибка при загрузке данных в хранилище: {e}")
        raise

def update_data():
    try:
        # Логика обновления данных
        logging.info("Обновление данных выполнено успешно.")
    except Exception as e:
        logging.error(f"Ошибка при обновлении данных: {e}")
        raise

def schedule_tasks():
    try:
        # Логика автоматического запуска задач
        logging.info("Задачи успешно запланированы.")
    except Exception as e:
        logging.error(f"Ошибка при планировании задач: {e}")
        raise

def run_etl_process():
    try:
        # Логика выполнения ETL процесса
        logging.info("ETL процесс выполнен успешно.")
    except Exception as e:
        logging.error(f"Ошибка в ETL процессе: {e}")
        raise

default_args = {
    'owner': 'airflow',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': days_ago(1),
}

with DAG('data_processing_dag',
         default_args=default_args,
         description='DAG для автоматизации сбора, обработки и загрузки данных',
         schedule_interval=timedelta(days=1),  # запуск каждый день
         catchup=False) as dag:

    task_1 = PythonOperator(
        task_id='collect_data_from_db',
        python_callable=collect_data_from_db
    )

    task_2 = PythonOperator(
        task_id='preprocess_data',
        python_callable=preprocess_data
    )

    task_3 = PythonOperator(
        task_id='load_data_to_storage',
        python_callable=load_data_to_storage
    )

    task_4 = PythonOperator(
        task_id='update_data',
        python_callable=update_data
    )

    task_5 = PythonOperator(
        task_id='schedule_tasks',
        python_callable=schedule_tasks
    )

    task_6 = PythonOperator(
        task_id='run_etl_process',
        python_callable=run_etl_process
    )

    # Устанавливаем порядок выполнения задач
    task_1 >> task_2 >> task_3 >> task_4 >> task_5 >> task_6

Apache Airflow — открытое программное обеспечение для создания, выполнения, мониторинга и оркестровки потоков операций по обработке данных. 1

Изначально разработан в Airbnb в октябре 2014 года. В марте 2016 года стал проектом Apache Incubator, в январе 2019 года — проектом верхнего уровня Apache Software Foundation. 1

Airflow подходит не только для ETL-процессов, но и для автоматизации других задач, например, создания и отправки отчётов, управления инфраструктурой. 2

Некоторые области применения Airflow:

  • Инженерам данных — для проектирования, разработки и обслуживания систем обработки данных. 2

  • Аналитикам и специалистам по Data Science— для построения витрин данных, отчётов и подготовки данных для машинного обучения. 2

  • Разработчикам — для автоматизации загрузки данных для тестирования приложения, настройки обмена информацией между базами данных или с внешними системами. 2

  • Менеджерам проектов — для планирования и мониторинга процессов обработки данных. 

© Habrahabr.ru