Airflow в Kubernetes. Часть 2
Приветствую!
В прошлой части мы развернули основные сервисы Airflow. Однако у нас остались незакрытые моменты. Такие как:
Синхронизация списка DAG с удаленным репозиторием
Сохранение логов Worker
Настройка доступа из внешней сети для Webserver
В этой части пройдем эти вопросы. В репозиторий добавил код к этой части статьи.
Синхронизация списка DAG с удаленным репозиторием
В прошлый раз, для проверки работоспособности деплоя, мы использовали DAG, которые предоставляет Airflow в качестве примеров. Однако, в реальной жизни мы будем писать свои DAG, поэтому поставим .values.config.coreload_examples в false и посмотрим на инструмент git-sync.
Это приложение каждые несколько секунд синхронизирует ветку заданного удаленного репозитория с указанной директорией. Эту директорию также используем как volume (в примере он называется dags). После этого мы также монтируем этот volume в другой контейнер, а именно в контейнеры с Worker и Scheduler (рис. 1). Сам же Scheduler раз в некоторое время будет искать новые файлы в этой директории и регистрировать их в БД.
Рис. 1 Связь репозитория с Pod Worker
Т.к. это отдельное приложение, которое будет работать вместе с основным, нам необходим дополнительный контейнер в Pod worker. Давайте определим шаблон для git-sync в _helpers.yaml, как это сделано и в официальном helm chart. Только будем использовать более новую 4 версию, в нем отличаются наименования переменных окружения, но основной принцип такой же. Вот так будут выглядеть настройки для git-sync:
...
- name: git-sync
image: registry.k8s.io/git-sync/git-sync:v4.1.0
imagePullPolicy: IfNotPresent
securityContext:
runAsUser: 65533
env:
# Путь до ssh key
- name: GITSYNC_SSH_KEY_FILE
value: "/etc/git-secret/ssh"
# Отключаем верификацию хостов
- name: GITSYNC_SSH_KNOWN_HOSTS
value: "false"
# Наименование ветки, с которой будем синхронизироваться
- name: GITSYNC_REF
value: "master"
# Ссылка на репозиторий, с которым будем синхронизироваться
- name: GITSYNC_REPO
value: "git@github.com:Siplatov/dn-airflow.git"
# Директория для операций git-sync
- name: GITSYNC_ROOT
value: "/git"
# Наименование директории, в которой будет находится код из репозитория
- name: GITSYNC_LINK
value: "repo"
# Как часто синхронизироваться с репозиторием
- name: GITSYNC_PERIOD
value: "10s"
# Кол-во сбоев, после которых прерываем выполнение
- name: GITSYNC_MAX_FAILURES
value: "0"
volumeMounts:
- name: dags
mountPath: /git
- name: git-sync-ssh-key
mountPath: /etc/git-secret/ssh
readOnly: true
subPath: gitSshKey
volumes:
- name: config
configMap:
name: airflow-airflow-config
- name: dags
emptyDir: {}
- name: git-sync-ssh-key
secret:
secretName: airflow-ssh-secret
defaultMode: 256
Обратите внимание, что мы будем использовать определенного юзера для запуска приложения (65533). Это необходимо, чтобы был доступ к ssh ключу.
Также мы дополнительно монтируем том с секретом, который содержит приватный ssh ключ, который используется для git clone, в base64 кодировке. Для этого можно воспользоваться следующей командой:
base64 ~/.ssh/id_rsa -w 0 > temp.txt
И файле temp.txt появится необходимая запись, которую нужно будет вставить в git-sync-secret.yaml:
apiVersion: v1
kind: Secret
metadata:
name: airflow-ssh-secret
data:
gitSshKey:
Не публикуйте информацию закодированную в base64, т.к. её также легко и декодировать.
Давайте посмотрим, что находится внутри контейнера git-sync (рис. 2):
kubectl exec -it airflow-worker-0 -c git-sync -n airflow -- sh
Рис. 2 Содержание директории /git контейнера git
А также, что находится внутри директории с dags в Worker (рис. 3):
kubectl exec -it airflow-worker-0 -c worker -n airflow -- /bin/bash
Рис. 3 Содержание директории /opt/airflow/dags контейнера worker
На рисунках 2, 3 видно, что содержание директорий одинаковое. Так как мы синхронизируем весь репозиторий, то полный путь до тестового DAG, который находится в репозитории, будет таким: /opt/airflow/dags/repo/part2/dags/test_dag.py. Теперь мы можем создавать новые DAG’и в репозитории и они будут появляться в UI Airflow.
Сохранение логов Worker
Чтобы хранить данные где-либо, k8s использует volumes. Мы встречались с этой конструкцией в прошлой части, когда монтировали Secrets и ConfigMaps. В этот раз будем использовать volume, чтобы сохранять логи. Конечно, мы можем их просто сохранять в какую-нибудь директорию, но это не будет гарантировать нам сохранность логов после перезапуска Airflow. Было бы круто сохранять их на какие-то внешние диски, не связанные с состоянием кластера. Для этого нам нужно использовать volume. Но, чтобы понять как это делать, необходимо познакомиться с другими типами ресурсов kubernetes:
PersistentVolume (PV)
Это ресурс который резервирует место в определенном хранилище. Можно создать несколько PersistentVolume с разными типами хранилищ в одном кластере, например, для работы ssd и hdd дисками. Мы будем использовать yc-network-hdd. Также для PersistentVolume определяется accessModes— политика доступа к volume. Мы используем ReadWriteOnce, что означает, что запись и чтение может происходить только с одного узла (виртуальной машины).PersistentVolumeClaim (PVC)
Это абстракция, которая позволяет Pod запросить определенное кол-во пространства из PersistentVolume.
Получается, чтобы выделить место на диске для Pod, необходимо:
Создать PV с определенным типом хранилища и размером
Создать PVC, которое будет использовать часть пространства PV
Определить volume в манифесте Pod, который будет вызывать PVC
Чтобы уменьшить кол-во шагов существует ресурс Provisioner. Он позволяет динамически создавать PV для PVC. В Yandex Cloud нам не нужно ничего дополнительно настраивать и при создании PVC автоматически будет создаваться PV такого же класса и размера.
Так как Airflow worker (именно он пишет логи) развернут как StatefulSet, то нам не придется создавать PVC руками, мы укажем volumeClaimTemplates в манифесте StatefulSet. Это необходимо делать, потому что каждая реплика StatefulSet создает отдельный PVC (в отличии от Deployment). Давайте дополним наш helm шаблон для worker следующим образом:
{{- if not .Values.logs.persistence.enabled }}
- name: logs
emptyDir: {}
{{else}}
volumeClaimTemplates:
- metadata:
name: logs
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: {{ .Values.logs.persistence.size }}
{{end}}
Давайте проверим, что PVC и PV успешно создались (рис. 4):
kubectl get pvc -n airflow
Рис. 4 Проверка наличия PVC и PV
Давайте теперь проведем небольшой эксперимент, чтобы понять, как ведут себя PVC при удалении Pod. Для этого предлагаю запустить еще один Pod с worker, для этого выполним команду:
kubectl scale statefulsets airflow-worker --replicas=2 -n airflow
Рис. 5 Масштабирование workers
Если вывести PVC и PV, то увидим, что добавилось еще одно хранилище:
Рис. 6 Добавление PV и PVC
При возвращении кол-во реплик к одной, то новые PVC и PV не удалятся.
Таким образом, если что-то с одной из реплик случится, то логи все равно останутся и их можно будет читать после восстановления реплики. Схематично картину можно представить так:
Рис. 7 Схематичное изображение StatefulSet и PVC
Настройка доступа из внешней сети для Webser
В прошлый раз, чтобы получить доступ к UI Airflow мы использовали ресурс Service типа NodePort. Конечно, в продовом окружении мы хотели бы обращаться не через ip адрес, а по доменному имени и чтобы доступ был по https. Чтобы это реализовать, нам потребуется ресурс Ingress — это ресурс, в котором мы описываем правила управления трафиком:
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: airflow-ingress
labels:
release: airflow
annotations:
kubernetes.io/ingress.class: nginx
spec:
rules:
- host: 51.250.108.134.nip.io
http:
paths:
- backend:
service:
name: airflow-webserver
port:
name: airflow-ui
path: /
pathType: ImplementationSpecific
В rules достаточно легко проследить логику. Мы указываем, что хотим открывать страницу по адресу 51.250.108.134.nip.io (c помощью nip.io можно бесплатно дать hostname для ip адреса), откуда нас будет направлять в service airflow-webserver на порт airflow-ui (указывали в values — 8080).
Кроме этого, мы также указываем в annotations, информацию об ingress-controller. Зачем он нужен? Ingress-controller это тот компонент, который будет производить всю работу по маршрутизации трафика. Внутри он может содержать Nginx, Traefik и т.д. Т.е. в ingress мы просто описываем правила и указываем, какой ingress-controller хотим использовать, а уже ingress-controller эти правила реализует с помощью nginx (в нашем случае).
Но сначала необходимо установить данныйingress-controller. Это легко сделать с помощью helm:
helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
helm repo update
helm upgrade --install ingress-nginx ingress-nginx/ingress-nginx -n ingress --create-namespace
Мы установили ingress-controller в отдельный namespace, чтобы логически отделить его от нашего основного сервиса. Давайте посмотрим, что будет создано в этом namespace:
Рис. 8 Содержимое namespace ingress
Среди этих ресурсов, нас интересует service, а именно его EXTERNAL-IP — 51.250.108.134 в нашем случае. Именно по этому адресу будет доступен webserver и его мы использовали как часть доменного имени в манифесте с ingress.
Итак, как только мы запускаем в кластере ресурс ingress, то благодаря аннотации kubernetes.io/ingress.class: nginx, установленный ingress-controller понимает, что мы хотим передавать трафик определенным образом и изменяет параметры приложения, которое используется для маршрутизации трафика.
P.S. Также в YC за ingress-controller будет стоять NetworkBalancer, однако вам не нужно его никак настраивать, он создастся автоматически.
TLS Certificate
Отлично, но доступ у нас только по http, а хотелось бы https. Для этого нам необходимо добавить несколько строчек с ingress, а также развернуть в кластере еще несколько манифестов. А именно cert-manager. Давайте сделаем это с помощью kubectl:
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.13.2/cert-manager.yaml
Развернем ClusterIssuer:
kubectl apply -f cluster-issuer.yaml -n cert-manager
И добавим несколько строк в манифест с ingress:
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: airflow-ingress
labels:
release: airflow
annotations:
kubernetes.io/ingress.class: nginx
cert-manager.io/cluster-issuer: letsencrypt
spec:
rules:
# использую приобретенное доменное имя, т.к. с nip.io трудно получить сертификат (слишком много желающих для этого домена).
- host: airflow-test.data-notes.ru
http:
paths:
- backend:
service:
name: airflow-webserver
port:
name: airflow-ui
path: /
pathType: ImplementationSpecific
tls:
- hosts:
- airflowtest.data-notes.ru
secretName: ingress-webserver-secret
В аннотациях указываем issuer, а также указываем в разделе tls, где будем хранить секрет для нашего хоста.
Принцип с работы cert-manager похож на работу ingress-controller. Благодаря аннотации cert-manager.io/cluster-issuer: letsencrypt cert-manager понимает, что мы хотим использовать letsencrypt, чтобы получить сертификат. Получает его и сохраняет в указанный secret.
Если сертификат не хочет устанавливаться (например, если использовать host с nip.io) или необходимо узнать срок действия сертификата, то это можно сделать следующей командой:
kubectl describe certificate ingress-webserver-secret -n airflow
Если все выполнено правильно, то в браузере мы увидим заветный замочек:
Рис. 9 Проверка https соединения
Заключение
В этот раз мы настроили синхронизацию удаленного репозитория с директорией Airflow, реализовали перманентное хранение логов, а также настроили https соединение для Webserver. После данных преобразований, наш кластер можно изобразить следующим образом (рис. 10).
Рис. 10 Схематичное изображение кластера с Airflow