Как мы деплоем Apache Airflow для промышленного использования в Kubernetes
Что делает инженер, если DAG не выполняется? Проверяет Airflow 50 раз, а потом вспоминает, что забыл поставить @dag над функцией.
Развертывание Apache Airflow в промышленной среде — это сложная задача, требующая учета множества аспектов: от обеспечения безопасности конфиденциальных данных до эффективного управления ресурсами. Одной из ключевых проблем, с которыми сталкиваются команды, является безопасное управление секретами, оптимизация конфигураций и наблюдаемость.
В этой статье мы рассмотрим, как использовать инструменты, такие как Sops и YAML-якоря, для упрощения управления конфиденциальными данными и улучшения читаемости конфигураций. А так же как обеспечить полную наблюдаемость инсталляции Apache Airflow
Коллеги, здарова!
Мне часто стали попадаться новые статьи о том, как деплоят Apache Airflow разные ребята у себя в организациях. Кто-то использует Docker Compose, кто-то показывает, как запустить это приложение на выделенных нодах, кто-то даже компилирует для этого свою необходимую версию Python (хотя, наверное, статья от 2022 года уже немножко устарела).
Сегодня я хочу вам рассказать о том, как мы деплоем промышленную версию Apache Airflow в Kubernetes.
Что мы сейчас рассмотрим:
1. Соберем свой Helm Chart который будет включать все необходимые компоненты.
2. Покроем Airflow мониторингом и логированием.
3. Отдельно поговорим про логирование, есть парочка интересных моментов.
4. И настроим правильное выделение ресурсов, для запуска подов.
Маленькая правочка от меня по поводу русификации:
Секрет = какая-нибудь тайна, которую лучше не рассказывать жене или любовнице:) .
Сикрет = Это поле в манифесте K8s, которое указывает тип ресурса Secret
Я так и не смог определится с написанием терминов на русском или английском языке. По этому вы увидите в тексте о том, как я пишу что-то на английском и тоже самое на русском языке.
Перед началом, я вам расскажу про способ шифрования Secret в k8s. Ведь обычно когда мы создаем манифест с kind: Secret
, то все частные данные мы указываем в открытом виде, что не есть хорошо, особенно есть это потом попадет в какой-нибудь репозиторий Git.
Мои коллеги своевременно рассказали мне про такой инструмент, как Sops https://github.com/getsops/sops
Основные преимущества Sops:
Шифрование на уровне файлов.
Интеграция с популярными инструментами, такими как Helm и Terraform.
Поддержка различных провайдеров ключей (AWS KMS, GCP KMS, Azure Key Vault).
Пример использования Sops
Создайте файл secrets.yaml
, содержащий необходимые секреты:
secrets:
mysql-root-password: ENC[AES256_GCM,data:2nLud5nDZEmm/sCW,...]
mysql-password: ENC[AES256_GCM,data:e4m5nwmy+zknWtWJ,...]
airflow-user-username: ENC[AES256_GCM,data:8RiizFAMKh+9LWM=,...]
airflow-user-password: ENC[AES256_GCM,data:lkjasd98adf7...]
Все значения зашифрованы и будут расшифрованы автоматически при деплое. Для шифрования используйте команду sops -e secrets.yaml
.
Теперь в файле values.yaml
подключите секреты: secrets: {}
Большим плюсом являет то, что он интегрирован с Helm и позволяет деплоить приложения с зашифрованными сикретами и хранить из в вашей системе контроля версий, если вы по какой-то причине не используете другие популярные решения.
Давайте начинать
Мы будем собирать свой чарт на основе следующего чарта:
https://github.com/airflow-helm/charts
Это чарт которые поддерживается комьюнити и был выбран мной, потому что, как мне показалось, он более наглядный и исчерпывающий, чем оригинальный чарт Apache Airflow.
apiVersion: v2
name: airflow
description: A Helm chart for Kubernetes with Apache Airflow
type: application
sources:
- https://github.com/airflow-helm/charts/tree/main/charts/airflow
version: 8.8.0
dependencies:
- condition: mysql.enabled
name: mysql
repository: bitnamicharts
version: 9.14.1
- condition: prometheus-statsd-exporter.enable
name: prometheus-statsd-exporter
repository: https://prometheus-community.github.io/helm-charts
version: 0.10.1
- condition: airflow.enable
name: airflow
repository: https://airflow-helm.github.io/charts
version: 8.8.0
- condition: fluent-bit.enabled
name: fluent-bit
repository: https://fluent.github.io/helm-charts
version: 0.39.0
- condition: minio.enable
name: minio
repository: bitnamicharts
version: 12.8.17
Для начала подключим необходимые чарты и сделаем helm upgrade
Давайте соберем для начала чарт, в котором будет только Airflow и дальше по ходу рассказа будем подключать остальные библиотеки. В конце статьи я прикреплю полностью готовый, который можно будет скопировать и запустить.
airflow:
airflow:
legacyCommands: false
image:
repository: apache/airflow
tag: v2.7.1
Так как мы используем Kubernetes, то нам нужно выбрать Executor для выполнения. Kubernetes Executor позволяет выполнять таски внутри дагов в отдельном поде, который будет создаваться под каждую таску.
executor: KubernetesExecutor
Далее разберемся с конфигами и переменными. Мы настраиваем следующие параметры. Детально остановимся только на нескольких.
config:
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
AIRFLOW__METRICS__STATSD_ON: "True"
AIRFLOW__METRICS__STATSD_HOST: airflow-cluster-prometheus-statsd-exporter
AIRFLOW__METRICS__STATSD_PORT: "9125"
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
AIRFLOW__LOGGING__REMOTE_LOGGING: True
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "minio_S3"
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://airflow-logs/
AIRFLOW__WEBSERVER__BASE_URL: "https://airflow.hramoff.local"
AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS: True
Указываем адрес, по которому будет обслуживаться Airflow.
AIRFLOW__WEBSERVER__BASE_URL: "https://airflow.hramoff.local"
Данная переменная позволит нам не загружать Даги, которые разработчики оставили нам для примера.
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
Данная переменная позволит нам скрывать вс. Конфиденциальную информацию звездочками, которую мы будем задавать далее, данная настройка относится к веб-интерфейсу.
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
С остальными переменными мы будем разбираться по ходу действия.
Далее у нас есть такая сущность как KubernetesPodTemplate, которую мы настроим дальше. Нам в ней, в первую очереди интересуют ресурсы, выдаваемы подам. Каким конкретно подам? А тем подам, внутри которых идет выполнения дагов и их тасок, то есть, по задумке, эти значения не применяются к подам, которые отвечают за веб-интерфейс, планировщик и так далее. Мы пришли к выводу, что эти поды нам лучше запускать на выделенных нодах, поэтому укажем еще селекторы и tolerations.
Важное уточнение. Как вы знаете Apache Airflow написан на Python. А рабочая среда для выполнения дагов однопоточная. Это значит, что мы не можем использовать более одного ядра по умолчанию, если конечно написанный вами даг, этого не поддерживает, но такое решение будет не правильным, в контексте Airflow. Поэтому в лимитах мы максимум указывает cpu: 1
, так как больше мы использовать не можем и резервировать выше этого лимита не имеет смысла.
kubernetesPodTemplate:
resources:
requests:
memory: 512Mi
cpu: 100m
limits:
memory: 16Gi
cpu: 1
nodeSelector:
node-role/airflowKubernetesExecutor: ""
tolerations:
- key: node-role/airflowKubernetesExecutor
operator: Exists
Дальше у нас идет компонент scheduler. Это планировщик, который отслеживает все таски и даги, так же отвечает за выполнение и состояние.
scheduler:
replicas: 1
resources:
limits:
memory: 8Gi
cpu: 2
requests:
memory: 8Gi
cpu: 2
logCleanup:
enabled: false
livenessProbe:
enabled: true
initialDelaySeconds: 30
Внутри этого компонента мы так же указываем необходимые для этого ресурсы и пробы. Так же мы настроем LogCleanup для того, чтобы не хранить логи пода.
Дальше у нас идет настройка компонента, который отвечает за веб-интерфейс.
web:
replicas: 1
resources:
limits:
memory: 8Gi
cpu: 2
requests:
memory: 8Gi
cpu: 2
livenessProbe:
enabled: true
initialDelaySeconds: 30
readinessProbe:
enabled: true
initialDelaySeconds: 30
webserverConfig:
existingSecret: *basic
service:
type: ClusterIP
externalPort: 8080
Этому компоненты мы так же задаем ресурсы и настраиваем пробы.
Здесь вы сможете увидеть такой ключ и значение как existingSecret: *basic
Знатоки Yaml-программирования уже знают, что этот механизм называется якорем, а для остальных я поясню. Если в нашем чарте или любом другом yaml документе есть повторяющиеся разделы, то мы сможем использовать, называемое мной, наследование. Работает это следующим образом: мы можем объявить некоторую структуру yaml и несколько раз ссылаться на нее и использовать значения уже в ней. Что нам это даст? Вместо того, чтобы указывать одинаковые значения везде, мы можем заранее определить нужный нам ключ и наследоваться от него.
YAML-якоря позволяют избежать дублирования конфигураций, упрощая управление и внесение изменений. Рассмотрим пример с использованием секретов.
В начале файла определите ключ:
basic_secret: &basic airflow-secret
А в различных частях конфигурации ссылайтесь на этот якорь:
web:
webserverConfig:
existingSecret: *basic
externalDatabase:
passwordSecret: *basic
passwordSecretKey: mysql-password
gitSync:
httpSecret: *basic
httpSecretUsernameKey: airflow-gitcync-username
httpSecretPasswordKey: airflow-gitcync-user-token
Преимущества якорей YAML:
Централизованное управление повторяющимися параметрами.
Легкость в обновлении данных (изменение в одном месте распространяется на весь документ).
Упрощение чтения и поддержки конфигурации.
Далее создадим файл, допустим, secrets.yaml и внутри него будем указывать нужные значения:
secrets:
airflow-user-username: ENC[AES256_GCM,data:...]
airflow-user-password: ENC[AES256_GCM,data:...]
AWS_KEY_ID: ENC[AES256_GCM,data:...]
AWS_ACCESS_KEY: ENC[AES256_GCM,data:...]
А деплоить мы будем следующим образом:
helm secrets upgrade airflow-cluster . --namespace apache-airflow --install -f values.yaml -f secrets.yaml
Таким образом у нас автоматически будут подставляться и расшифровываться сикреты и другая конфиденциальная информация.
Далее у нас есть такой компонент как workers. Мы его авансом отключаем, потому что будем использовать KubernetesExecutor. Если мне не изменяет память, то данный компонент будет создавать заранее работающие поды, на которых будут выполнятся необходимые нам даги.
workers:
enabled: false
Есть еще компонент с триггерами. В Apache Airflow компонент Trigger используется для определения условий, при которых определенные таски могут быть выполнены. Это позволяет создавать более сложные и динамичные рабочие процессы, основанные на событиях или условиях, а не только на временных интервалах или последовательности выполнения.
triggerer:
enabled: true
replicas: 1
resources:
limits:
memory: 6Gi
cpu: 2
requests:
memory: 6Gi
cpu: 2
capacity: 1000
livenessProbe:
enabled: true
initialDelaySeconds: 30
У него мы так же задаем ресурсы и пробы.
Есть еще компонент flower, который используется как веб-интерфейс для мониторинга и управления тасками в apache airflow. Его использование остается под вопросом, вроде инструмент хороший, умеет визуализировать много полезной информации, но пару раз натыкались в нем на баги, да и какой-то особо важной информации он нам не дает, по этому решили его отключить, но я вам приложу фотокарточку из статьи с medium, чтобы было примерно понятно, что это за компонент такой.
Веб-интерфейс Flower
flower:
enabled: false
Далее у нас идет настройка логов. Мы указываем путь к ним и отключаем хранение.
logs:
path: /opt/airflow/logs
persistence:
enabled: false
Далее конфигурируем среду выполнения дагов.
dags:
path: /opt/airflow/dags
persistence:
enabled: false
gitSync:
enabled: true
image:
repository: /git-sync/git-sync
repo: http://git.hramoff.local/airflow/airflow.git
branch: main
syncWait: 30
httpSecret: *basic
httpSecretUsernameKey: airflow-gitcync-username
httpSecretPasswordKey: airflow-gitcync-user-token
Нам нужно указать путь по которому airflow будет искать даги и отключить их хранение. Делаем это мы для того, чтобы у нас работало приложение git-sync, которое будет синхронизировать необходимые нам даги из нашего репозитория.
У него мы указываем образ, который будет использоваться, а так же параметры для его работы, например интервал, ветку из которой будем клонировать и реквизиты пользователя.
Далее самым стандартным образом настраиваем ингрес.
ingress:
enabled: true
web:
enabled: true
host: airflow.hramoff.local
ingressClassName: nginx
Далее включим Service Account.
serviceAccount:
create: true
name: ""
annotations: { }
По умолчанию Apache Airflow использует PostgreSQL, но мы можем задать другую базу данных для хранения. Я использую MySQL потому что он мне роднее и более знаком :).
pgbouncer:
enabled: false
postgresql:
enabled: false
externalDatabase:
type: mysql
host: airflow-cluster-mysql
port: 3306
database: airflow
user: airflow
passwordSecret: *basic
passwordSecretKey: mysql-password
properties: ""
И отключаем использование Redis, как брокера сообщений, потому что мы работаем внутри Kubernetes и KubernetesExecutor закрывает потребность в дополнительном брокере.
redis:
enabled: false
Далее настроим MySQL.
mysql:
image:
registry: ""
repository: bitnami/mysql
enabled: true
auth:
username: airflow
database: airflow
existingSecret: *basic
defaultAuthPlugin: caching_sha2_password
primary:
persistence:
storageClass: "vsphere-csi-sc"
size: 50Gi
secondary:
replicaCount: 1
persistence:
storageClass: "vsphere-csi-sc"
size: 50Gi
Используем отказоустойчивую конфигурацию mysql с репликой.
Далее мы включим ServiceMonitor для того чтобы забирать метрики от Apache Airflow.
serviceMonitor:
enabled: true
interval: 25s
path: /metrics
port: http
selector:
component: prometheus-statsd-exporter
А загружать мы будем в Prometheus StatsD exporter.
prometheus-statsd-exporter:
image:
repository: prom/statsd-exporter
enable: true
statsd:
mappingConfigMapName: airflow-cluster-statsd
mappingConfigMapKey: mappingConf
Каким образом это работает. Для начала в документации apache airflow есть вся необходимая информация.
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html
Теперь давайте вернемся к инструкции повыше и включим метрики в apache airflow.
AIRFLOW__METRICS__STATSD_ON: "True"
AIRFLOW__METRICS__STATSD_HOST: airflow-cluster-prometheus-statsd-exporter
AIRFLOW__METRICS__STATSD_PORT: "9125"
Использовать для этого мы будем следующую конфигурацию:
https://github.com/data-burst/airflow-monitoring-and-alerting
В ней ней нам нужно использовать маппинг для StatsD. Для этого мы возьмем соответствующий код и положим его в ConfigMap.
Выглядеть это будет так
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "airflow.fullname" . }}-statsd
data:
mappingConf: |
mappings:
# Airflow StatsD metrics mappings (https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html)
# === Counters ===
- match: "(.+)\\.(.+)_start$"
match_metric_type: counter
name: "af_agg_job_start"
match_type: regex
labels:
airflow_id: "$1"
job_name: "$2"
это не полный пример конфига
Думаю не стоит говорить о том, что ваш сервис монитор должен быть натравлен на Prometheus, а как загрузить дашборы в Grafana вы и так знаете. Вот ссылка на Dashboards — https://github.com/data-burst/airflow-monitoring-and-alerting/tree/main/config_files/grafana/var/lib/grafana/dashboards
Далее настроим логирование подов, в которых выполняются даги.
Использовать для этого будем FluentBit со следующим конфигом.
fluent-bit:
image:
repository: fluent/fluent-bit
tag: 2.1.10
extraVolumes:
- name: statedir
hostPath:
path: /opt/data/airflow/fluentbit
extraVolumeMounts:
- name: statedir
mountPath: /opt/data/
nodeSelector:
node-role/airflowKubernetesExecutor: ""
tolerations:
- key: node-role/airflowKubernetesExecutor
operator: Exists
config:
inputs: |
[INPUT]
Name tail
Path /var/log/containers/*_apache-airflow_base*.log
multiline.parser docker, cri
Tag kube.*
Mem_Buf_Limit 5MB
Skip_Long_Lines On
Refresh_Interval 1
read_from_head true
DB /opt/data/airflow_state.db
filters: |
[FILTER]
Name grep
Match kube.*
Exclude log /.*MYSQL_OPT_RECONNECT.*/
Exclude log /\s200\s\d.*kube-probe/
Exclude log /input:tail:tail/
[FILTER]
Name kubernetes
Match kube.*
Merge_Log On
Keep_Log Off
K8S-Logging.Parser On
K8S-Logging.Exclude On
[FILTER]
Name nest
Match kube.*
Operation lift
Nested_under kubernetes
Add_prefix k8s:
[FILTER]
Name nest
Match kube.*
Operation lift
Nested_under k8s:labels
Add_prefix k8s:labels:
[FILTER]
Name nest
Match kube.*
Operation lift
Nested_under k8s:annotations
Add_prefix k8s:annotations:
[FILTER]
Name record_modifier
Match kube.*
Allowlist_key k8s:annotations:dag_id
Allowlist_key k8s:labels:task_id
Allowlist_key log
Allowlist_key stream
Allowlist_key message
outputs: |
[OUTPUT]
Name gelf
Match kube.*
Host graylog.hramoff.local
Port 12205
Mode UDP
Gelf_Short_Message_Key log
Давайте разберем что мы тут видим. Начнем с секции Input. Мы будем читать файлы с логами контейнера, которые запускаются. Нас интересуют файлы по такому пути и с такой маской /var/log/containers/*_apache-airflow_base*.log
Далее рассмотрим фильтры. Во первых мы избавляемся от логов в которых есть MYSQL_OPT_RECONNECT
Хоть мы и используем последнюю версию, но в логах постоянно есть такие сообщения о несовместимости.
Логи пода с веб-интерфейсом Apache Airflow
Не понятно еще, когда уже исправят такую штучку
А так же избавимся от логов, связанных с пробами Kubernetes.
Рассмотрим следующие фильтры. По умолчанию поля с labels и annotations имеют очень не удобный для чтения вид. Всеми остальными фильтрами мы приводим в читаемый вид логи, чтобы можно было удобно отфильтровать записи по названию DAG или таски которая там выполняется.
Так выглядит лог в Graylog
Таким образом мы получим сообщение, в котором удобно производить поиск по полям: название дага и название таски.
Ну и наконец нам потребуется Minio.
minio:
enabled: true
persistence:
enabled: true
existingClaim: "minio"
podAnnotations:
prometheus.io/scrape: "true"
prometheus.io/path: "/minio/v2/metrics/cluster"
prometheus.io/port: "9000"
metrics:
serviceMonitor:
enabled: true
interval: 25s
paths:
- /minio/v2/metrics/cluster
- /minio/v2/metrics/node
resources:
limits:
cpu: 1
memory: 8Gi
requests:
cpu: 300m
memory: 512Mi
ingress:
enabled: true
ingressClassName: "nginx"
hostname: airflow-minio.hramoff.local
Зачем в данной конфигурации нам потребуется минио? А для того чтобы хранить логи. Но не те логи о которых мы говорили выше.
Ранее мы хранили логи пода, в котором запускался Airflow. А теперь мы храним логи выполнения Дага.
Лог выполнения таски
Ознакомится с документацией вы можете здесь.
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/logging/s3-task-handler.html
Так как мы не храним все логи в Persistence Volume, то нам нужно какое-то отдельное хранилище. Airflow умеет класть логи в Amazon S3.
Для этого мы еще раз вернемся к переменным окружения и настроем хранилище логов.
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
AIRFLOW__LOGGING__REMOTE_LOGGING: True
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "minio_S3"
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://airflow-logs/
Подключаться мы будем к коннекшену minio_s3, которое создадим далее. И класть логи мы будем в отдельный бакет.
Теперь я вам покажу как создавать коннекшены в Airflow, при условии, что некоторые переменные являются конфиденциальными и зашифрованными.
Мы можем использовать переменную connections
connections:
- id: minio_S3
type: aws
description: Minio_S3_connection
extra: |-
{ "aws_access_key_id": "${AWS_KEY_ID}",
"aws_secret_access_key": "${AWS_ACCESS_KEY}",
"endpoint_url": "http://airflow-cluster-minio:9000" }
И задать следующие значения. Как вы видите ключи для подключения в основном чарте у нас скрыты за переменную.
Для того чтобы задать им конкретное значение будем использовать ConnectionsTemplates
connectionsTemplates:
AWS_KEY_ID:
kind: secret
name: *basic
key: AWS_KEY_ID
AWS_ACCESS_KEY:
kind: secret
name: *basic
key: AWS_ACCESS_KEY
Здесь мы объявляем название эти переменных, указываем им тип как сикрет и опять встречаем якоря.
Для того чтобы задать значения этим ключам мы переходим в отдельный файл с сикретами и указываем таком образом.
secrets:
mysql-root-password: mysql-root-password
mysql-password: mysql-password
airflowUser-username: airflowUser-username
airflowUser-password: airflowUser-password
airflow-gitcync-username: airflow-git-user
airflow-gitcync-user-token: airflow-git-user-token
webserver_config.py: sample-webserver-config
AWS_KEY_ID: AWS_KEY_ID
AWS_ACCESS_KEY: AWS_ACCESS_KEY
После этого мы можем задеплоить Apache Airflow
helm secrets upgrade airflow-cluster . --namespace apache-airflow --install -f values.yaml -f secrets.yaml
Что мы ожидаем увидеть.
1. Работающий Apache airflow
Apache Airflow
2. Мониторинг всех компонентов внутри Grafana.
Cluster Dashboard
Dag Dashboard
MinIO Dashboard
3. Логирование подов.
Логи пода внутри GrayLog
4. Хранение логов в Minio
Хранение логов в MinIO
В заключение, успешное развертывание Apache Airflow в Kubernetes требует комплексного подхода, охватывающего все аспекты observability, управления ресурсами и безопасного обращения с данными. Применяя рассмотренные методы и практики, мы можем значительно повысить надежность и эффективность наших ETL-процессов, а также обеспечить безопасность и управляемость нашей инфраструктуры. Надеемся, что представленные решения будут полезны для вашей команды и помогут вам в реализации проектов на базе Apache Airflow в Kubernetes.
Талант — это прежде всего труд
Антон Павлович Чехов русский писатель, драматург, врач 1860–1904