Паттерн Poison Pill в Java

009ace9ae038920a53848f29abd475e8.png

Привет, Хабр!

Сегодня поговорим о паттерне Poison Pill и его реализацию в Java.

Poison Pill, или «ядовитая пилюля», — это шаблон, используемый для мягкой остановки потока или процесса. Суть его заключается в отправке специального, заранее определенного сообщения, которое сигнализирует о необходимости завершения работы. Это сообщение обрабатывается в рамках нормального потока сообщений и позволяет потоку корректно завершить работу, освободить ресурсы и закрыть все активные соединения.

Принцип работы

Архитектура паттерна Poison Pill включает несколько компонентов: Message, Producer, Consumer и MessageQueue.

Message который определяет структуру сообщений. Сообщения могут включать различные заголовки и тело сообщения. Пример реализации — SimpleMessage:

MessageQueue объединяет точки публикации MqPublishPoint и подписки MqSubscribePoint. Он представляет собой очередь, через которую сообщения передаются от производителей к потребителям. Пример реализации — SimpleMessageQueue, использующий BlockingQueue для хранения сообщений.

Producer создает сообщения и помещает их в очередь. Т.е когда производитель завершает свою работу, он отправляет сообщение Poison Pill, чтобы уведомить потребителей о необходимости остановки.

Consumer является потребителем и извлекает сообщения из очереди и обрабатывает их. Если потребитель получает Poison Pill, он завершает свою работу

В целом, принцип работы Poison Pill основан на циклическом взаимодействии между производителями и потребителями через очередь сообщений. Производители генерируют сообщения и помещают их в очередь. Каждое сообщение обрабатывается потребителями, которые извлекают его из очереди. Когда производитель завершает работу и отправляет Poison Pill, потребитель, получив этот сигнал, прекращает обработку и завершает выполнение.

Реализация в Java

Определим интерфейс Message и класс SimpleMessage:

public interface Message {
    void addHeader(String key, String value);
    String getHeader(String key);
    String getBody();
    void setBody(String body);
}

public class SimpleMessage implements Message {
    private Map headers = new HashMap<>();
    private String body;

    @Override
    public void addHeader(String key, String value) {
        headers.put(key, value);
    }

    @Override
    public String getHeader(String key) {
        return headers.get(key);
    }

    @Override
    public void setBody(String body) {
        this.body = body;
    }

    @Override
    public String getBody() {
        return body;
    }
}

Определим MessageQueue и его реализации SimpleMessageQueue:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {}

public class SimpleMessageQueue implements MessageQueue {
    private BlockingQueue queue;

    public SimpleMessageQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    @Override
    public void put(Message message) throws InterruptedException {
        queue.put(message);
    }

    @Override
    public Message take() throws InterruptedException {
        return queue.take();
    }
}

Реализация Producer и Consumer:

public class Producer implements Runnable {
    private final MessageQueue queue;
    private volatile boolean isStopped;

    public Producer(MessageQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!isStopped) {
                SimpleMessage message = new SimpleMessage();
                message.setBody("Important data");
                queue.put(message);
                Thread.sleep(1000);  // Имитация работы
            }
            queue.put(new PoisonPillMessage());  // Отправка Poison Pill
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void stop() {
        isStopped = true;
    }
}

public class Consumer implements Runnable {
    private final MessageQueue queue;

    public Consumer(MessageQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Message message;
            while (true) {
                message = queue.take();
                if (message instanceof PoisonPillMessage) break;  // Остановка если Poison Pill
                processMessage(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void processMessage(Message message) {
        System.out.println("Processed: " + message.getBody());
    }
}

public class PoisonPillMessage extends SimpleMessage {
    public PoisonPillMessage() {
        setBody("POISON_PILL");
    }
}

Запуск и координация потоков:

public class Main {
    public static void main(String[] args) {
        MessageQueue queue = new SimpleMessageQueue(10);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        Thread producerThread = new Thread(producer);
        Thread consumerThread = new Thread(consumer);

        producerThread.start();
        consumerThread.start();

        try {
            Thread.sleep(5000);  // позволяем производить сообщения
            producer.stop();     // останавливаем производителя
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Альтернативы

Manual Offset Management — один из подходов к управлению отказами заключается в ручном смещении офсета до первого сообщения после Poison Pill. Так можно получить контроль над тем, какие сообщения будут пропущены, но требует доступа к Kafka или другому брокеру сообщений.

Error-Handling Deserializers: в случае использования Apache Kafka можно применить специальный десериализатор с обработчиком ошибок. Он пропускает некорректные сообщения, тем самым избегая зацикливания при обработке данных, и передаёт сообщения об ошибках в спец. лог.

С конфигурацией Spring Kafka можно использовать SeekToCurrentErrorHandler для управления ошибками при десериализации сообщений. Обработчик смещает офсет в точку после неудачного сообщения, избегая блокировки обработки последующих сообщений.

Используя паттерн команды, можно создать единый интерфейс для различных типов задач и использовать общий механизм завершения их выполнения, что также можно адаптировать под конкретные сценарии.

А можно воспользоваться просто прямым завершением потоков. Это можно реализовать через управление потоком через класс Thread:

public class Main {
    public static void main(String[] args) {
        Thread worker1 = new Thread(new Worker());
        Thread worker2 = new Thread(new Worker());
        worker1.start();
        worker2.start();

        try {
            Thread.sleep(1000); // даем время потокам поработать
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // прямое завершение потоков
        worker1.interrupt();
        worker2.interrupt();

        System.out.println("Потоки были прямо завершены.");
    }
}

class Worker implements Runnable {
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                System.out.println("Работа потока " + Thread.currentThread().getName());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                System.out.println("Поток " + Thread.currentThread().getName() + " прерван.");
                Thread.currentThread().interrupt(); // рекомендуется восстановить статус прерванного состояния
            }
        }
    }
}

Поток выполняет свою работу в цикле до тех пор, пока не будет прерван. Метод interrupt() используется для отправки запроса на прерывание потоку, который проверяет свое состояние с помощью Thread.currentThread().isInterrupted() .

В заключение напомню про открытый урок, который пройдет сегодня вечером в OTUS: «Переопределение, скрытие и передекларация в Java». Записаться можно по ссылке.

© Habrahabr.ru