Airflow в Kubernetes. Часть 1
Приветствую!
На пути инженера данных часто встречаются задачи связанные с DevOps. Одна из таких — развернуть Airflow в Kubernetes кластере. Если до этого похожего опыта работы не было, то эта задача может показаться не тривиальной. Конечно, можно выполнить несколько команд из официального гайда, но если нужно будет что-то поправить, то без понимания, что происходит внутри, обойтись будет сложно. Эта статья призвана облегчить данную задачу. Она поможет тем, кто уже работал с Airflow, но еще не касался технологии Kubernetes.
Чтобы легче понять эту тему будем использовать подготовленный репозиторий. Он создан на основе официального, путем упрощения.
Предварительная работа
Airflow, который развернем, будет иметь следующую архитектуру (рис. 1):
Рис. 1 Архитектура Airflow
Как видите, довольно стандартная схема. Тип executor- Celery, брокер сообщений — Redis, БД — Postgresql.
Прежде чем перейти к аспектам работы с Kubernetes, необходимо выполнить несколько предварительных работ:
Создание кластера. Рекомендую использовать сервис для управления кластером kubernetes в одном из облаков. Лично я использовал Yandex Cloud. Тут находится инструкция по его созданию.
Установка kubectl. Утилита, которая позволяет взаимодействовать с кластером.
Установка Helm. Ниже этот инструмент будет описан подробнее.
Создание БД. Здесь также можно воспользоваться готовым облачным решением — Managed service for Postgresql или поднять самому.
Helm
Helm используется для установки приложений в Kubernetes кластере. Этот инструмент предоставляет широкий ряд возможностей, но сейчас нас интересует одна из них — шаблонизирование.
Основная сущность в Helm — chart. Helm chart — это пакет, содержащий шаблоны и информацию, которая необходима для их заполнения. Давайте подробнее посмотрим на содержимое нашего chart (см. репозиторий). Там мы увидим:
Chart.yaml — некоторая информация о нашем чарте
templates — директория со всеми шаблонами, которые будут развернуты в Kubernetes кластере
values.yaml — тут мы определяем переменные, которые будем использовать в шаблонах
Например, у нас есть значение executor, которое мы хотим использовать. Запишем его в values.yaml
executor: "CeleryExecutor"
Теперь, чтобы использовать эту переменную в шаблоне scheduler, необходимо прописать в соответствующем template файле следующее:
{{ .Values.executor }}
Похоже на jinja template в airflow, не правда ли?
Однако, иногда неудобно просто прописывать все значения в values, хочется их как-то предварительно обработать. Для этого нам поможет _helpers.yaml, который расположен вместе с шаблонами в директории templates. Например, мы хотим объединить значения репозитория и тега для docker image:
{{- define "airflow_image" -}}
{{- $repository := .Values.images.airflow.repository -}}
{{- $tag := .Values.images.airflow.tag -}}
{{- printf "%s:%s" $repository $tag -}}
{{- end }}
Теперь мы можем вызвать эту функцию в шаблоне scheduler:
{{ template "airflow_image" . }}
В представленном репозитории используются также более сложные функции, чтобы понять, что они делают, можно посмотреть на готовый манифест. Чтобы его получить нужно выполнить
helm template airflow dn-airflow/part1/.
Также я положил результат выполнения этого кода в репозиторий. В статье я буду использовать уже заполненные шаблоны.
Deployment (Scheduler)
Итак, давайте сообщим Kubernetes, что мы от него хотим. Для этого необходимо создать манифесты — yaml файлы, в которых описываем некоторые правила.
Начнем с манифеста для планировщика :
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-scheduler
labels:
tier: airflow
component: scheduler
release: airflow
chart: "airflow-1"
executor: CeleryExecutor
spec:
replicas: 1
selector:
matchLabels:
tier: airflow
component: scheduler
release: airflow
template:
metadata:
labels:
tier: airflow
component: scheduler
release: airflow
spec:
initContainers:
- name: wait-for-airflow-migrations
image: apache/airflow:2.6.2
imagePullPolicy: IfNotPresent
volumeMounts:
- name: config
mountPath: "/opt/airflow/airflow.cfg"
subPath: airflow.cfg
readOnly: true
args:
- airflow
- db
- check-migrations
- --migration-wait-timeout= 60
env:
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: airflow-fernet-key
key: fernet-key
…
- name: AIRFLOW__CORE__LOAD_EXAMPLES
value: "True"
containers:
- name: scheduler
image: apache/airflow:2.6.2
imagePullPolicy: IfNotPresent
args:
- bash
- -c
- exec airflow scheduler
env:
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: airflow-fernet-key
key: fernet-key
…
- name: AIRFLOW__CORE__LOAD_EXAMPLES
value: "True"
livenessProbe:
initialDelaySeconds: 10
timeoutSeconds: 20
failureThreshold: 5
periodSeconds: 60
exec:
command:
- sh
- -c
- |
CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
airflow jobs check --job-type SchedulerJob --local
volumeMounts:
- name: logs
mountPath: "/opt/airflow/logs"
- name: config
mountPath: "/opt/airflow/airflow.cfg"
subPath: airflow.cfg
readOnly: true
volumes:
- name: config
configMap:
name: airflow-airflow-config
- name: logs
emptyDir: {}
Первое что нам попадается на глаза, это тип ресурса:
kind: Deployment
Чтобы понять, что такое Deployment, начнем погружение с более простой сущности — Pod. Это несколько контейнеров, которые запускаются вместе на одном узле, с общей сетью и хранилищем. В нашем случае их 2: контейнер с приложением и контейнер, который с приложением, которое ожидает миграций (об этом далее)
Схематично обозначим Pod таким образом (рис. 2) :
Рис. 2 Схематичное изображение Pod
Чтобы запустить несколько реплик одного и того же Pod, нам необходим ресурс ReplicaSet. (рис. 3), который следит за тем, чтобы кол-во Pods соответствовало заявленному кол-ву в манифесте.
Рис. 3 Схематичное изображение ReplicaSet
Обычно не используют в продакшене отдельно Pod и ReplicaSet. Используют их родительский объект — Deployment (рис. 4). Т.к. Deployment контролирует обновления Pods. Существует 2 стратегии обновления. Одна из них — Rolling Update, позволяет обновлять Pods один за другим, что полезнее в ряде случаях, чем сначала убить все Pods, а затем запустить новые с новой версией приложения.
Рис. 4 Схематичное изображение Deployment
Именно Deployment, который используется для Airflow scheduler, мы будем подробно разбирать:
apiVersion: apps/v1
Задаем версию API, которую будем использовать для нашего ресурса. Чтобы посмотреть поддерживаемые версии для кластера, можно использовать команду:
kubectl api-versions
metadata
Тут мы задаем имя Deployment, а также задаем labels. Это опциональная информация о нашем Deployment. Но благодаря им мы можем фильтровать выборку ресурсов. Например, команда
kubectl get pods -n airflow -l component=scheduler
вернет только поды с labels component=scheduler
replicas
Это кол-во реплик, которое будет развернуто
selector
Тут мы описываем каким образом ReplicaSet будет понимать, какими Pods управлять. Мы используем matchLabels где указываем labels. Точно такие же labels мы должны указать в template.
template
Здесь мы описываем шаблон Pod. Также задаем metadata, в которой укажем labels (которые использует selector)
spec
спецификация нашего пода, включает в себя информацию о контейнерах, переменных окружения, подключаемых томах и другую информацию, о которой поговорим ниже:
initContainers
Это контейнер, который будет запускаться перед запуском основного приложения. Мы используем его, чтобы проверить накатились ли уже миграции. Делаем это с помощью команды:
airflow db check-migrations
Описание initContainers похоже на описание Containers, которое рассмотрим подробнее:
containers
image: название образа, который будем использовать
imagePullPolicy: политика загрузки image. В нашем случае Kubernetes сравнивает digest указанного контейнера и тех, что находятся на машине. Если не совпадают, то скачивает.
command и args: задают команду, которую выполняем при запуске контейнера.
env: хранит переменные окружения, которые будут доступны внутри пода, кроме обычных, с названием и значением, сюда можно добавить переменные, значение которых будет приходить из Secret, еще один ресурс в Kubernetes, который обсудим ниже.
livenessProbe
Необходима, если что-то идет не так с приложением запущенным в Pod, то Kubernetes поймет это и перезапустит сломанный Pod.
Внутри содержатся дополнительные метрики:
initDelaySeconds: сколько ждать секунд перед запуском первой проверки. Если Pod долго запускается, то стоит поставить более высокое значение, чтобы Kubernetes не убил Pod, прежде, чем приложение запустится.
periodSeconds: как часто запускать проверки.
timeoutSeconds: время, в течении которого Kubernetes ждет ответа от приложения. Если в течении этого времени приложении не ответило, значит проверка завалилась.
failureThreshold: кол-во неудачных попыток, которое произведет Kubernetes, прежде чем убить Pod.
exec: описывает способ, которым мы проверяем состояние Pod. В нашем случае запускаем shell скрипт.
volumeMounts
определяет по какому пути подключать тома, которые описаны в volumes
volumes
В нашем случае их 2: config, в который мы монтируем configMap (этот ресурс обсудим ниже). А также logs. Для него создаем том типа emptyDir. Это хранилище будет доступно, пока жив Pod. После рестарта Pod, директория logs будет пустой.
StatefulSet (Worker)
Для запуска приложений с сохранением состояния, redis и worker в нашем случае, используется StatefulSet
В отличие от Deployment он запускает Pods с предсказуемым именем, например, worker-0, worker-1. Также масштабирование Pods происходит по порядку. Также у StatefulSet другая политика по управлению подключаемыми хранилищ, которую обсудим в следующей части. Причины, по которым для worker использует StatefulSet можно посмотреть здесь.
Шаблон описания почти такой же, как у Deployment, поэтому не будем на нем останавливаться.
ConfigMap (Config)
При описании scheduler мы упоминали ресурс ConfigMap, который монтируем по пути '/opt/airflow/airflow.cfg'. Давайте рассмотрим его.
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-airflow-config
labels:
tier: airflow
component: config
release: airflow
chart: "airflow-1"
heritage: Helm
data:
airflow.cfg: |-
[celery]
worker_concurrency = 16
[core]
dags_folder = /opt/airflow/dags
executor = CeleryExecutor
load_examples = True
Большинство полей для нас знакомы, кроме data. Тут описываем название и содержимое файла, который будет примонтирован.
Secret (Airflow-fernet-key)
При рассмотрении конструкции env мы встретили такой ресурс, как secret. Рассмотрим его на примере секрета для webserver:
apiVersion: v1
kind: Secret
metadata:
name: airflow-fernet-key
labels:
tier: airflow
release: airflow
chart: airflow
heritage: Helm
annotations:
"helm.sh/hook": "pre-install"
"helm.sh/hook-delete-policy": "before-hook-creation"
"helm.sh/hook-weight": "0"
type: Opaque
data:
fernet-key: "TTAxc05IQlBNakZsWVdwclEzSklXbFI2VkRWU01XUjFUM0JZVVV4aFV6ST0="
Описание почти такое же как у ConfigMap, однако есть дополнительное поле type. В Kubernetes есть несколько типов секретов. Мы используем тип Opaque, который содержит произвольные пользовательские данные. В data содержится сам секрет закодированный в формат base64. Создание ключа и кодирование производится с помощью helm.
Также в annotations можно увидеть helm hooks, которые позволяют управлять порядком деплоя приложения. Т.к. нам необходимо, чтобы Secret был создан до того, как будет разворачиваться scheduler, мы используем hook «pre-install».
Job (Airflow-run-airflow-migrations)
Для scheduler нам необходим Pod, который постоянно работает. Но что, если нам необходимо разово выполнить какое-то действие, например, накатить миграции? Для этого подходит такой Kubernetes ресурс как Job.
Помните, что при описании scheduler Deployment мы описывали InitContainer, который проверяет, были ли уже произведена миграция? Сама миграция как раз описана в Job:
apiVersion: batch/v1
kind: Job
metadata:
name: airflow-run-airflow-migrations
labels:
tier: airflow
component: run-airflow-migrations
release: airflow
chart: "airflow-1"
heritage: Helm
spec:
template:
metadata:
labels:
tier: airflow
component: run-airflow-migrations
release: airflow
spec:
restartPolicy: OnFailure
containers:
- name: run-airflow-migrations
image: apache/airflow:2.6.2
imagePullPolicy: IfNotPresent
args:
- bash
- -c
- |-
exec \
airflow db upgrade
env:
- name: PYTHONUNBUFFERED
value: "1"
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: airflow-fernet-key
key: fernet-key
…
- name: AIRFLOW__CORE__LOAD_EXAMPLES
value: "True"
volumeMounts:
- name: config
mountPath: "/opt/airflow/airflow.cfg"
subPath: airflow.cfg
readOnly: true
volumes:
- name: config
configMap:
name: airflow-airflow-config
Новое поле для нас это restartPolicy, значение OnFailure говорит о том, что Job будет перезапускаться в случае неудачи.
Service (Airflow-webserver)
Осталось рассмотреть такой ресурс как Service.
Этот ресурс необходим для предоставления доступа к нашему приложению. На данный момент мы будем использовать его, чтобы у нас был доступ к UI Airflow из сети. Это не продовое решение, но для того, чтобы убедится, что все работает корректно, в самый раз:
apiVersion: v1
kind: Service
metadata:
name: airflow-webserver
labels:
tier: airflow
component: webserver
release: airflow
chart: "airflow-1"
heritage: Helm
spec:
type: NodePort
selector:
tier: airflow
component: webserver
release: airflow
ports:
- name: airflow-ui
port: 8080
Существует несколько типов сервисов. Мы используем NodePort, который предоставит доступ к Pod по ip адресу узла, на котором крутится Pod. Чтобы узнать этот ip можно зайти в консоль Yandex Cloud и посмотреть его.
Также необходимо указать порт, на котором крутится webserver, он должен быть такой же, как в настройках самого Airflow.
Namespace
Перед деплоем приложения нам нужно разобрать еще один ресурс — Namespace. Он позволяет разбить кластер на логические группы. Мы будем деплоить все компоненты Airflow в одном Namespace. Также на Namespace можно наложить различные политики доступа.
Проверяем, что все работает
Отлично, мы описали все наши ресурсы, давайте взглянем на схему (рис. 5), как это все теперь выглядит:
Рис. 5 Изображение ресурсов Kubernetes используемых Airflow
Давайте теперь попробуем запустить наш chart.
Сначала клонируем репозиторий:
git clone git@github.com:Siplatov/dn-airflow.git
Далее необходимо заменить значения host, user и pass для postgresql на свои. Эти значения находятся в values.yaml.
После этого можно делать релиз:
helm upgrade --install -n airflow --create-namespace airflow dn-airflow/part1.
Через несколько минут можно проверить, запустились ли уже pods (Рис. 6):
kubectl get pods -n airflow
Рис. 6 Проверка работоспособности Pods
Видим, что Jobs успешно отработали, а все остальные Pods в статусе running. Значит все хорошо. Однако видно, что webserver перезапускался несколько раз. Чтобы понять почему, необходимо выполнить команду:
kubectl describe pod airflow-webserver-dc54c9884-g7x5s -n airflow
В конце увидим следующую картину (рис. 7):
Рис. 7 Вывод команды describe pod
Webserver не успел запуститься за время указанное в initDelaySeconds livenessProbe и Kubernetes сделал несколько проверок, посчитал, что Webserver мертв и решил его перезапустить. Поэтому стоит увеличить initDelaySeconds.
Чтобы проверить UI airflow, давайте посмотрим на services (рис. 8):
kubectl get services -n airflow
Рис. 8 Вывод команды get services
Видим, что проверять нужно на порту 32462
Если при открытии ссылки увидим панель с регистрацией (рис. 9), значит все запустилось успешно:
Рис. 9 Окно с авторизацией Airflow
Логин и пароль мы указали в values.yaml, в пункте defaultUser.
Заключение
Мы рассмотрели минимальный набор сущностей, необходимый для запуска Airflow. Однако некоторые моменты требуют доработки: секреты лежат в yaml файле, логи будут удаляться после передеплоя, нет синхронизации с репозиторием с DAG, доступ не по https и т.д. Как это исправить, обсудим в следующей части.