Airflow. Основы airflow. Работа с дагами. Dags example
Apache Airflow — открытое программное обеспечение для создания, выполнения, мониторинга и оркестровки потоков операций по обработке данных. 1
Изначально разработан в Airbnb в октябре 2014 года. В марте 2016 года стал проектом Apache Incubator, в январе 2019 года — проектом верхнего уровня Apache Software Foundation. 1
Airflow подходит не только для ETL-процессов, но и для автоматизации других задач, например, создания и отправки отчётов, управления инфраструктурой. 2
Некоторые области применения Airflow:
Инженерам данных — для проектирования, разработки и обслуживания систем обработки данных. 2
Аналитикам и специалистам по Data Science— для построения витрин данных, отчётов и подготовки данных для машинного обучения. 2
Разработчикам — для автоматизации загрузки данных для тестирования приложения, настройки обмена информацией между базами данных или с внешними системами. 2
Менеджерам проектов — для планирования и мониторинга процессов обработки данных.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago, timedelta
import logging
import pymongo
from pymongo import MongoClient
# Конфигурация MongoDB
MONGO_URI = 'mongodb://username:password@localhost:27017/mydatabase' # Заменить на актуальные данные
DATABASE_NAME = 'mydatabase'
COLLECTION_NAME = 'mycollection'
def collect_data_from_db():
try:
# Подключение к MongoDB
client = MongoClient(MONGO_URI)
db = client[DATABASE_NAME]
collection = db[COLLECTION_NAME]
# Пример запроса - получаем все записи из коллекции
data = collection.find()
records = list(data)
if records:
logging.info(f"Получено {len(records)} записей из MongoDB.")
else:
logging.info("Записей не найдено.")
client.close() # Закрываем соединение с MongoDB
except Exception as e:
logging.error(f"Ошибка при подключении к MongoDB: {e}")
raise
def preprocess_data():
try:
# Логика препроцессинга данных
logging.info("Предобработка данных выполнена успешно.")
except Exception as e:
logging.error(f"Ошибка при препроцессинге данных: {e}")
raise
def load_data_to_storage():
try:
# Логика загрузки данных в хранилище
logging.info("Загрузка данных в хранилище выполнена успешно.")
except Exception as e:
logging.error(f"Ошибка при загрузке данных в хранилище: {e}")
raise
def update_data():
try:
# Логика обновления данных
logging.info("Обновление данных выполнено успешно.")
except Exception as e:
logging.error(f"Ошибка при обновлении данных: {e}")
raise
def schedule_tasks():
try:
# Логика автоматического запуска задач
logging.info("Задачи успешно запланированы.")
except Exception as e:
logging.error(f"Ошибка при планировании задач: {e}")
raise
def run_etl_process():
try:
# Логика выполнения ETL процесса
logging.info("ETL процесс выполнен успешно.")
except Exception as e:
logging.error(f"Ошибка в ETL процессе: {e}")
raise
default_args = {
'owner': 'airflow',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'start_date': days_ago(1),
}
with DAG('data_processing_dag',
default_args=default_args,
description='DAG для автоматизации сбора, обработки и загрузки данных',
schedule_interval=timedelta(days=1), # запуск каждый день
catchup=False) as dag:
task_1 = PythonOperator(
task_id='collect_data_from_db',
python_callable=collect_data_from_db
)
task_2 = PythonOperator(
task_id='preprocess_data',
python_callable=preprocess_data
)
task_3 = PythonOperator(
task_id='load_data_to_storage',
python_callable=load_data_to_storage
)
task_4 = PythonOperator(
task_id='update_data',
python_callable=update_data
)
task_5 = PythonOperator(
task_id='schedule_tasks',
python_callable=schedule_tasks
)
task_6 = PythonOperator(
task_id='run_etl_process',
python_callable=run_etl_process
)
# Устанавливаем порядок выполнения задач
task_1 >> task_2 >> task_3 >> task_4 >> task_5 >> task_6
Apache Airflow — открытое программное обеспечение для создания, выполнения, мониторинга и оркестровки потоков операций по обработке данных. 1
Изначально разработан в Airbnb в октябре 2014 года. В марте 2016 года стал проектом Apache Incubator, в январе 2019 года — проектом верхнего уровня Apache Software Foundation. 1
Airflow подходит не только для ETL-процессов, но и для автоматизации других задач, например, создания и отправки отчётов, управления инфраструктурой. 2
Некоторые области применения Airflow:
Инженерам данных — для проектирования, разработки и обслуживания систем обработки данных. 2
Аналитикам и специалистам по Data Science— для построения витрин данных, отчётов и подготовки данных для машинного обучения. 2
Разработчикам — для автоматизации загрузки данных для тестирования приложения, настройки обмена информацией между базами данных или с внешними системами. 2
Менеджерам проектов — для планирования и мониторинга процессов обработки данных.