Запуск кластера RabbitMQ в Kubernetes

При микросервисной организации приложения существенная работа ложится на механизмы интеграционной связи микросервисов. Причем эта интеграция должна быть отказоустойчива, с высокой степенью доступности.

В наших решениях мы используем интеграцию и с помощью Kafka, и с помощью gRPC, и с помощью RabbitMQ.

В этой статье мы поделимся нашим опытом кластеризации RabbitMQ, ноды которого размещены в Kubernetes.

image

До RabbitMQ версии 3.7 его кластеризация в K8S была не очень тривиальной задачей, со множеством хаков и не очень красивых решений. В версии 3.6 использовался autocluster плагин из RabbitMQ Community. А в 3.7 появился Kubernetes Peer Discovery Backend. Он встроен плагином в базовую поставку RabbitMQ и не требует отдельной сборки и установки.

Мы опишем итоговую конфигурацию целиком, попутно комментируя происходящее.

В теории


У плагина существует репозиторий на гитхабе, в котором есть пример базового использования.
Этот пример не предназначен для Production, о чём явно указано в его описании, и более того, часть настроек в нём установлено вразрез с логикой использования в проде. Также в примере никак не упомянута персистентность хранилища, таким образом при любой нештатной ситуации наш кластер превратится в пшик.

На практике


Сейчас расскажем, с чем столкнулись сами и как установили и настроили RabbitMQ.

Опишем конфигурации всех частей RabbitMQ как сервиса в K8s. Сразу уточним, что мы устанавливали RabbitMQ в K8s как StatefulSet. На каждой ноде кластера K8s будет всегда функционировать один экземпляр RabbitMQ (одна нода в классической конфигурации кластера). Мы также установим в K8s панель управления RabbitMQ и дадим доступ до этой панели за пределы кластера.

Права и роли:


rabbitmq_rbac.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: rabbitmq 
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: endpoint-reader
rules:
- apiGroups: [""]
  resources: ["endpoints"]
  verbs: ["get"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: endpoint-reader
subjects:
- kind: ServiceAccount
  name: rabbitmq
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: endpoint-reader


Права доступа для RabbitMQ взяты целиком из примера, никаких изменений в них не требуется. Создаём ServiceAccount для нашего кластера и выдаём ему права на чтение Endpoints K8s.

Персистентное хранилище:


rabbitmq_pv.yaml
kind: PersistentVolume
apiVersion: v1
metadata:
  name: rabbitmq-data-sigma
  labels:
    type: local
  annotations:
    volume.alpha.kubernetes.io/storage-class: rabbitmq-data-sigma
spec:
  storageClassName: rabbitmq-data-sigma
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Recycle
  hostPath:
    path: "/opt/rabbitmq-data-sigma"


В качестве персистентного хранилища здесь мы взяли самый простой случай — hostPath (обычную папку на каждой ноде K8s), но можно использовать любой из множества типов персистентных томов, поддерживаемых в K8s.

rabbitmq_pvc.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: rabbitmq-data
spec:
  storageClassName: rabbitmq-data-sigma
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 10Gi


Создаём Volume Claim на томе, созданном в предыдущем шаге. Этот Claim затем будет использоваться в StatefulSet как хранилище постоянных данных.

Сервисы:


rabbitmq_service.yaml
kind: Service
apiVersion: v1
metadata:
  name: rabbitmq-internal
  labels:
    app: rabbitmq
spec:
  clusterIP: None
  ports:
   - name: http
     protocol: TCP
     port: 15672
   - name: amqp
     protocol: TCP
     port: 5672
  selector:
    app: rabbitmq


Создаём внутренний headless сервис, через который будет работать Peer Discovery plugin.

rabbitmq_service_ext.yaml
kind: Service
apiVersion: v1
metadata:
  name: rabbitmq
  labels:
    app: rabbitmq
    type: LoadBalancer
spec:
  type: NodePort
  ports:
   - name: http
     protocol: TCP
     port: 15672
     targetPort: 15672
     nodePort: 31673
   - name: amqp
     protocol: TCP
     port: 5672
     targetPort: 5672
     nodePort: 30673
  selector:
    app: rabbitmq


Для работы приложений в K8s с нашим кластером создаём сервис балансировщика.
Так как нам нужен доступ к кластеру RabbitMQ снаружи K8s, прокидываем NodePort. RabbitMQ будет доступен при обращении к любой ноде кластера K8s по портам 31673 и 30673. В реальной работе большой необходимости в этом нет. Вопрос удобства пользования админкой RabbitMQ.
При создании сервиса с типом NodePort в K8s также неявно создаётся сервис с типом ClusterIP для его обслуживания. Поэтому приложения в K8s, которым нужно работать с нашим RabbitMQ, смогут обращаться к кластеру по адресу amqp://rabbitmq:5672

Конфигурация:


rabbitmq_configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
data:
  enabled_plugins: |
      [rabbitmq_management,rabbitmq_peer_discovery_k8s].

  rabbitmq.conf: |
      cluster_formation.peer_discovery_backend  = rabbit_peer_discovery_k8s
      cluster_formation.k8s.host = kubernetes.default.svc.cluster.local
      cluster_formation.k8s.port = 443
      ### cluster_formation.k8s.address_type = ip
      cluster_formation.k8s.address_type = hostname
      cluster_formation.node_cleanup.interval = 10
      cluster_formation.node_cleanup.only_log_warning = true
      cluster_partition_handling = autoheal
      queue_master_locator=min-masters
      cluster_formation.randomized_startup_delay_range.min = 0
      cluster_formation.randomized_startup_delay_range.max = 2
      cluster_formation.k8s.service_name = rabbitmq-internal
      cluster_formation.k8s.hostname_suffix = .rabbitmq-internal.our-namespace.svc.cluster.local


Создаём конфигурационные файлы RabbitMQ. Основная магия.

enabled_plugins: |
  [rabbitmq_management,rabbitmq_peer_discovery_k8s].


Добавляем нужные плагины в разрешенные к загрузке. Теперь мы можем использовать автоматический Peer Discovery в K8S.

cluster_formation.peer_discovery_backend  = rabbit_peer_discovery_k8s


Выставляем в качестве backend для peer discovery нужный плагин.

cluster_formation.k8s.host = kubernetes.default.svc.cluster.local
cluster_formation.k8s.port = 443


Указываем адрес и порт, через которые можно достучаться до kubernetes apiserver. Здесь можно указать напрямую ip-адрес, но более красиво будет сделать так.
В namespace default обычно создан service с именем kubernetes, ведущий на k8-apiserver. В разных вариантах установки K8S namespace, имя сервиса и порт могут быть другими. Если что-то в конкретной установке отличается, нужно, соответственно, поправить здесь.

Например, мы столкнулись с тем, что в некоторых кластерах сервис на порту 443, а в некоторых на 6443. Понять, что что-то не так, можно будет в логах старта RabbitMQ, там явно выделен момент подключения по указанному здесь адресу.

### cluster_formation.k8s.address_type = ip
cluster_formation.k8s.address_type = hostname


По умолчанию в примере был указан тип адресации нод кластера RabbitMQ по ip-адресу. Но при перезапуске pod он каждый раз получает новый IP. Сюрприз! Кластер умирает в муках.
Меняем адресацию на hostname. StatefulSet гарантирует нам неизменность hostname в рамках жизненного цикла всего StatefulSet, что нас полностью устроит.

cluster_formation.node_cleanup.interval = 10
cluster_formation.node_cleanup.only_log_warning = true


Поскольку при потере одной из нод мы предполагаем, что она рано или поздно восстановится, отключаем самоудаление кластером недоступных нод. В этом случае, как только нода вернётся в онлайн, она войдёт в кластер без потери своего предыдущего состояния.

cluster_partition_handling = autoheal


Этим параметром определяем действия кластера при потере кворума. Тут стоит просто почитать документацию по этой теме и понять для себя, что ближе к конкретному сценарию использования.

queue_master_locator=min-masters


Определяем выбор мастера для новых очередей. При данной настройке мастером будет выбираться нода с наименьшим количеством очередей, таким образом очереди будут распределяться равномерно по нодам кластера.

cluster_formation.k8s.service_name = rabbitmq-internal


Задаём имя headless сервиса K8s (созданного нами ранее), через который ноды RabbitMQ будут общаться между собой.

cluster_formation.k8s.hostname_suffix = .rabbitmq-internal.our-namespace.svc.cluster.local


Важная штука для работы адресации в кластере по hostname. FQDN пода K8s формируется как короткое имя (rabbitmq-0, rabbitmq-1) + суффикс (доменная часть). Здесь мы и указываем этот суффикс. В K8S он выглядит как .<имя сервиса>.<имя namespace>.svc.cluster.local

kube-dns без какой-либо дополнительной настройки резолвит имена вида rabbitmq-0.rabbitmq-internal.our-namespace.svc.cluster.local в ip-адрес конкретного пода, что и делает возможной всю магию кластеризации по hostname.

Конфигурация StatefulSet RabbitMQ:


rabbitmq_statefulset.yaml
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: rabbitmq
spec:
  serviceName: rabbitmq-internal
  replicas: 3
  template:
    metadata:
      labels:
        app: rabbitmq
      annotations:
        scheduler.alpha.kubernetes.io/affinity: >
            {
              "podAntiAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": [{
                  "labelSelector": {
                    "matchExpressions": [{
                      "key": "app",
                      "operator": "In",
                      "values": ["rabbitmq"]
                    }]
                  },
                  "topologyKey": "kubernetes.io/hostname"
                }]
              }
            }
    spec:
      serviceAccountName: rabbitmq
      terminationGracePeriodSeconds: 10
      containers:        
      - name: rabbitmq-k8s
        image: rabbitmq:3.7
        volumeMounts:
          - name: config-volume
            mountPath: /etc/rabbitmq
          - name: rabbitmq-data
            mountPath: /var/lib/rabbitmq/mnesia
        ports:
          - name: http
            protocol: TCP
            containerPort: 15672
          - name: amqp
            protocol: TCP
            containerPort: 5672
        livenessProbe:
          exec:
            command: ["rabbitmqctl", "status"]
          initialDelaySeconds: 60
          periodSeconds: 10
          timeoutSeconds: 10
        readinessProbe:
          exec:
            command: ["rabbitmqctl", "status"]
          initialDelaySeconds: 10
          periodSeconds: 10
          timeoutSeconds: 10
        imagePullPolicy: Always
        env:
          - name: MY_POD_IP
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          - name: HOSTNAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace
          - name: RABBITMQ_USE_LONGNAME
            value: "true"
          - name: RABBITMQ_NODENAME
            value: "rabbit@$(HOSTNAME).rabbitmq-internal.$(NAMESPACE).svc.cluster.local"
          - name: K8S_SERVICE_NAME
            value: "rabbitmq-internal"
          - name: RABBITMQ_ERLANG_COOKIE
            value: "mycookie"
      volumes:
        - name: config-volume
          configMap:
            name: rabbitmq-config
            items:
            - key: rabbitmq.conf
              path: rabbitmq.conf
            - key: enabled_plugins
              path: enabled_plugins
        - name: rabbitmq-data
          persistentVolumeClaim:
            claimName: rabbitmq-data


Собственно, сам StatefulSet. Отметим интересные моменты.

serviceName: rabbitmq-internal


Прописываем имя headless-сервиса, через который общаются поды в StatefulSet.

replicas: 3


Задаём количество реплик в кластере. У нас оно равно числу рабочих нод K8s.

annotations:
        scheduler.alpha.kubernetes.io/affinity: >
            {
              "podAntiAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": [{
                  "labelSelector": {
                    "matchExpressions": [{
                      "key": "app",
                      "operator": "In",
                      "values": ["rabbitmq"]
                    }]
                  },
                  "topologyKey": "kubernetes.io/hostname"
                }]
              }
            }


При падении одной из нод K8s statefulset стремится сохранить количество экземпляров в наборе, поэтому создаёт по нескольку подов на одной и той же ноде K8s. Это поведение совершенно нежелательно и в принципе бессмысленно. Поэтому мы прописываем anti-affinity правило для подов из statefulset. Правило делаем жестким (Required), чтобы kube-scheduler не мог его нарушать при планировании подов.

Суть проста: планировщику запрещено размещать (в пределах namespace) более одного пода с тегом app: rabbitmq на каждой ноде. Ноды различаем по значению метки kubernetes.io/hostname. Теперь если по какой-то причине число работающих нод K8S меньше, чем требуемое количество реплик в StatefulSet, новые реплики не будут создаваться, пока снова не появится свободная нода.

serviceAccountName: rabbitmq


Прописываем ServiceAccount, под которым работают наши поды.

image: rabbitmq:3.7


Образ RabbitMQ совершенно стандартный и берётся с docker hub, не требует никакой пересборки и доработки напильником.

- name: rabbitmq-data
    mountPath: /var/lib/rabbitmq/mnesia


Персистентные данные у RabbitMQ хранятся в /var/lib/rabbitmq/mnesia. Здесь мы монтируем наш Persistent Volume Claim в эту папку, чтобы при перезапуске подов/нод или даже всего StatefulSet данные (как служебные, в том числе о собранном кластере, так и пользовательские) оставались в целости и сохранности. Можно встретить некоторые примеры, когда персистентной делают папку /var/lib/rabbitmq/ целиком. Мы пришли к выводу, что это не самая лучшая идея, так как при этом начинает запоминаться и вся информация, заданная конфигами Rabbit. То есть для того, чтобы изменить что-то в конфигурационном файле, требуется почистить персистентное хранилище, что очень неудобно в эксплуатации.

          - name: HOSTNAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace
          - name: RABBITMQ_USE_LONGNAME
            value: "true"
          - name: RABBITMQ_NODENAME
            value: "rabbit@$(HOSTNAME).rabbitmq-internal.$(NAMESPACE).svc.cluster.local"


Этим набором переменных окружения мы, во-первых, говорим RabbitMQ использовать в качестве идентификатора членов кластера FQDN-имя, а во-вторых, задаём формат этого имени. Формат описывался ранее при разборе конфига.

- name: K8S_SERVICE_NAME
            value: "rabbitmq-internal"


Имя headless сервиса для общения членов кластера.

- name: RABBITMQ_ERLANG_COOKIE
            value: "mycookie"


Содержимое Erlang Cookie должно быть одинаковым на всех нодах кластера, нужно прописать ваше собственное значение. Нода с отличающимся cookie не сможет войти в кластер.

volumes:
        - name: rabbitmq-data
          persistentVolumeClaim:
            claimName: rabbitmq-data


Определяем подключаемый том из созданного ранее Persistent Volume Claim.

На этом мы закончили с настройкой в K8s. В результате получился кластер RabbitMQ, равномерно распределяющий очереди по нодам и устойчивый к проблемам в среде выполнения.

image

При недоступности одной из нод кластера, очереди, содержащиеся на ней, перестанут быть доступны, всё остальное продолжит работу. Как только нода вернётся в строй, она вернётся в кластер, и очереди, для которых она была Master’ом, снова станут работоспособными с сохранением всех содержащихся в них данных (если не сломалось персистентное хранилище, разумеется). Все эти процессы проходят полностью автоматически и не требуют вмешательства.

Бонус: настраиваем HA


В одном из проектов был нюанс. В требованиях звучало полное зеркалирование всех содержащихся в кластере данных. Это нужно, чтобы в ситуации, когда хотя бы одна нода кластера работоспособна, с точки зрения прикладного приложения всё продолжало работать. Этот момент никак не связан именно с K8s, описываем просто в качестве mini how-to.

Для включения полного HA необходимо в RabbitMQ dashboard на вкладке Admin → Policies создать Policy. Имя произвольное, Pattern пустой (все очереди), в Definitions добавить два параметра: ha-mode: all, ha-sync-mode: automatic.

image

image

После этого все создаваемые в кластере очереди будут находиться в режиме High Availability: при недоступности Master-ноды новым мастером автоматически будет выбираться один из Slave«ов. А данные, поступающие в очередь, будут зеркалироваться на все ноды кластера. Что, собственно, и требовалось получить.

image

Подробнее прочитать о HA в RabbitMQ можно тут

Полезная литература:

Успехов!

© Habrahabr.ru