Отложенные ретраи силами RabbitMQ

Меня зовут Алексей Казаков, я техлид команды Клиентских коммуникаций в ДомКлике. В этой статье я хочу поделиться с вами «рецептом», который позволил нам реализовать отложенные ретраи при использовании брокера сообщений RabbitMQ


rabbit_retry


Введение

В ДомКлик существенная часть взаимодействия между сервисами реализована асинхронно за счет брокера сообщений RabbitMQ. Типичная схема взаимодействия выглядит так.

schema_0


  • Сервис A на своём RabbitMQ-virtual_host (service_a_vh):
    • создаёт RabbitMQ-exchange (service_a_inner_exch), в который другие сервисы будут публиковать сообщения-задачи для сервиса A,
    • создаёт RabbitMQ-queue (service_a_input_q), из которой сообщения будут попадать в сервис A,
    • связывает service_a_input_q с service_a_inner_exch.
  • Сервис B, получив доступ к service_a_vh, публикует в service_a_inner_exch сообщения, которые должны быть обработаны сервисом А.

Обычно сервису A нужны результаты выполнения опубликованных задач. Для этого создаётся обратный RabbitMQ-exchange, в который сервис A публикует результаты, а другие сервисы посредством RabbitMQ-routing_key получают только нужные им данные. Но для нашего «рецепта» это будет не нужно.

Отличные руководства по RabbitMQ можно найти на их сайте.


Постановка проблемы

Наша команда занимается доставкой всевозможных СМС/пушей/писем до клиентов, и для этих целей мы используем сторонних провайдеров, которые не входят в зону нашей ответственности. В общем случае схема выглядит так. Сервис A синхронно взаимодействует по HTTP с внешним сервисом E. Иногда сервис E может испытывать проблемы и не отвечать/таймаутить/пятисотить. Если несколько HTTP-ретраев с возрастающей задержкой не помогают и сервис E по-прежнему отказывается корректно работать, то что делать с сообщением?

RabbitMQ позволяет сделать reject with requeue, что вернет задачу в очередь и она не потеряется. Проблема заключается в том, что эта же задача очень быстро (~100 раз в секунду) снова попадет в consumer, и так мы будем порождать лишнюю нагрузку на сервис E (реальный случай из практики).


Возможные решения

1) Хранить сообщение в памяти приложения, продолжая ретраить.

Недостатки:


  • Если consumer однопоточный, то таким образом мы блокируем выполнение других задач из очереди, а сервис E может испытывать проблемы именно с конкретной задачей.
  • Хранить задачу в памяти приложения, пока идут ретраи (а это могут быть десятки минут), не выглядит хорошей идеей.

2) С помощью механизма RabbitMQ-dead_letter_exchange сохранять задачи до лучших времен в отдельной очереди мертвых задач и считывать их оттуда отдельным consumer-ом.

Недостатки:


  • Ручной запуск дополнительного consumer требует вмешательства программиста.
  • Автоматический запуск и остановка — нетривиальная задача, которая требует лишнего кода.

3) Сохранять таски в базе, откуда снова доставлять их в consumer по истечении таймаута.

Недостатки:


  • Нужно писать код, который будет этим заниматься.


Выбранное нами решение

Последний вариант привлекателен тем, что тот же самый consumer будет заниматься обработкой задач. Вот бы ещё избавиться от необходимости работать с базой, ведь «Лучший код — не написанный код».

К счастью, можно реализовать механизм отложенных ретраев исключительно средствами RabbitMQ.

Для начала нужно узнать, что в RabbitMQ есть очереди с таймаутами: при создании очереди можно указать аргумент x-message-ttl, определяющий, сколько миллисекунд сообщение просуществует в очереди, прежде чем будет помечено «мертвым».

Ниже приведу схему очередей, описание маршрута задачи и минимальный код на Python, который позволит вам воспроизвести схему.

scheme_1

Все элементы схемы уже описаны ранее за исключением пути от dead_letter_queue в service_a_inner_exch. Такая «петля» получается за счет того, что для dead_letter_queue в качестве dead letter exchange мы указываем service_a_inner_exch. В этом и заключается основная идея. Мы зацикливаем путь сообщения, отправляя его после таймаута из dead_letter_queue снова в исходный exchange.

Путь задачи:


  • сервис B публикует сообщение в service_a_inner_exch,
  • сообщение попадает в очередь service_a_input_q,
  • сервис A не может обработать сообщение и делает reject,
  • сообщение попадает в dead_letter_exchange,
  • а оттуда сразу в dead_letter_queue,
  • в этой очереди сообщение проведет 5 минут и потом будет помечено «мертвым»,
  • «мертвое» сообщение попадает в dead letter exchange очереди dead_letter_queue, а это service_a_inner_exch.

Количество «кругов», которые проходит одна задача, можно ограничить с помощью анализа заголовков, которые изменяются при прохождении dead letter exchange. Это будет показано в примере кода ниже.

Код написан на Python 3.6.2 с использованием библиотеки pika==0.10.0.


publisher.py
import pika

import settings

def init_rmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
    ))
    channel = connection.channel()

    channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')

    return channel, connection

if __name__ == '__main__':
    channel, connection = init_rmq()

    channel.basic_publish(exchange=settings.RMQ_INPUT_EXCHANGE, routing_key='', body='message from rmq')
    connection.close()


consumer.py
import logging

import pika

import settings

logger = logging.getLogger(__name__)

def init_rmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
    ))
    channel = connection.channel()

    # создаем service_a_inner_exch
    channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')

    # создаем dead_letter_exchange
    channel.exchange_declare(exchange=settings.RMQ_DEAD_EXCHANGE, exchange_type='fanout')

    # создаем service_a_input_q
    channel.queue_declare(
        queue=settings.RMQ_INPUT_QUEUE,
        durable=True,
        arguments={
            # благодаря этому аргументу сообщения из service_a_input_q
            # при nack-е будут попадать в dead_letter_exchange
            'x-dead-letter-exchange': settings.RMQ_DEAD_EXCHANGE,
        }
    )

    # создаем очередь для "мертвых" сообщений
    channel.queue_declare(
        queue=settings.RMQ_DEAD_QUEUE,
        durable=True,
        arguments={
            # благодаря этому аргументу сообщения из service_a_input_q
            # при nack-е будут попадать в dead_letter_exchange
            'x-message-ttl': settings.RMQ_DEAD_TTL,
            # также не забываем, что у очереди "мертвых" сообщений
            # должен быть свой dead letter exchange
            'x-dead-letter-exchange': settings.RMQ_INPUT_EXCHANGE,
        }
    )
    # связываем очередь "мертвых" сообщений с dead_letter_exchange
    channel.queue_bind(
        exchange=settings.RMQ_DEAD_EXCHANGE,
        queue=settings.RMQ_DEAD_QUEUE,
    )

    # связываем основную очередь с входным exchange
    channel.queue_bind(settings.RMQ_INPUT_QUEUE, settings.RMQ_INPUT_EXCHANGE)

    return channel

def callback(ch, method, properties, body):
    logger.info('Processing message `%s`', body)
    if can_retry(properties):
        logger.warning('Retrying message')
        # requeue=False отправит сообщение не в исходную очередь, а в dead letter exchange
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return

    logger.error('Can`t retry, drop message')
    ch.basic_ack(delivery_tag=method.delivery_tag)

def can_retry(properties):
    """
    Заголовок x-death проставляется при прохождении сообщения через dead letter exchange.
    С его помощью можно понять, какой "круг" совершает сообщение.
    """
    deaths = (properties.headers or {}).get('x-death')
    if not deaths:
        return True
    if deaths[0]['count'] >= settings.RETRY_COUNT:
        return False
    return True

if __name__ == '__main__':
    channel = init_rmq()

    logger.info('Consuming.')
    channel.basic_consume(
        queue=settings.RMQ_INPUT_QUEUE, consumer_callback=callback,
    )
    channel.start_consuming()


settings.py
import logging.config

RMQ_HOST = ''
RMQ_PORT = 5672
RMQ_VHOST = ''
RMQ_USERNAME = ''
RMQ_PASSWORD = ''
RMQ_INPUT_EXCHANGE = ''
RMQ_INPUT_QUEUE = ''
RMQ_DEAD_EXCHANGE = ''
RMQ_DEAD_QUEUE = ''
RMQ_DEAD_TTL = 60 * 1000  # 1 секунда
RETRY_COUNT = 2

dict_config = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'detailed': {
            'class': 'logging.Formatter',
            'format': '%(asctime)s %(levelname)s %(name)s: %(message)s'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'detailed',
        },
    },
    'root': {
        'level': 'INFO',
        'handlers': ['console']
    },
}

logging.config.dictConfig(dict_config)

Если в settings.py вы укажете необходимые данные для подключения к RabbitMQ, то последовательный запуск consumer.py и publisher.py выдаст следующий лог:

...
2020-05-02 12:16:32,260 INFO __main__: Consuming.
2020-05-02 12:16:35,233 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:16:35,233 WARNING __main__: Retrying message
2020-05-02 12:17:35,241 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:17:35,241 WARNING __main__: Retrying message
2020-05-02 12:18:35,249 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:18:35,250 ERROR __main__: Can`t retry, drop message
...

Т.е. код создаст схему, показанную на рисунке, отправит одно сообщение в систему, попытается трижды его обработать и отбросит после двух ретраев.


Возможные улучшения. Разные таймауты

В качестве расширения функциональности предложенной схемы можно рассмотреть создание нескольких dead letter queue с разными таймаутами. После прохождения через через dead letter exchange:


  • routing key сохраняется, поэтому можно использовать topics-exchange для направления сообщений в разные dead letter queue в зависимости от исходного значения routing key,
  • заголовки дополняются, поэтому можно использовать headers-exchange для направления сообщений в разные dead letter queue в зависимости от исходных заголовков сообщения.


Возможные улучшения. Несколько consumer-ов

Если у вас с service_a_inner_exch связано несколько очередей, предназначенных для разных consumer-ов, то предложенная схема должна быть доработана. Например, у вас есть еще один сервис A_another, читающий из очереди service_a_another_input_q, связанной с service_a_inner_exch. Тогда текущая «петля» отправит сообщение повторно в обе очереди, и оба сервиса получат его повторно. Чтобы этого избежать, можно завести отдельный exchange dead_inner_exch, как показано на рисунке ниже.

scheme_2


Заключение

Описанное решение позволяет реализовать произвольное количество отложенных ретраев с равными промежутками между попытками. Для этого требуется минимальный дополнительный код, а вся логика по задержке и повторной доставке выполняется кластером RabbitMQ.

Эта схема успешно эксплуатируется примерно 7 месяцев, неоднократно спасала при проблемах с сервисом E и ни разу не потребовала ручного вмешательства в свою работу. Условия эксплуатации: RabbitMQ 3.6.12, 4 RPS в среднем, с пиками до 40 RPS.

Надеюсь, эта статья поможет какому-нибудь программисту крепче спать по ночам.

© Habrahabr.ru