[Из песочницы] Удивительно простой обмен сообщениями с Spring Cloud Stream
Привет, Хабр! Представляю вашему вниманию перевод статьи «Surprisingly simple messaging with Spring Cloud Stream» автора Richard Seroter.
Существует множество вариантов взаимодействия микросервисов. Вы можете использовать обнаружение сервисов (Service Discovery, например, Spring Cloud Discovery Server/Client в реализации Netflix Eureka) и совершать прямые вызовы. Или можете использовать общую базу данных для обмена результами работы. Но брокеры сообщений продолжают оставаться популярным выбором.
Они варьируются от простых движков вроде Amazon SQS или RabbitMQ до событийных потоковых процессоров вроде Azure Event Hubs или Apache Kafka и вплоть до служебных шин вроде Microsoft BizTalk Server. Когда разработчики выбирают один из движков, они критически нуждаются в знаниях об их эффективности. Как вы можете повысить производительность разработчиков? Для Java разработчиков Spring Cloud Stream предлагает ценную абстракцию.
Spring Cloud Stream предлагает интерфейс для разработчиков, которым не требуются нюансы базового брокера. Этот брокер, Apache Kafka или RabbitMQ, настраивается самим Spring Cloud Stream. Связь с брокером и обратно от брокера осуществляется также через библиотеку Stream.
Что меня волнует, так это то, что все брокеры обрабатываются одинаково. Spring Cloud Stream нормализует поведение, даже если оно не является родным для брокера. Например, хотите создать конкурирующую модель консюмера для своих клиентов или секционировать обработку? Эти концепции ведут себя по-разному в RabbitMQ и Kafka. Нет проблем. Spring Cloud Stream делает работу одинаково прозрачной. Давайте фактически попробуем оба этих сценария.
Конкурирующие консюмеры через «consumer groups» (группы консюмеров)
По умолчанию Spring Cloud Stream устанавливает отношения между публикациями сообщений и подписками на них. Это позволяет легко обмениваться данными между разными абонентами. Но что, если вы хотите иметь несколько экземпляров подписчиков (для масштабирования обработки)? Одним из решений являются «consumer groups» (группы консюмеров). Но в брокерах их реализация разная. Тут Spring Cloud Stream берется за работу! Давайте создадим пример приложения с помощью RabbitMQ.
Перед написанием кода нам нужен экземпляр сервера RabbitMQ. Простейший вариант запуска — Docker контейнер для RabbitMQ. Если у вас установлен Docker, вам нужно только выполнить следующую команду:
docker run -d –hostname local-rabbit –name demo-rmq -p 15672:15672 -p 5672:5672 rabbitmq:3.6.11-management
Вместо rabbitmq:3.6.11-management Вы можете использовать последнюю версию с hub.docker.com/_/rabbitmq
После запуска у меня появился локальный кеш docker image и запущенный контейнер с отображением портов, который делает контейнер доступным на моей рабочей машине.
Как мы отправляем сообщения в RabbitMQ? Spring Cloud Stream поддерживает несколько шаблонов. Мы можем публиковать по расписанию или по запросу. Здесь давайте создадим веб-приложение, которое публикует сообщение при выдаче пользователем команды POST на REST endpoint.
Приложение публикации сообщений (Продюсер, Producer)
Во-первых, создайте приложение Spring Boot, которое использует spring-cloud-starter-stream-rabbit (и spring-boot-starter-web). Это все, что мне нужно для использования Spring Cloud Stream и RabbitMQ — в качестве цели публикации сообщений.
Добавьте новый класс, который реализует REST контроллер. Простая аннотация @EnableBinding запускает конфигурацию приложения как проекта Spring Cloud Stream. Здесь я использую поставляемый простой интерфейс «Source», который определяет один канал связи, но вы также можете создавать свои собственные. Это всего две строки кода.
@EnableBinding(Source.class)
@RestController
public class BriefController {
В этом классе контроллера добавьте поле @Autowired, которое ссылается на bean, который Spring Cloud Stream создает в интерфейсе «Source». (Вот так! Конфигурируем канал в интерфейсе, который разбирается Spring Cloud Stream на бины). Затем мы можем использовать эту переменную для прямого опубликования сообщений в связанном канале! Общий код, будь то RabbitMQ или Kafka. Универсально и просто.
@EnableBinding(Source.class)
@RestController
public class BriefController {
//refer to instance of bean that Stream adds to container
@Autowired
Source mysource;
//take in a message via HTTP, publish to broker
@RequestMapping(path="/brief", method=RequestMethod.POST)
public String publishMessage(@RequestBody String payload) {
System.out.println(payload);
//send message to channel
mysource.output().send(MessageBuilder.withPayload(payload).build());
return "success";
}
Приложение публикации сообщений почти закончено, и все, что осталось, — это базовая конфигурация. Эта конфигурация сообщает Spring Cloud Stream, как подключиться к брокеру. Обратите внимание, что нам не нужно указывать Spring Cloud Stream использовать RabbitMQ; это происходит автоматически, по наличию классов boot starter RabbitMQ. Нет, все, что нам нужно, это информация о соединении с нашим брокером, явная ссылка на пункт назначения сообщений и команда для отправки JSON.
server.port=8080
#rabbitmq settings for Spring Cloud Stream to use
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.output.destination=legalbriefs
spring.cloud.stream.default.contentType=application/json
Приложение для потребителей сообщений (Консюмер, Consumer)
Эта часть слишком проста. Здесь создайте новое приложение Spring Boot и выберите только зависимость от spring-cloud-starter-stream-rabbit.
Украсьте класс аннотацией @EnableBinding и используйте встроенный интерфейс Sink. Затем все, что осталось, — это создать метод обработки любых сообщений, найденных в брокере. Для этого мы обозначаем метод обработки с помощью аннотации @StreamListener, и вся обработка содержимого контента выполняется для нас.
@EnableBinding(Sink.class)
@SpringBootApplication
public class BlogStreamSubscriberDemoApplication {
public static void main(String[] args) {
SpringApplication.run(BlogStreamSubscriberDemoApplication.class, args);
}
@StreamListener(target=Sink.INPUT)
public void logfast(String msg) {
System.out.println(msg);
}
}
Конфигурация для этого приложения проста. Как и выше, у нас есть данные о соединении для RabbitMQ. Также обратите внимание, что привязка теперь ссылается на «input», который был именем канала в стандартном «Sink» интерфейсе. Наконец, обратите внимание, что я использовал то же место назначения, как и источник, чтобы обеспечить Spring Cloud Stream успешным подключением издателя и подписчика. Для ощущения силы Cloud Stream ниже добавлю настройки группы потребителей (consumers).
server.port=0
#rabbitmq settings for Spring Cloud Stream to use
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.input.destination=legalbriefs
Тестирование решения
Давайте посмотрим, как это работает. Сначала запустите три экземпляра приложения-подписчика. Я создал файл jar и запустил три экземпляра в shell.
Когда вы запускаете эти приложения, Spring Cloud Stream начинает работу. Войдите в веб-консоль администратора RabbitMQ и обратите внимание, что точка обмена (Exchange) создана. Она названа «legalbriefs» и сопоставляется с именем, которое мы указали в файле конфигурации выше.
У нас также есть три очереди, которые отображаются в каждом из трех экземпляров приложений, которые мы запускали.
Наконец, запустите продюсера и отправьте сообщение через http запрос с /briefs на конце.
Что происходит? Как и ожидалось, каждый абонент получает копию сообщения, потому что по умолчанию все происходит в простом режиме публикация/подписка.
Добавить конфигурацию группы подписчиков (consumer groups)
Мы не хотим, чтобы каждый экземпляр подписчика получал копию сообщения. Скорее, мы хотим, чтобы эти экземпляры делили сообщения между собой. Каждому сообщению должно приходить только одно. В приложении-подписчике мы добавляем одну строку в наш файл конфигурации. Это сообщает Spring Cloud Stream, что все экземпляры образуют единую группу подписчиков, которые работают совместно.
#adds consumer group processing
spring.cloud.stream.bindings.input.group = briefProcessingGroup
После перегенерации jar файла подписчика и запуска каждого файла мы видим другую настройку в RabbitMQ. То, что вы видите, — это одна именованная очередь, но три «потребителя» очереди.
Отправьте два разных сообщения и убедитесь, что каждое обрабатывается только одним экземпляром подписчика. Это простой способ использовать брокер сообщений для масштабирования обработки.
Выполнение stateful обработки с использованием разбиения на разделы
Разделение выглядит как связанный, но другой сценарий, чем группы потребителей. Разделы (Partitions) в Kafka вводят уровень параллельной обработки путем записи данных в разные разделы. Затем каждый подписчик тянет сообщение из заданного раздела (partition), чтобы выполнить работу. Здесь, в Spring Cloud Stream, разбиение полезно для параллельной обработки, но также и для обработки с учетом состояния. При настройке вы указываете характеристику, которая направляет сообщения на данный раздел. Затем один экземпляр приложения обрабатывает все данные в этом разделе. Это может быть удобно для обработки событий или любого сценария, когда полезно, чтобы связанные сообщения обрабатывались одним и тем же экземпляром. Подумайте: счетчики, сложная обработка событий или чувствительные к времени вычисления задачи.
В отличие от групп потребителей для разбиения на разделы требуются изменения конфигурации как для издателей, так и для подписчиков. На стороне издателя (producer) все, что необходимо указать: (a) количество разделов и (b) выражение, описывающее разделение данных. Никаких изменений кода.
#adding configuration for partition processing
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.attorney
spring.cloud.stream.bindings.output.producer.partitionCount=3
На стороне подписчика (consumer) вы устанавливаете количество разделов и устанавливаете свойство «partitioned» равным «true». Что также интересно, но логично, так это то, что по мере запуска каждого абонента вам нужно дать ему «индекс», чтобы Spring Cloud Streams знал, из какого раздела он должен читать сообщения.
#add partition processing
spring.cloud.stream.bindings.input.consumer.partitioned=true
#spring.cloud.stream.instanceIndex=0
spring.cloud.stream.instanceCount=3
Давайте запустим все снова. Продюсер запускается так же, как и раньше. Однако, каждый экземпляр подписчика запускается с флагом «spring.cloud.stream.instanceIndex = X», который указывает, какой индекс применяется.
Как видим, в RabbitMQ настройка отличается от предыдущей. Теперь у нас есть три очереди, каждая с другим «ключом маршрутизации» («routing key»), который соответствует его разделу.
Отправьте сообщение и обратите внимание, что все сообщения с одним и тем же именем индекса переходят к одному экземпляру. Измените номер и убедитесь, что все сообщения по-прежнему идут в одно и то же место. Переключите номер и обратите внимание, что другой раздел (вероятно) получает его. Если у вас больше разновидностей данных, чем разделов, вы увидите дескриптор раздела с более, чем одним набором данных. Нет проблем, просто знайте, что это происходит.
Резюме
Не обязательно иметь дело непосредственно с брокерами сообщений. Конечно, существует множество сценариев, в которых вы хотите применить расширенные параметры брокера, но есть также много случаев, когда вы просто хотите надежного посредника. В таких случаях Spring Cloud Stream упрощает абстрагирование знаний брокера, но при этом нормализует поведение по уникальным реализациям брокеров.
В моем последнем курсе Pluralsight я потратил более часа, копаясь в Spring Cloud Stream, а еще 90 минут работал с Spring Cloud Data Flow. Этот проект поможет вам быстро объединить потоковые приложения. Проверьте это для более глубокого погружения!