[Из песочницы] RabbitMQ Spring tutorial
На сайте rabbitmq.com уже есть подробные примеры и клиент для java. Однако если в проекте уже используется спринг, то намного удобнее использовать библиотеку Spring AMQP. Эта статья содержит реализацию всех шести официальных примеров работы с RabbitMQ.
Сразу ссылка на проекты на GitHub.
Для примеров я буду использовать простейшее приложение на спринге. После того, как пользователь перейдёт по опредленной ссылке, в RabbitMQ будет посылаться сообщение которое будет отправляться в один из листенеров. Листенер в свою очередь будет просто выводить сообщение в лог. На хабре уже были переводы официальных туториалов на php и python, и я думаю многие уже знакомы с принципами работы rabbitmq, поэтому я сконцентрируюсь на работе именно с Spring AMQP.
Установка RabbitMQ
Установка RabbitMQ детально описана на официальном сайте. Тут проблем возникнуть не должно.
Настройка Spring
Для простоты я использовал Spring Boot. Он отлично подходит, чтобы быстро развернуть приложения на спринге и не заниматься его долгим кофигурированием. При этом сам Spring AMQP я буду конфигурировать «классическим способом» — т.е. так, как я конфигурировал в реальном проекте без Spring Boot (разве что в ConnectionFactory не описаны некоторые специфичные для heroku вещи).
Cодержимое минимального pom.xml необходимого нам для запуска. Здесь уже есть Spring boot и Spring AMQP.
4.0.0
rabbitmq
example-1
1.0-SNAPSHOT
org.springframework.boot
spring-boot-starter-parent
1.2.4.RELEASE
org.springframework.amqp
spring-rabbit
1.4.5.RELEASE
org.springframework.boot
spring-boot-starter-web
Основной файл конфигурации. Кроме имени класса, его содержимое будет одинаковым для всех наших примеров.
package com.rabbitmq.example1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
@EnableAutoConfiguration
@ComponentScan
@Import(RabbitConfiguration.class)
public class Example1Configuration {
public static void main(String[] args) throws Exception {
SpringApplication.run(Example1Configuration.class, args);
}
}
Пример 1. «Hello World!»
Для работы с RabbitMQ нам потребуются следующие бины:
— сonnectionFactory — для соединения с RabbitMQ;
— rabbitAdmin — для регистрации/отмены регистрации очередей и т.п.;
— rabbitTemplate — для отправки сообщений (producer);
— myQueue1 — собственно очередь куда посылаем сообщения;
— messageListenerContainer — принимает сообщения (consumer).
package com.rabbitmq.example1;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
public class RabbitConfiguration {
Logger logger = Logger.getLogger(RabbitConfiguration.class);
//настраиваем соединение с RabbitMQ
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
//объявляем очередь с именем queue1
@Bean
public Queue myQueue1() {
return new Queue("queue1");
}
//объявляем контейнер, который будет содержать листенер для сообщений
@Bean
public SimpleMessageListenerContainer messageListenerContainer1() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames("queue1");
container.setMessageListener(new MessageListener() {
//тут ловим сообщения из queue1
public void onMessage(Message message) {
logger.info("received from queue1 : " + new String(message.getBody()));
}
});
return container;
}
}
В этом и следующих примерах в качестве продюссера будет контроллер, который будет посылать сообщения в rabbitmq.
package com.rabbitmq.example1;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);
@Autowired
AmqpTemplate template;
@RequestMapping("/emit")
@ResponseBody
String queue1() {
logger.info("Emit to queue1");
template.convertAndSend("queue1","Message to queue");
return "Emit to queue";
}
}
Теперь, если запустить Example1Configuration и перейти в браузере по адресу http://localhost:8080/emit, то в консоли мы увидим что-то типа:
2015-06-23 21:16:26.250 INFO 6460 --- [nio-8080-exec-2] com.rabbitmq.example1.SampleController : Emit to queue1 2015-06-23 21:16:26.252 INFO 6460 --- [cTaskExecutor-1] c.rabbitmq.example1.RabbitConfiguration : received from queue 1: Message to queue
Рассмотрим подробнее получившийся результат. Тут мы в SampleController.java отправляем сообщение:
template.convertAndSend("queue1","Message to queue");
А здесь мы его получаем:
public void onMessage(Message message) {
logger.info("received from queue 1: " + new String(message.getBody()));
}
Ничего сложного, но описывать все листенеры в конфигурации не совсем удобно, особенно когда их становится много. Гораздо удобнее конфигуририровать листенеры аннотациями.
Пример 1.1. «Hello World!» на аннотациях
Вместо листенера в конфигурации добавим в проект класс RabbitMqListener, в который будут приходить сообщения. Соответственно messageListenerContainer1 уже не нужен.
RabbitMqListener — это обыкновенный компонент (@Component) спринга с методом, помеченным анотацией @RabbitListener. В этом метод будут приходить сообщения. При этом мы можем получать как полное сообщение Message заголовками и телом как массив байт, так и просто сконвертированное тело в том виде, в каком мы его отправляли.
@RabbitListener(queues = "queue1")
public void processQueue1(String message) {
logger.info("Received from queue 1: " + message);
}
package com.rabbitmq.example1annotated;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@EnableRabbit //нужно для активации обработки аннотаций @RabbitListener
@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);
@RabbitListener(queues = "queue1")
public void processQueue1(String message) {
logger.info("Received from queue 1: " + message);
}
}
package com.rabbitmq.example1annotated;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
Logger logger = Logger.getLogger(RabbitConfiguration.class);
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue1() {
return new Queue("queue1");
}
}
Пример 2. Work Queues
В данном примере одну очередь слушают уже два листенера. Для эмуляции полезной работы используем Thread.sleep. Важно, что листенеры одной очереди могут быть и на разных инстансах программы. Так можно распараллелить очередь на несколько компьютеров или нод в облаке.
@RabbitListener(queues = "query-example-2")
public void worker1(String message) throws InterruptedException {
logger.info("worker 1 : " + message);
Thread.sleep(100 * random.nextInt(20));
}
@RabbitListener(queues = "query-example-2")
public void worker2(String message) throws InterruptedException {
logger.info("worker 2 : " + message);
Thread.sleep(100 * random.nextInt(20));
}
Результат:
2015-06-23 22:03:48.018 INFO 6784 --- [nio-8080-exec-1] com.rabbitmq.example2.SampleController : Emit to queue 2015-06-23 22:03:48.029 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 2 : Message 1 2015-06-23 22:03:48.029 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 1 : Message 0 2015-06-23 22:03:48.830 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 2 : Message 2 2015-06-23 22:03:49.331 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 2 : Message 3 2015-06-23 22:03:49.432 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 2 : Message 4 2015-06-23 22:03:49.634 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 1 : Message 5 2015-06-23 22:03:49.733 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 2 : Message 6 2015-06-23 22:03:49.735 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 1 : Message 7 2015-06-23 22:03:50.236 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 1 : Message 8 2015-06-23 22:03:50.537 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 1 : Message 9
package com.rabbitmq.example2;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);
Random random = new Random();
@RabbitListener(queues = "query-example-2")
public void worker1(String message) throws InterruptedException {
logger.info("worker 1 : " + message);
Thread.sleep(100 * random.nextInt(20));
}
@RabbitListener(queues = "query-example-2")
public void worker2(String message) throws InterruptedException {
logger.info("worker 2 : " + message);
Thread.sleep(100 * random.nextInt(20));
}
}
package com.rabbitmq.example2;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);
@Autowired
AmqpTemplate template;
@RequestMapping("/queue")
@ResponseBody
String queue1() {
logger.info("Emit to queue");
for(int i = 0;i<10;i++)
template.convertAndSend("query-example-2","Message " + i);
return "Emit to queue";
}
}
Пример 3. Publish/Subscribe
Тут одно и то же сообщение приходит сразу двум консьюмерам.
2015-06-23 22:12:24.669 INFO 1664 --- [nio-8080-exec-1] com.rabbitmq.example3.SampleController : Emit to exchange-example-3 2015-06-23 22:12:24.684 INFO 1664 --- [cTaskExecutor-1] com.rabbitmq.example3.RabbitMqListener : accepted on worker 1 : Fanout message 2015-06-23 22:12:24.684 INFO 1664 --- [cTaskExecutor-1] com.rabbitmq.example3.RabbitMqListener : accepted on worker 2 : Fanout message
Для этого, подключим обе очереди к FanoutExchange:
@Bean
public FanoutExchange fanoutExchangeA(){
return new FanoutExchange("exchange-example-3");
}
@Bean
public Binding binding1(){
return BindingBuilder.bind(myQueue1()).to(fanoutExchangeA());
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(myQueue2()).to(fanoutExchangeA());
}
И будем отправлять не в очередь, а в exchange exchange-example-3:
template.setExchange("exchange-example-3");
template.convertAndSend("Fanout message");
Каждый раз указывать exchange необязательно. Его можно указать и один раз при создании RabbitTemplate.
package com.rabbitmq.example3;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@EnableRabbit
@Configuration
public class RabbitConfiguration {
Logger logger = Logger.getLogger(RabbitConfiguration.class);
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
return rabbitTemplate;
}
@Bean
public Queue myQueue1() {
return new Queue("query-example-3-1");
}
@Bean
public Queue myQueue2() {
return new Queue("query-example-3-2");
}
@Bean
public FanoutExchange fanoutExchangeA(){
return new FanoutExchange("exchange-example-3");
}
@Bean
public Binding binding1(){
return BindingBuilder.bind(myQueue1()).to(fanoutExchangeA());
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(myQueue2()).to(fanoutExchangeA());
}
}
package com.rabbitmq.example3;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);
Random random = new Random();
@RabbitListener(queues = "query-example-3-1")
public void worker1(String message) {
logger.info("accepted on worker 1 : " + message);
}
@RabbitListener(queues = "query-example-3-2")
public void worker2(String message) {
logger.info("accepted on worker 2 : " + message);
}
}
package com.rabbitmq.example3;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);
@Autowired
RabbitTemplate template;
@RequestMapping("/")
@ResponseBody
String home() {
return "Empty mapping";
}
@RequestMapping("/emit")
@ResponseBody
String emit() {
logger.info("Emit to exchange-example-3");
template.setExchange("exchange-example-3");
template.convertAndSend("Fanout message");
return "Emit to exchange-example-3";
}
}
Пример 4. Routing
Здесь используется routing key, в зависимости от которого сообщение может попасть в одну из очередей или сразу в обе. Для этого вместо FanoutExchange используем DirectExchange:
@Bean
public DirectExchange directExchange(){
return new DirectExchange("exchange-example-4");
}
@Bean
public Binding errorBinding1(){
return BindingBuilder.bind(myQueue1()).to(directExchange()).with("error");
}
@Bean
public Binding errorBinding2(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("error");
}
@Bean
public Binding infoBinding(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("info");
}
@Bean
public Binding warningBinding(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("warning");
}
И при отправке используем указываем Routing key, например, так:
template.convertAndSend("info", "Info");
В результате получаем:
2015-06-23 22:29:24.480 INFO 5820 --- [nio-8080-exec-2] com.rabbitmq.example4.SampleController : Emit as info 2015-06-23 22:29:24.483 INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener : accepted on worker 2 : Info 2015-06-23 22:29:29.721 INFO 5820 --- [nio-8080-exec-4] com.rabbitmq.example4.SampleController : Emit as error 2015-06-23 22:29:29.727 INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener : accepted on worker 2 : Error 2015-06-23 22:29:29.731 INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener : accepted on worker 1 : Error 2015-06-23 22:29:36.779 INFO 5820 --- [nio-8080-exec-5] com.rabbitmq.example4.SampleController : Emit as warning 2015-06-23 22:29:36.781 INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener : accepted on worker 2 : Warning
package com.rabbitmq.example4;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@EnableRabbit
@Configuration
public class RabbitConfiguration {
Logger logger = Logger.getLogger(RabbitConfiguration.class);
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setExchange("exchange-example-4");
return rabbitTemplate;
}
@Bean
public Queue myQueue1() {
return new Queue("query-example-4-1");
}
@Bean
public Queue myQueue2() {
return new Queue("query-example-4-2");
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange("exchange-example-4");
}
@Bean
public Binding errorBinding1(){
return BindingBuilder.bind(myQueue1()).to(directExchange()).with("error");
}
@Bean
public Binding errorBinding2(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("error");
}
@Bean
public Binding infoBinding(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("info");
}
@Bean
public Binding warningBinding(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("warning");
}
}
package com.rabbitmq.example4;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);
Random random = new Random();
@RabbitListener(queues = "query-example-4-1")
public void worker1(String message) {
logger.info("accepted on worker 1 : " + message);
}
@RabbitListener(queues = "query-example-4-2")
public void worker2(String message) {
logger.info("accepted on worker 2 : " + message);
}
}
package com.rabbitmq.example4;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);
@Autowired
RabbitTemplate template;
@RequestMapping("/")
@ResponseBody
String home() {
return "Empty mapping";
}
@RequestMapping("/emit/error")
@ResponseBody
String error() {
logger.info("Emit as error");
template.convertAndSend("error", "Error");
return "Emit as error";
}
@RequestMapping("/emit/info")
@ResponseBody
String info() {
logger.info("Emit as info");
template.convertAndSend("info", "Info");
return "Emit as info";
}
@RequestMapping("/emit/warning")
@ResponseBody
String warning() {
logger.info("Emit as warning");
template.convertAndSend("warning", "Warning");
return "Emit as warning";
}
}
Пример 5. Topics
Здесь вместо DirectExchange используем TopicExchange
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("exchange-example-5");
}
@Bean
public Binding binding1(){
return BindingBuilder.bind(myQueue1()).to(topicExchange()).with("*.orange.*");
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(myQueue2()).to(topicExchange()).with("*.*.rabbit");
}
@Bean
public Binding binding3(){
return BindingBuilder.bind(myQueue2()).to(topicExchange()).with("lazy.#");
}
В результате получаем:
2015-06-23 22:42:28.414 INFO 6560 --- [nio-8080-exec-1] com.rabbitmq.example5.SampleController : Emit 'to 1 and 2' to 'quick.orange.rabbit' 2015-06-23 22:42:28.428 INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener : accepted on worker 2 : to 1 and 2 2015-06-23 22:42:28.428 INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener : accepted on worker 1 : to 1 and 2 2015-06-23 22:42:55.802 INFO 6560 --- [nio-8080-exec-2] com.rabbitmq.example5.SampleController : Emit 'to 2' to 'lazy.black.cat' 2015-06-23 22:42:55.805 INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener : accepted on worker 2 : to 2
package com.rabbitmq.example5;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);
@Autowired
RabbitTemplate template;
@RequestMapping("/")
@ResponseBody
String home() {
return "Empty mapping";
}
@RequestMapping("/emit/{key}/{message}")
@ResponseBody
String error(@PathVariable("key") String key, @PathVariable("message") String message) {
logger.info(String.format("Emit '%s' to '%s'",message,key));
template.convertAndSend(key, message);
return String.format("Emit '%s' to '%s'",message,key);
}
}
package com.rabbitmq.example5;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);
Random random = new Random();
@RabbitListener(queues = "query-example-5-1")
public void worker1(String message) {
logger.info("accepted on worker 1 : " + message);
}
@RabbitListener(queues = "query-example-5-2")
public void worker2(String message) {
logger.info("accepted on worker 2 : " + message);
}
}
package com.rabbitmq.example5;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);
@Autowired
RabbitTemplate template;
@RequestMapping("/")
@ResponseBody
String home() {
return "Empty mapping";
}
@RequestMapping("/emit/{key}/{message}")
@ResponseBody
String error(@PathVariable("key") String key, @PathVariable("message") String message) {
logger.info(String.format("Emit '%s' to '%s'",message,key));
template.convertAndSend(key, message);
return String.format("Emit '%s' to '%s'",message,key);
}
}
Пример 6. Remote procedure call (RPC)
Spring AMQP позволяет использовать convertSendAndReceive, чтобы получить ответ на отправленное сообщение. При этом, при дефолтной настройке, в случае если у нас RabbitMQ версии до 3.4.0, то для ответного сообщения будет создана временная очередь. Этот способ не очень производительный, поэтому его использовать не рукомендуется и следует создать самому также и очередь для обратных сообщений и указать её как ReplyQueue у RabbitTemplate. Если же у нас RabbitMQ версии 3.4.0 и выше, то будет использован механизм Direct reply-to, который намного быстрее. Подробнее в документации по Spring AMQP.
Таким образом осуществить удаленный вызов процедуры можно всего одной строкой:
String response = (String) template.convertSendAndReceive("query-example-6",message);
А так процедура обрабатывается на консьюмере:
@RabbitListener(queues = "query-example-6")
public String worker1(String message) throws InterruptedException {
logger.info("received on worker : " + message);
Thread.sleep(3000); //эмулируем полезную работу
return "received on worker : " + message;
}
В результате получаем:
2015-06-23 23:12:36.677 INFO 6536 --- [nio-8080-exec-5] com.rabbitmq.example6.SampleController : Emit 'Hello world' 2015-06-23 23:12:36.679 INFO 6536 --- [cTaskExecutor-1] com.rabbitmq.example6.RabbitMqListener : Received on worker : Hello world 2015-06-23 23:12:39.681 INFO 6536 --- [nio-8080-exec-5] com.rabbitmq.example6.SampleController : Received on producer 'Received on worker : Hello world'
package com.rabbitmq.example6;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@EnableRabbit
@Configuration
public class RabbitConfiguration {
Logger logger = Logger.getLogger(RabbitConfiguration.class);
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setQueue("query-example-6");
rabbitTemplate.setReplyTimeout(60 * 1000);
//no reply to - we use direct-reply-to
return rabbitTemplate;
}
@Bean
public Queue myQueue() {
return new Queue("query-example-6");
}
}
package com.rabbitmq.example6;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);
@RabbitListener(queues = "query-example-6")
public String worker1(String message) throws InterruptedException {
logger.info("Received on worker : " + message);
Thread.sleep(3000);
return "Received on worker : " + message;
}
}
package com.rabbitmq.example6;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);
@Autowired
RabbitTemplate template;
@RequestMapping("/")
@ResponseBody
String home() {
return "Empty mapping";
}
@RequestMapping("/process/{message}")
@ResponseBody
String error(@PathVariable("message") String message) {
logger.info(String.format("Emit '%s'",message));
String response = (String) template.convertSendAndReceive("query-example-6",message);
logger.info(String.format("Received on producer '%s'",response));
return String.valueOf("returned from worker : " + response);
}
}
Заключение
У себя я использовал RabbitMQ в проекте в облачном хостинге heroku. Использовать RabbitMQ в heroku довольно просто — достаточно подключить одного из провайдеров RabbitMQ в консоли администрирования и тогда в переменных окружения появится адрес для доступа к кролику. Этот адрес нужно использовать при создании connectionFactory.
@Bean
public ConnectionFactory connectionFactory()
{
//получаем адрес AMQP у провайдера
String uri = System.getenv("CLOUDAMQP_URL");
if (uri == null) //значит мы запущены локально и нужно подключаться к локальному rabbitmq
uri = "amqp://guest:guest@localhost";
URI url = null;
try
{
url = new URI(uri);
} catch (URISyntaxException e)
{
e.printStackTrace(); //тут ошибка крайне маловероятна
}
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(url.getHost());
connectionFactory.setUsername(url.getUserInfo().split(":")[0]);
connectionFactory.setPassword(url.getUserInfo().split(":")[1]);
if (StringUtils.isNotBlank(url.getPath()))
connectionFactory.setVirtualHost(url.getPath().replace("/", ""));
connectionFactory.setConnectionTimeout(3000);
connectionFactory.setRequestedHeartBeat(30);
return connectionFactory;
}
В остальном код мало отличается от приведенного в примере 4(Routing).
Использованные источники
Страница проекта Spring AMQP
Страница проекта Spring Boot
Страница с примерами RabbitMQ