RabbitMQ Direct Reply-to. RPC поверх кролика без дополнительных очередей (пример на Python)

Реализацией RPC запросов поверх брокеров сообщений никого не удивишь: очередь для запроса, очередь для ответа — ничего сложного.

Тот же RabbitMQ имеет пример в официальной документации. Других примеров там нет, поэтому создается впечатление, что отправка ответных сообщений в другую очередь — единственный возможный способ реализации RPC.

Этот сценарий отлично работает когда у нас есть непрерывный поток сообщений и непрерывный поток ответов на них. Однако, данный подход не применим в случаях, когда нам нужно отправить только одно сообщение и получить ответ именно на него. Мы сразу же попадаем в какой-то ад с фильтрацией ответов по correlation_id.

На самом деле, в RabbitMQ есть механизм и для такого сценария. Но он спрятан в недрах документации и о нем почти нет информации в интернете (особенно рабочих примеров кода).

Вот это недоразумение мы сейчас и исправим.

rpc

P.S: Здесь я не буду объяснять, кто такой этот ваш RabbitMQ и зачем он нужен: эту информацию вы можете найти в другой моей статье.


Direct Reply-TO

Работает все достаточно просто, за исключением некоторых нюансов, которые всплывают на практике.

Концепция заключается в следующем:


  • мы подписываемся на специальную псевдоочередь amqp.rabbitmq.reply-to
  • отправляем сообщение с указанием этой очереди в качестве reply-to заголовка
  • кролик генерирует для нас уникальный routing_key, по которому будет должно быть опубликовано ответное сообщение в default exchange
  • сервер получает наше сообщение и отправляет ответ по этому routing_key.

Нет нужды создавать какие-либо дополнительные очереди, нет дополнительных расходов на управление ими со стороны RMQ. Это абсолютно win-to-win механизм.

Алгоритм действий:

Со стороны клиента:


  • (СНАЧАЛА) подписываемся на очередь с волшебным названием amqp.rabbitmq.reply-to в no-ack режиме, объявлять ее не нужно
  • отправляем сообщение с указанием заголовка reply-to = amqp.rabbitmq.reply-to

Со стороны сервера:


  • получаем сообщение. В нем, в качестве reply-to заголовка будет нечто вида amqp.rabbitmq.reply-to.
  • отправляем ответ в default exchange с reply-to значением в качестве ключа маршрутизации

На этом, в принципе, все. Однако каждое слово в этом алгоритме важно: сначала отправили, потом подписались — провал, попытались объявить очередь — провал, подписались в режиме ack — снова провал и т.д.

Поэтому мне пришлось потратить некоторое количество времени на написание рабочего кода. Давайте перейдем к нему, чтобы разобраться подробнее?


Python Example

Пример будет приведен с использование библиотеки aio-pika так как свою реализацию я писал имеенно на ней.


Пишем сервер

Сначала напишем некий бойлерплейт для подключения к очереди:

import asyncio
from functools import partial
import aio_pika

async def consumer(
    msg: aio_pika.IncomingMessage,
    channel: aio_pika.RobustChannelб
):
    ...

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )

    queue_name = "test"

    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name)
        # через partial прокидываем в наш обработчик сам канал
        await queue.consume(partial(consumer, channel=channel))

        try:
            await asyncio.Future()
        except Exception:
            pass

asyncio.run(main())

А теперь перейдем к нашей функции-обработчику:

async def consumer(
    msg: aio_pika.IncomingMessage,
    channel: aio_pika.RobustChannel,
):
    # используем контекстный менеджер для ack'а сообщения
    async with msg.process():
        print(msg.body)

        # проверяем, требует ли сообщение ответа
        if msg.reply_to:
            # отправляем ответ в default exchange
            await channel.default_exchange.publish(
                message=aio_pika.Message(
                    body=b"hi!",
                    correlation_id=msg.correlation_id,
                ),
                routing_key=msg.reply_to,  # самое важное
            )

Как вы видите, действительно ничего сложного.


Пишем клиент

А вот тут будет немного веселья. Наша цель сделать такой же просто интерфейс как у requests:

data = requests.get("https://my-url.com").json()

Однако, это не так просто. Помните, что сначала нужно подписаться на ответную очередь? Так мы получаем следующий код:

import asyncio
import aio_pika

RABBIT_REPLY = "amq.rabbitmq.reply-to"

async def consume_response(msg: aio_pika.IncomingMessage):
    print(msg.body)

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )

    async with connection:
        channel = await connection.channel()

        callback_queue = await channel.get_queue(RABBIT_REPLY)

        # сначала подписываемся
        consumer_tag = await callback_queue.consume(
            callback=consume_response,
            no_ack=True,  # еще один важный нюанс
        )

        # потом публикуем
        await channel.default_exchange.publish(
            message=aio_pika.Message(
                body=b"hello",
                reply_to=RABBIT_REPLY  # указываем очередь для ответа
            ),
            routing_key="test"
        )

asyncio.run(main())

Так мы получаем ответное сообщение в нашу функцию-обработчик. Однако, теперь его нужно как-то достать оттуда. Для этого будем использовать asyncio.Queue.

import asyncio
import aio_pika

RABBIT_REPLY = "amq.rabbitmq.reply-to"

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )

    async with connection:
        channel = await connection.channel()

        callback_queue = await channel.get_queue(RABBIT_REPLY)

        # создаем asyncio.Queue для ответа
        rq = asyncio.Queue(maxsize=1)

        # сначала подписываемся
        consumer_tag = await callback_queue.consume(
            callback=rq.put,  # помещаем сообщение в asyncio.Queue
            no_ack=True,  # еще один важный нюанс
        )

        # потом публикуем
        await channel.default_exchange.publish(
            message=aio_pika.Message(
                body=b"hello",
                reply_to=RABBIT_REPLY  # указываем очередь для ответа
            ),
            routing_key="test"
        )

        # получаем ответ из asyncio.Queue
        response = await rq.get()
        print(response.body)

        # освобождаем RABBIT_REPLY
        await callback_queue.cancel(consumer_tag)

asyncio.run(main())

Теперь у нас уже есть что-то похожее на синхронный запрос-ответ. Можно немного поколдовать над интерфейсами и вы получите RPC over RMQ запрос, идентичный натуральному requests.


Вместо заключения

Ну, а я уже поколдовал над этими интерфейсами. И вы можете увидеть результат этого колдовства в моем фреймворке Propan.

С его использование RPC запросы будут выглядеть для вас следующим образом:

from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(rabbit_broker)

# server side
@broker.handle("ping")
async def heartbeat():
    return "pong"

@app.after_startup
async def self_ping():
    # client RPC request
    response = await broker.publish(queue="ping", callback=True)
    assert response == "pong"

И теперь вы точно знаете, что у них под капотом.


Нюансы

RabbitMQ Direct Reply-to действительно отличный механзим, однако и у него есть ограничения.

На псевдочередь amqp.rabbitmq.reply-to можно подписываться из разных сервисов неограниченное число раз одновременно, однако, если вы хотите отправить несколько разных запросов в рамках одного сервиса (одного connection, если быть точным) одновременно, у вас не получится это сделать: вы словите ошибку, что очередь уже имеет потребителя.

Поэтому в рамках одного сервиса необходимо использовать локи на отправку RPC запросов, что, к слову, также реализовано в Propan.

© Habrahabr.ru