Паттерн Poison Pill в Java
Привет, Хабр!
Сегодня поговорим о паттерне Poison Pill и его реализацию в Java.
Poison Pill, или «ядовитая пилюля», — это шаблон, используемый для мягкой остановки потока или процесса. Суть его заключается в отправке специального, заранее определенного сообщения, которое сигнализирует о необходимости завершения работы. Это сообщение обрабатывается в рамках нормального потока сообщений и позволяет потоку корректно завершить работу, освободить ресурсы и закрыть все активные соединения.
Принцип работы
Архитектура паттерна Poison Pill включает несколько компонентов: Message
, Producer
, Consumer
и MessageQueue
.
Message который определяет структуру сообщений. Сообщения могут включать различные заголовки и тело сообщения. Пример реализации — SimpleMessage
:
MessageQueue объединяет точки публикации MqPublishPoint
и подписки MqSubscribePoint
. Он представляет собой очередь, через которую сообщения передаются от производителей к потребителям. Пример реализации — SimpleMessageQueue
, использующий BlockingQueue
для хранения сообщений.
Producer создает сообщения и помещает их в очередь. Т.е когда производитель завершает свою работу, он отправляет сообщение Poison Pill, чтобы уведомить потребителей о необходимости остановки.
Consumer является потребителем и извлекает сообщения из очереди и обрабатывает их. Если потребитель получает Poison Pill, он завершает свою работу
В целом, принцип работы Poison Pill основан на циклическом взаимодействии между производителями и потребителями через очередь сообщений. Производители генерируют сообщения и помещают их в очередь. Каждое сообщение обрабатывается потребителями, которые извлекают его из очереди. Когда производитель завершает работу и отправляет Poison Pill, потребитель, получив этот сигнал, прекращает обработку и завершает выполнение.
Реализация в Java
Определим интерфейс Message и класс SimpleMessage:
public interface Message {
void addHeader(String key, String value);
String getHeader(String key);
String getBody();
void setBody(String body);
}
public class SimpleMessage implements Message {
private Map headers = new HashMap<>();
private String body;
@Override
public void addHeader(String key, String value) {
headers.put(key, value);
}
@Override
public String getHeader(String key) {
return headers.get(key);
}
@Override
public void setBody(String body) {
this.body = body;
}
@Override
public String getBody() {
return body;
}
}
Определим MessageQueue и его реализации SimpleMessageQueue:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {}
public class SimpleMessageQueue implements MessageQueue {
private BlockingQueue queue;
public SimpleMessageQueue(int capacity) {
this.queue = new LinkedBlockingQueue<>(capacity);
}
@Override
public void put(Message message) throws InterruptedException {
queue.put(message);
}
@Override
public Message take() throws InterruptedException {
return queue.take();
}
}
Реализация Producer и Consumer:
public class Producer implements Runnable {
private final MessageQueue queue;
private volatile boolean isStopped;
public Producer(MessageQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (!isStopped) {
SimpleMessage message = new SimpleMessage();
message.setBody("Important data");
queue.put(message);
Thread.sleep(1000); // Имитация работы
}
queue.put(new PoisonPillMessage()); // Отправка Poison Pill
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void stop() {
isStopped = true;
}
}
public class Consumer implements Runnable {
private final MessageQueue queue;
public Consumer(MessageQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Message message;
while (true) {
message = queue.take();
if (message instanceof PoisonPillMessage) break; // Остановка если Poison Pill
processMessage(message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void processMessage(Message message) {
System.out.println("Processed: " + message.getBody());
}
}
public class PoisonPillMessage extends SimpleMessage {
public PoisonPillMessage() {
setBody("POISON_PILL");
}
}
Запуск и координация потоков:
public class Main {
public static void main(String[] args) {
MessageQueue queue = new SimpleMessageQueue(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
Thread producerThread = new Thread(producer);
Thread consumerThread = new Thread(consumer);
producerThread.start();
consumerThread.start();
try {
Thread.sleep(5000); // позволяем производить сообщения
producer.stop(); // останавливаем производителя
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Альтернативы
Manual Offset Management — один из подходов к управлению отказами заключается в ручном смещении офсета до первого сообщения после Poison Pill. Так можно получить контроль над тем, какие сообщения будут пропущены, но требует доступа к Kafka или другому брокеру сообщений.
Error-Handling Deserializers: в случае использования Apache Kafka можно применить специальный десериализатор с обработчиком ошибок. Он пропускает некорректные сообщения, тем самым избегая зацикливания при обработке данных, и передаёт сообщения об ошибках в спец. лог.
С конфигурацией Spring Kafka можно использовать SeekToCurrentErrorHandler
для управления ошибками при десериализации сообщений. Обработчик смещает офсет в точку после неудачного сообщения, избегая блокировки обработки последующих сообщений.
Используя паттерн команды, можно создать единый интерфейс для различных типов задач и использовать общий механизм завершения их выполнения, что также можно адаптировать под конкретные сценарии.
А можно воспользоваться просто прямым завершением потоков. Это можно реализовать через управление потоком через класс Thread
:
public class Main {
public static void main(String[] args) {
Thread worker1 = new Thread(new Worker());
Thread worker2 = new Thread(new Worker());
worker1.start();
worker2.start();
try {
Thread.sleep(1000); // даем время потокам поработать
} catch (InterruptedException e) {
e.printStackTrace();
}
// прямое завершение потоков
worker1.interrupt();
worker2.interrupt();
System.out.println("Потоки были прямо завершены.");
}
}
class Worker implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
System.out.println("Работа потока " + Thread.currentThread().getName());
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println("Поток " + Thread.currentThread().getName() + " прерван.");
Thread.currentThread().interrupt(); // рекомендуется восстановить статус прерванного состояния
}
}
}
}
Поток выполняет свою работу в цикле до тех пор, пока не будет прерван. Метод interrupt()
используется для отправки запроса на прерывание потоку, который проверяет свое состояние с помощью Thread.currentThread().isInterrupted()
.
В заключение напомню про открытый урок, который пройдет сегодня вечером в OTUS: «Переопределение, скрытие и передекларация в Java». Записаться можно по ссылке.