Пример использования RabbitMQ Delayed Message Exchange в Java Spring Framework
В этом посте я хочу показать как использовать отложенные сообщения в RabbitMQ. В качестве примера задачи, где удобно использовать отложенную очередь, возьму механизм постбеков (s2s ping back, s2s pixel).
1. Происходит некое событие
2. Ваше приложение должно уведомить об этом событии сторонний сервис
3. Если сторонний сервис оказался недоступен, то необходимо повторить уведомление еще раз через несколько минут
За основу я возьму официальный образ с management plugin, пригодится для тестирования.
Spring Framework полностью поддерживает плагин в проекте s
При использовании бутстраппера и аннотаций Spring берет большую часть работы на себя. Достаточно лишь написать:
И при запуске приложения RabbitAdmin автоматически объявит
Для отправки отложенного сообщения удобно использовать RabbitTemplate:
Отправлено оно будет моментально, но доставлено будет с задержкой, указанной в заголовке
Проверяем отложенную доставку через панель управления (rmq-management), она доступна на порту 15672:
При использовании XML конфигураций нужно просто установить у exchange-бина свойство
В двух словах о механизме постбеков:
1. Происходит некое событие
2. Ваше приложение должно уведомить об этом событии сторонний сервис
3. Если сторонний сервис оказался недоступен, то необходимо повторить уведомление еще раз через несколько минут
Для повторного уведомления я буду использовать отложенную очередь.
RabbitMQ по умолчанию не умеет задерживать сообщения, они доставляются сразу после публикации. Функционал отложенной доставки доступен в виде плагина rabbitmq-delayed-message-exchange.
Сразу хочу отметить, что плагин экспериментальный. Не смотря на то, что в целом он достаточно стабилен, использовать в продакшене его нужно на свой страх и риск.
Собираем Docker контейнер с RMQ и плагином
За основу я возьму официальный образ с management plugin, пригодится для тестирования.
Dockerfile:
FROM rabbitmq:3.6-management
RUN apt-get update && apt-get install -y curl
RUN curl http://www.rabbitmq.com/community-plugins/v3.6.x/rabbitmq_delayed_message_exchange-0.0.1.ez > $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange
Сборка
docker build --tag=x25/rmq-delayed-message-exchange .
Запуск
docker run -d --name rmq -p 5672:5672 -p 15672:15672 x25/rmq-delayed-message-exchange
Spring AMQP
Spring Framework полностью поддерживает плагин в проекте s
pring-rabbit
. Начиная с версии 1.6.4 можно пользоваться как xml/bean конфигурациями так и аннотациями.Я буду использовать Spring Boot Amqp Starter.
Зависимость для Maven
org.springframework.boot
spring-boot-starter-amqp
Зависимость для Gradle
compile "org.springframework.boot:spring-boot-starter-amqp"
Конфигурация через аннотации
При использовании бутстраппера и аннотаций Spring берет большую часть работы на себя. Достаточно лишь написать:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME),
exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME))
public void onMessage(Message> message) {
//...
}
И при запуске приложения RabbitAdmin автоматически объявит
delayed exchange
, queue
, создаст обработчики событий и привяжет их к аннотированному методу.Нужно больше потоков для обработки сообщений? Это настраивается через файл внешней конфигурации (свойство spring.rabbitmq.listener.concurrency) или через параметр containerFactory у аннотации:
Пример
//Создаем конфигурацию:
@Configuration
public class RabbitConfiguration {
@Bean(name = "containerFactory")
@Autowired
public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(10);
factory.setPrefetchCount(30);
return factory;
}
}
//Добавляем параметр:
@RabbitListener(containerFactory = "containerFactory", ...)
Для отправки отложенного сообщения удобно использовать RabbitTemplate:
rabbitTemplate.send(
DELAY_EXCHANGE_NAME,
DELAY_QUEUE_NAME,
MessageBuilder
.withBody(data)
.setHeader("x-delay", MESSAGE_DELAY_MS).build()
);
Отправлено оно будет моментально, но доставлено будет с задержкой, указанной в заголовке
x-delay
. Максимально допустимое время задержки (2^32–1) мс.Готовый пример приложения:
@SpringBootApplication
public class Application {
private final Logger log = LoggerFactory.getLogger(Application.class);
private final static String DELAY_QUEUE_NAME = "delayed.queue";
private final static String DELAY_EXCHANGE_NAME = "delayed.exchange";
private final static String DELAY_HEADER = "x-delay";
private final static String NUM_ATTEMPT_HEADER = "x-num-attempt";
private final static long RETRY_BACKOFF = 5000;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME),
exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME))
public void onMessage(Message message) {
String endpointUrl = new String(message.getPayload());
Long numAttempt = (Long) message.getHeaders().getOrDefault(NUM_ATTEMPT_HEADER, 1L);
log.info("Message received, url={}, attempt={}", endpointUrl, numAttempt);
if (!doNotifyEndpoint(endpointUrl) && numAttempt < 3) {
rabbitTemplate.send(
DELAY_EXCHANGE_NAME,
DELAY_QUEUE_NAME,
MessageBuilder
.withBody(message.getPayload())
.setHeader(DELAY_HEADER, numAttempt * RETRY_BACKOFF)
.setHeader(NUM_ATTEMPT_HEADER, numAttempt + 1)
.build()
);
}
}
private boolean doNotifyEndpoint(String url) {
try {
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
/* @todo: set up connection timeouts */
return (connection.getResponseCode() == 200);
} catch (Exception e) {
log.error(e.getMessage());
return false;
}
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
listener:
prefetch: 10
concurrency: 50
build.gradle
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
jar {
baseName = 'rmq-delayed-demo'
version = '0.1.0'
}
repositories {
mavenCentral()
}
sourceCompatibility = 1.8
targetCompatibility = 1.8
dependencies {
compile("org.springframework.boot:spring-boot-starter-amqp")
testCompile("org.springframework.boot:spring-boot-starter-test")
}
Проверяем отложенную доставку через панель управления (rmq-management), она доступна на порту 15672:
Логи:
2016-12-21 14:27:25.927: Message received, url=http://localhost:1234, attempt=1
2016-12-21 14:27:25.941: Connection refused (Connection refused)
2016-12-21 14:27:30.946: Message received, url=http://localhost:1234, attempt=2
2016-12-21 14:27:30.951: Connection refused (Connection refused)
2016-12-21 14:27:40.954: Message received, url=http://localhost:1234, attempt=3
Конфигурация через XML
При использовании XML конфигураций нужно просто установить у exchange-бина свойство
delayed
в true
и RabbitAdmin сделает все остальное за вас.Пример xml-конфигурации в связке с Integration Framework
Полезные ссылки
- Официальная документация
- Страница плагина