RabbitMQ + Java Spring

Понять статью не составит труда тем, кто знаком с Spring и Spring Web и хотя бы раз создавал простое приложение с контроллерами, сервисами и моделями (проще говоря — реализовывал паттерн Model View Controller).

С чего всё начиналось

По работе ко мне пришли с предложением начать разработку небольшого проекта с использованием RabbitMQ в связке с Spring Framework. До того момента я только лишь читал о RabbitMQ и с очередями сообщений особо не работал, так что часть своих выходных решил потратить на изучение данной технологии и её применениях.

О RabbitMQ вкратце

RabbitMQ — брокер сообщений. Данная технология позволяет вести асинхронную обработку данных, а также делать микросервисы слабосвязанными, что может облегчить разработку. RabbitMQ можно сравнить с почтой, куда приходят письма. Если письмо пришло на почту, то его можно прочитать в любое удобное время. Также и в RabbitMQ: пришедшее письмо микросервис сможет забрать в любой момент. Тем самым можно ослабить нагрузку на микросервис, ведь «письма» он сможет обрабатывать в своём темпе.

Сообщения отправляются в очереди, где они хранятся до момента прочтения. После прочтения сообщение удаляется. Дальнейшая его судьба определяется сервисом, прочитавшим сообщение. Также сообщение обладает некоторыми атрибутами, в зависимости от которых определяется, в какую очередь оно попадёт. Данные атрибуты задаются разработчиком. Для примера атрибутом может служить название очереди или topic. Подробнее о атрибутах и обмене сообщениями по ним стоит прочитать на официальном сайте: https://www.rabbitmq.com/tutorials/amqp-concepts.

Для более полного понимания приведу для примера следующую схему:

Схема отправления сообщений

Схема отправления сообщений

На схеме P — producer, сервис отправляющий сообщения. X — маршрутизатор сообщений (он определяет в какую очередь попадёт сообщение), Q1, Q2, Q3 — очереди, C1, C2, C3 — потребители. Схема взаимосвязи микросервисов не обязательно такая, как указана на изображении. У одной очереди вполне может быть несколько продюсеров и несколько потребителей, всё зависит от самой архитектуры микросервисов.

Для получения полной информации о RabbitMQ стоит посетить официальный сайт: https://www.rabbitmq.com/. В данной же статье будет описание простого проекта на Java с двумя микросервисами, взаимодействующими посредством RabbitMQ.

Развёртывание RabbitMQ

Первым делом я взялся за развёртывание RabbitMQ. Благо для этого на Windows нужен лишь предустановленный Docker и интернет. Чтобы поднять контейнер с RabbitMQ на своём компьютере необходимо выполнить следующую команду:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

После успешного выполнения этой команды перейдя по ссылке http://localhost:15672 можно увидеть следующую картину:

страница авторизации в RabbitMQ

страница авторизации в RabbitMQ

По дефолту логин и пароль: guest

После авторизации можно увидеть следующую чудесную картину:

UI RabbitMQ

UI RabbitMQ

При должном знании английского разобраться в UI не составит труда.

UI предоставляет всю информацию о состоянии RabbitMQ, о её очередях, пользователях, использовании, готовых к обработке сообщений и так далее…

Дальше перейдём к написанию проекта

RabbitMQProducer

Данный сервис отправляет сообщения в одну из двух очередей RabbitMQ.

Для начала подключим необходимые библиотеки:



	4.0.0
	
		org.springframework.boot
		spring-boot-starter-parent
		3.4.1
		 
	
	org.example
	RabbitMQProducer
	0.0.1-SNAPSHOT
	RabbitMQProducer
	RabbitMQProducer
	
	
		
	
	
		
	
	
		
		
		
		
	
	
		21
	
	
		
			org.springframework.boot
			spring-boot-starter-amqp
		
		
			org.springframework.boot
			spring-boot-starter-web
		

		
			org.springframework.boot
			spring-boot-starter-test
			test
		
		
			org.springframework.amqp
			spring-rabbit-test
			test
		
		
			org.springdoc
			springdoc-openapi-starter-webmvc-ui
			2.0.2
		
	

	
		
			
				org.springframework.boot
				spring-boot-maven-plugin
			
		
	


Среди них Spring Web для написания REST, Spring RabbitMQ для работы с RabbitMQ и Srping Openapi для подключения Swagger.

Сперва сконфигурируем наше приложение с помощью файла application.properties:

spring.application.name=RabbitMQProducer

spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672

server.port=8081

rabbitmq.queue.name=message_queue
rabbitmq.queue_with_delay.name=delay_queue

Под spring.rabbitmq указываем подключения к rabbitmq, server.port для того, чтобы открыть веб приложение на определённом порту (нужно учитывать, что он может быть попросту занят другим приложением). Две последних строки указывают названия очередей (де-факто их можно указать и напрямую в Java Code, но такой подход не совсем правильный, как мне кажется, да и менять названия очередей затем проще из application.properties, чем потом копаться в коде и менять там).

Затем укажем конфигурации для очередей:

package org.example.rabbitmqproducer.config;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queueName;

    @Value("${rabbitmq.queue_with_delay.name}")
    private String queueWithDelayName;

    /**
     * Конфигурируем очередь
     * @return Очередь RabbitMQ
     */
    @Bean
    public Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    public Queue queue2() {
        return new Queue(queueWithDelayName, false);
    }

}

Здесь с помощью аннотации @Value забираем значения из application.properties.

Далее напишем небольшой сервис, который будет просто посылать сообщения в брокер:

package org.example.rabbitmqproducer.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

    private final RabbitTemplate template;

    @Value("${rabbitmq.queue.name}")
    private String messageQueue;

    @Value("${rabbitmq.queue_with_delay.name}")
    private String messageWithDelayQueue;

    @Autowired
    public MessageService(RabbitTemplate template) {
        this.template = template;
    }

    /**
     * Отправляет сообщение в очередь
     * @param message Сообщение
     */
    public void sendMessage(String message) {
        template.convertAndSend(messageQueue, message);
    }

    /**
     * Посылает сообщение в очередь, которая обрабатывается слушателем с некоторой задержкой
     * @param message Сообщение
     */
    public void sendMessageToQueueWithDelay(String message) {
        template.convertAndSend(messageWithDelayQueue, message);
    }

}

И теперь напишем контроллер, где метод POST по пути /message отправит сообщение в message_queue очередь, а метод POST по пути /message/with_delay отправит сообщение в очередь delay_queue:

package org.example.rabbitmqproducer.controller;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.example.rabbitmqproducer.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
@Tag(name = "Message Controller", description = "Controller to send message to RabbitMQ")
@RequestMapping("/message")
public class MessageController {

    private final MessageService service;

    @Autowired
    public MessageController(MessageService service) {
        this.service = service;
    }

    @Operation(description = "Send message to RabbitMQ")
    @ApiResponse(responseCode = "204")
    @PostMapping
    public ResponseEntity sendMessage(String message) {
        service.sendMessage(message);
        return ResponseEntity.noContent().build();
    }

    @Operation(description = "Send message to RabbitMQ in queue with delay")
    @ApiResponse(responseCode = "204")
    @PostMapping("/with_delay")
    public ResponseEntity sendMessageWithDelay(String message) {
        service.sendMessageToQueueWithDelay(message);
        return ResponseEntity.noContent().build();
    }
}

Запустим приложение.

Если всё правильно написано и сконфигурировано, то по ссылке http://localhost:8081/swagger-ui/index.html#/ можно увидеть поднятый Swagger:

906b5c113691aa77e971e90525864dcc.png

Воспользуемся эндпоинтом /message два раза и эндпоинтом /message/with_delay один раз:

результат отправления сообщений

результат отправления сообщений

Здесь мы можем увидеть, что в очереди пришли сообщения. Теперь обработаем их.

RabbitMQConsumer

Данный сервис отвечает за обработку сообщений из очереди.

Подключим нужные библиотеки:



	4.0.0
	
		org.springframework.boot
		spring-boot-starter-parent
		3.4.1
		 
	
	org.example
	RabbitMQConsumer
	0.0.1-SNAPSHOT
	RabbitMQProducer
	RabbitMQConsumer
	
	
		
	
	
		
	
	
		
		
		
		
	
	
		21
	
	
		
			org.springframework.boot
			spring-boot-starter-amqp
		
		
			org.springframework.boot
			spring-boot-starter-test
			test
		
		
			org.springframework.amqp
			spring-rabbit-test
			test
		
	

	
		
			
				org.springframework.boot
				spring-boot-maven-plugin
			
		
	


Сконфигурируем в application.properties:

spring.application.name=RabbitMQConsumer

spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672

server.port=8082

rabbitmq.queue.name=message_queue
rabbitmq.queue_with_delay.name=delay_queue

Здесь из Spring библиотек нужна только Spring RabbitMQ

Напишем слушатель сообщений:

package org.example.rabbitmqconsumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMQListener {

    private final static Random RANDOM = new Random();

    @RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue.name')}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }

    @RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue_with_delay.name')}")
    public void receiveMessageWithDelay(String message) throws InterruptedException {
        var delay = RANDOM.nextInt(10000);
        Thread.sleep(delay);
        System.out.println("Received message: " + message + " with delay: " + delay);
    }

}

Особым образом здесь подтягиваются названия очередей из application.properties.

В методе receiveMessage сообщение сразу печатается в консоле. В методе receiveMessageWithDelay симулируется задержка до 10 секунд. Можно увидеть, что сообщения постепенно вытаскиваются из очереди delay_queue.

Запустим приложение и увидим вывод в консоле:

вывод в консоль

вывод в консоль

Приложение запустилось и само забрало из очереди сообщение. Можно ещё «побаловаться» и поотправлять сообщений. Посмотрим, что в мониторинге очереди delay_queue RabbitMQ:

мониторинг delay_queue

мониторинг delay_queue

Также на message_queue:

мониторинг message_queue

мониторинг message_queue

Видим разницу. В message_queue сообщения не задерживаются. А вот в delay_queue задерживаются из-за случайной задержки.

GitHub проекта: https://github.com/3abubenni/rabbitmq

© Habrahabr.ru