Как работать с MassTransit и RabbitMQ: реализуем RPC с Saga и без

Привет, Хабр! Меня зовут Евгений, я backend-разработчик SimbirSoft. В этой статье я разберу два варианта решения нетривиальной задачи создания RPC через брокер сообщений RabbitMQ и библиотеку MassTransit. Подробно разберём подключение MassTransit и работу с Saga. Тема будет полезна как для начинающих, так и опытных backend-разработчиков .NET.

Введение

В микросервисной архитектуре общение между отдельными сервисами как правило делится на два типа:

  1. Синхронное (Request-Response) применяется, когда на запрос нужен незамедлительный ответ (таймаут соединения в расчёт не берётся).

  2. Асинхронное (AMQP). Применяется, когда не нужен незамедлительный ответ.

64d728040d31876eb31990bacb88d61e.png

Мы рассмотрим работу брокера RabbitMQ и асинхронную модель взаимодействия сервисов. Подробнее о его особенностях в сравнении с его конкурентом Apache Kafka можно почитать здесь.

Итак, официальная документация RabbitMQ гласит, что брокер сообщений — это сервер промежуточного ПО между автором (Producer)  и получателем сообщений (Consumer), который выполняет функции получения сообщения от автора, маршрутизации и установки сообщений в очередь получателя сообщения.

Для наглядности ниже приведена принципиальная схема устройства брокера сообщений.

Источник: официальная документация RabbitMQ

При использовании AMQP асинхронность достигается за счет очередей сообщений. То есть если получатель сообщений (Consumer) занят, то ему не нужно всё бросать и хвататься обрабатывать сообщение, он сможет обработать сообщение из очереди, когда будет к этому готов.

Также стоит отметить, что брокер сообщения выполняет функцию маршрутизации. На рисунке ниже приведена схема взаимодействия сервисов без брокера сообщений, легко заметить, что тут царит хаос:

3df2da1a77d564bbb20b7611e24e0114.png

Применение брокера сообщений упорядочивает и упрощает взаимодействие сервисов между собой.

А вот представление схемы взаимодействия с картинки выше, если бы использовался брокер сообщений:

cc1a1ecce7ec0470d94f0d5760a7d78e.png

Существует две основные архитектуры брокеров сообщений:

1. Push-архитектура. Если у сервера (брокера) есть сообщения для получателя (Consumer), то он по своей инициативе отправляет сообщение получателю.

Источник: https://dev.to/anubhavitis/push-vs-pull-api-architecture-1djo 

2. Pull-архитектура. Клиент (он же Consumer) по своей инициативе запрашивает у сервера (брокера) сообщения, и если у брокера есть сообщения для клиента, то отправляет эти сообщения.

Источник: https://dev.to/anubhavitis/push-vs-pull-api-architecture-1djo

В данной статье будет рассмотрена Push-архитектура, наиболее распространенная при использовании брокера сообщений RabbitMQ.

От слов к делу!

Реализуем RPC c помощью MassTransit

Наиболее популярные библиотеки для работы с RabbitMQ в .NET:

  • RabbitMQ.Client

  • MassTransit.RabbitMQ

  • EasyNetQ

  • NServiceBus.RabbitMQ

В качестве примера работы с RabbitMQ рассмотрим нестандартную задачу построения RPC (Remote Procedure Call, удаленный вызов процедур) c использованием библиотеки MassTransit.

Условия задачи:

Программа генерирует число, которое пользователь должен угадать. При каждом вводе числа программа пишет результат: больше или меньше отгадываемого. Количество попыток угадывания и диапазон чисел должны задаваться из настроек. 

Рассмотрим два варианта решения:

Saga — это долгоживущая транзакция, управляемая координатором. Саги инициируются событием, саги организуют события, и саги поддерживают состояние всей транзакции. Саги предназначены для управления сложностью распределенных транзакций без блокировки и немедленной согласованности. Они управляют состоянием и отслеживают любые компенсации, необходимые в случае частичного сбоя.

Краткое описание реализованной функциональности:

  • Консольный клиент принимает от пользователя число и отправляет его в брокер сообщений.

  • Сервер получает в сообщении из брокера сообщений число, отправленное пользователем, обрабатывает его и высылает в брокер сообщений результат обработки.

  • Клиент получает от брокера сообщений ответ сервера и выводит его пользователю.

Полный код проекта приведен на github

В демонстрационных целях будем использовать «облачный» RabbitMQ, где после регистрации будет выдан бесплатный экземпляр RabbitMQ и информация для подлючения из нашего приложения.

Решение без использования Saga MassTransit

Принципиальная схема приложения без использования Saga:

e4475c2a83ee70c7f5b86d216845b29f.png

Теперь переходим в Visual Studio и создаем наши проекты.

9b3a5afa6e2d23d8973a990521319df9.png

Настройки подключения к облаку RabbitMQ указаны в appsettings.json файлах:

91c247f2187b749c9a0596c3a74e027a.png

При старте Producer-приложения и Consumer-приложения происходит инициализация настроек для подключения к RabbitMQ.

Для Producer:

3077b3a5ece5169cb84c223cecbafd36.png

И для Consumer:

8fe599dbe1543d1e20cfae9b87d9a146.png61b57483fd490b9ef7fda127bb255157.png

Для примера работы с брокером сообщений мы используем библиотеку MassTransit.RabbitMQ v. 8.1.3

В данном проекте вся логика работы с брокером представлена в библиотеке классов «Infrastructure.MassTransit»:

3ea17bfb0a4d1671b8312e2cc7402f47.png

Процесс отправки сообщения из консольного клиента:

ef230ac1297c60efb4066b3da77f23d2.png

Процесс получения сообщений из очереди ответов для консольного клиента:

8a77ec8d046f66a1fde2443516e47a7f.png

Устанавливаем класс, который будет отвечать за получение сообщений для консольного клиента из очереди:

dd745e414e213d25dc50939d3234e8ec.png

Для прослушивания сообщений из очереди ответов для консольного клиента определен класс RabbitMqConsumer, который реализует интерфейс IConsumer библиотеки MassTransit.

Логика работы нашего консольного клиента лишь в отправке числа, которое ввел пользователь через консоль на сервер и получение ответа (более подробно можно посмотреть в github), поэтому класс RabbitMqConsumer лишь получает сообщения и выставляет флаг, отгадано ли число с сервера или нет:

40079782068038856bef37d0ee371d51.png

Теперь рассмотрим request от клиента к серверу через брокер и реализацию responce от сервера к клиенту через брокер:

В классе Program проекта WebApi.Consumer указан метод расширения, внутри которого, помимо установки настроек подключения к облаку RabbitMQ, также указываются настройки класса, который будет прослушивать сообщения из очереди для сервера:

ea1d7a00ad57451c0c8738c9b48d5224.png

Метод AddBLLServicesWithoutSaga ():

70607807e7138488d89d7db623a75d62.png

Настройка подключения к облаку RabbitMQ и настройка класса прослушивания входящих сообщений:

d1a7732fc5aa46c34d47e448203fc3f7.png

Так как мы реализуем RPC, то класс, который получает сообщения из очереди от клиента, должен обрабатывать данные и возвращать ответ клиенту через очередь ответов. Это всё реализовано в классе RabbitMqConsumerWebApi:

a9600c7a5bcfb108738841652fb66705.png

Этот класс реализует интерфейс IConsumer из библиотеки MassTransit.

В методе Consume мы получаем сообщение из очереди клиента, обрабатываем его и отправляем результат обработки в очередь ответов.

Более подробное описание работы программы указано в файле Readme на github.

Стоит отметить, что при старте приложений в облаке RabbitMQ с помощью подключенной ранее в наш проект библиотеки MassTransit.RabbitMQ будут созданы очереди сообщений, которые мы указали в коде (очередь для отправки на сервер и очередь для отправки клиенту). Для просмотра статистических данных по очередям необходимо открыть ресурс:

a8cda5ac2b101536e5a511af46898bae.png

Далее откроется страница управления RabbitMQ c довольно богатой функциональностью, если мы перейдем во вкладку очередей, то увидим наши очереди:

1239daad040637e8eaef84430bee82b8.png

Решение с использованием Saga от MassTransit

Библиотека MassTransit имеет возможность организовать и управлять серий событий.

Схема приложения с использованием Saga от MassTransit:

3957c890d94105c041aeb07ed7bce280.png

Для использования Saga изменим класс Program консольного клиента:

0f9398ccc3e91343f5456fd027b1c078.png

Внутри метода расширения устанавливаем настройки типов запроса и настройки конфигурации:

1768d41a2b93a289c93af8d1ae9aad24.png

Для отправки сообщений и получения ответа от саги необходимо использовать метод GetResponse интерфейса IRequestClient библиотеки MassTransit:

afdc87bf71a55b040f6798d4e966bb1c.png34a5e21059fecc35562a42dc32fdcc5b.png

Далее создаем проект ASP.NET Core c названием WebApi.Saga, внутри которого реализуем сагу для управления запросами и ответами наших микросервисов. В классе Program проекта ASP.NET Core добавим настройки конфигурации:

afd4a67b38b1a5a18f30f31b5ac10177.pngc6233f29d0a1088e20e0f16b52dacb13.png

Далее необходимо создать класс, описывающий состояние саги:

791223e42cce20260f700cb35ef9e20c.png

И класс, описывающий обработку событий, возникающих в саге:

a6de32e92b89aefc6ae8bdf2b89ec470.png

Теперь отредактируем класс-обработчик сообщений от клиента:

7398c28260e95dbc8203d4e516c0dcf0.png

Также стоит отметить, что при запуске программы в облаке RabbitMQ  автоматически создадутся очереди, которыми будет управлять сага:

06b855ca2d8d1d51332dd8c6dbf6995e.png

Вывод

При взаимодействии ряда микросервисов между собой предпочтительным вариантом является использование Saga от MassTransit в силу того, что Saga представляет мощную функциональность управления распределенными транзакциями. Помимо этого имеется возможность создавать и обрабатывать события. Более подробно с Saga можно ознакомиться здесь.

Надеюсь, теперь вы получили представления о работе брокера сообщений RabbitMQ по Push-архитектуре в связке с библиотекой Masstransit. А реализовать это на практике помогут два шаблона проекта реализации RPC (с использованием Saga и без).

Спасибо за внимание!

Больше авторских материалов для backend-разработчиков от моих коллег читайте в соцсетях SimbirSoft — ВКонтакте и Telegram.

© Habrahabr.ru