RabbitMQ + Java Spring Часть 2

Предыдущая часть: RabbitMQ + Java Spring

P. S.: Гит разбит на ветки по частям (part-1, part-2…)

Я продолжаю серию, посвящённую использованию RabbitMQ в связке с Spring.В предыдущей части был реализован простой обмен двух микросервисов посредством очереди в MQ.В этот раз рассмотрим крутые фичи, которые предоставляет RabbitMQ

Message Acknowledgment

В предыдущем примере сообщение максимально быстро удалялось из очереди, когда приложениезабирало его. Но на стороне consumer может возникнуть сбой во время обработки сообщения. Выходит следующая ситуация: сообщение уже удалено из очереди, но на стороне клиента не обработано. Как следствие данные потеряны.

Для настройки такого момента на стороне producer можно настроить autoAck параметр. Слушатель автоматически удалит сообщение,  если autoAck = AUTO. Если autoAck = MANUAL,  то оповещать очередь нужно вручную.

Разыграем такую ситуацию:  на стороне Consumer заведём HashSet,  который будет содержать пришедшие сообщения. Если приложение заберёт сообщение,  которого не будет в HashSet,  то оно его положит в него и вызовет исключение,  иначе просто удалит из него сообщение. Притом будет две очереди и два слушателя:  в первом случае autoAck = true,  во втором autoAck = false

Добавим queue_with_auto_ack и queue_without_auto_ack в application.properties:

rabbitmq.queue_with_auto_ack.name=queue_with_auto_ack
rabbitmq.queue_without_auto_ack.name=queue_without_auto_ack

Добавим переменные в RabbitMQConfig:

    @Value("${rabbitmq.queue_with_auto_ack.name}")
    private String queueWithAutoAckName;

    @Value("${rabbitmq.queue_without_auto_ack.name}")
    private String queueWithoutAutoAckName;

Настроим очереди queue_with_auto_ack и queue_without_auto_ack в классе RabbitMQConfig:

    @Bean
    public Queue autoAckWithException() {
        return new Queue(queueWithAutoAckName, false);
    }

    @Bean
    public Queue autoAckWithoutException() {
        return QueueBuilder
                .nonDurable(queueWithoutAutoAckName)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", queueWithoutAutoAckName)
                .build();
    }

Сервис (MessageService) для отправки сообщений в них (Будем отправлять сразу в две очереди):

    /**
     * Посылает сообщение в две очереди: одна оповещается автоматически, вторая - нет
     * @param message Сообщение
     */
    public void sendMessageToQueueWithAndWithoutAutoAck(String message) {
        template.convertAndSend(queueWithAutoAck, message);
        template.convertAndSend(queueWithoutAutoAck, message);
    }

Контроллер для отправки сообщений в них:

    @Operation(description = "Send message to RabbitMQ in queue with autoAck and without autoAck")
    @ApiResponse(responseCode = "204")
    @PostMapping("/with_and_without_auto_ack")
    public void sendMessageToQueueWithAndWithoutAutoAck(String message) {
        service.sendMessageToQueueWithAndWithoutAutoAck(message);
    }

Настроим слушателей на стороне Consumer (аналогично нужно перед этим добавить в application.properties переменные):

    @RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue_with_auto_ack.name')}")
    public void receiveMessageWithAutoAckMessageHash(String message) {
        System.out.println("Queue with auto ack got: " + message);
        if (withAutoAckMessageHash.contains(message)) {
            withAutoAckMessageHash.remove(message);
        } else {
            withAutoAckMessageHash.add(message);
            System.out.println("Simulate Exception");
        }
    }

    @RabbitListener(
            queues = "#{@environment.getProperty('rabbitmq.queue_without_auto_ack.name')}",
            ackMode = "MANUAL"
    )
    public void receiveMessageWithoutAutoAckMessageHashAndAckMode(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("Queue without auto ack got: " + message);
        if (withoutAutoAckMessageHash.contains(message)) {
            withoutAutoAckMessageHash.remove(message);
            channel.basicAck(tag, false);
        } else {
            withoutAutoAckMessageHash.add(message);
            System.out.println("Simulate exception");
            channel.basicNack(tag, false, true);
        }
    }

P.S.:  Класс Channel нужно импортировать из com.rabbitmq.client!

Пояснения:

  • ackMode — то, как мы оповещаем RabbitMQ о получении сообщения

  • basicAck — оповещение RabbitMQ о том, что сообщение получили

  • basicNack — отменить одно или несколько сообщений. 3 параметр показывает, нужно ли вернуть отменённые сообщения в очередь

Можем увидеть,  что сообщение пришло два раза туда, где autoAck = Manual:

fdd6a25793d51571a8638ba32c7cb5c0.png

Message durability

Тут всё просто. Если consumer или producer помрёт,  то сообщения спокойно будут лежать в RabbitMQ. Но если умрёт RabbitMQ,  то все очереди потрутся,  если не установить параметр durable для очереди как true С данным параметром очередь сохраняет своё состояние.

Для того чтобы данные Rabbitmq сохранились в docker необходимо создать volume:

docker volume create rabbitmq_data

Поднимаем RabbitMQ с volume:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -v rabbitmq_data:/var/lib/rabbitmq rabbitmq:4.0-management

Создадим очередь:

  • application.properties:

rabbitmq.durable_queue.name=durable_queue
  • RabbitMQConfig:

    @Bean
    public Queue durableQueue() {
        return new Queue(durableQueueName, true);
    }
    @RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue_without_auto_ack.name')}")
    public void receiveDurableMsg(String message) {
        System.out.println("Durable message: " + message);
    }

Объявилась новая очередь. Остановим RabbitMQ (убейте его в Docker):

docker stop rabbitmq

Поднимаем RabbiMQ снова:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -v rabbitmq_data:/durable rabbitmq:4.0-management

Все сообщения пропали из очередей,  кроме durable очереди.

Fair dispatch

Тут тоже всё просто. Данный параметр позволяет определить максимальное количество отправленных сообщений на обработку в один момент для данного слушателя. Пока приложение не обработает сообщение,  отправляться новые для него не будут. Это полезно в случае,  если у нас несколько потребителей сообщений. Может выйти так,  что одно из сообщений будет «тяжело» обрабатываться одним из потребителей. Другие потребители будут спокойно обрабатывать сообщения,  а у того,  кто обрабатывал «тяжёлое» сообщение может накопиться целая их кипа,  с которой он будет долго справляться.

e4cc15d5606ff18466873907859ec1fc.png

Prefetch count мы в целом устанавливаем для всего приложения,  установим это значение в application.properties:

spring.rabbitmq.listener.direct.prefetch=3
spring.rabbitmq.listener.simple.prefetch=3

В данной статье пока не будем рассматривать direct и simple.

В этой части мы разобрались с тем,  как можно настраивать работу с очередями в RabbitMQ.

Git: https://github.com/3abubenni/rabbitmq/blob/part-2

© Habrahabr.ru