Паттерны применения многопоточности на коммерческом проекте (на примере Java)
Данная статья написана по мотивам одной из лекций, которые я читаю на своем курсе, подробности можете найти в моем телеграм канале
Также в данной статье будут отсылки к моим прошлым статьям по многопоточности, поскольку они дополняют друг друга
Код из статьи можно найти в репозитории на GitHub
Для чего нужна многопоточность
Многопоточность неразрывно связана с отзывчивостью вашего приложения под нагрузкой. Если нагрузки нет, скажем 5 запросов в час, то и о многопоточности можно особо не задумываться
На коммерческих проектах чаще встречается ситуация, когда вашим приложением пользуются сотни и даже тысячи клиентов одновременно, в такие моменты без многопоточности никуда, при этом приложение должно работать эффективно, не расходуя лишние ресурсы
Я собрал самые часто встречаемые паттерны работы с многопоточностью из своей практики, которые помогают писать отказоустойчивые и надежные приложения
В данной статье будем рассматривать многопоточность без применения webflux, однако если будет спрос, то распишу те же паттерны на webflux’е, там они выглядят более органично
Также не буду затрагивать Project Loom, поскольку мне не приходилось отлаживать и поддерживать код с использованием Loom на коммерческом проекте (Java 21 еще не используем), но когда-нибудь я напишу подробную и понятную статью по Loom
Стоимость потоков в Java и пулы потоков
Про стоимость потоков в Java я расписал в предыдущей статье про webflux, поэтому здесь напишу тезисно. Потоки используют следующие ресурсы:
CPU — переключение между потоками, их создание и завершение требует процессорного времени, при этом если количество потоков становится слишком большим, примерно > 3–4 тысяч (зависит от мощностей сервера), то процессор начинает тратить значительно количество времени на переключение между ними, производительность приложения перестает расти и даже падает
RAM — стек потока занимает место в оперативной памяти
Количество потоков ограничено возможностями Операционной системы
Для решения этих проблем мы должны создавать ограниченное число потоков, при этом если количество задач превышает количество созданных потоков, они должны помещаться в очередь, где будут дожидаться своего выполнения. Для этого предназначены пулы потоков — ExecutorService’ы
Подробно про ExecutorService’ы я рассказал в другой своей статье, там я разобрал как правильно создавать эластичные пулы потоков и объяснил почему не стоит использовать стандартные пулы из фабрики Executors
Для демонстрации паттернов нам понадобятся executorService«ы, создавать мы их будем по такому образцу:
Создание executorService:
@Bean(destroyMethod = "shutdown")
public ExecutorService elasticExecutor() {
return createElasticExecutor(10, 100);
}
private ThreadPoolExecutor createElasticExecutor(int threads, int queueCapacity) {
BlockingQueue queue = new ArrayBlockingQueue<>(queueCapacity);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
threads, threads,
60L, TimeUnit.SECONDS,
queue, new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.allowCoreThreadTimeOut(true);
return threadPoolExecutor;
}
Теперь, когда мы поняли, что нужно использовать пулы потоков для эффективного управления ресурсами, можем переходить непосредственно к разбору паттернов
Какие паттерны мы рассмотрим
Запуск асинхронного процесса по сигналу
Параллельное выполнение задач без ожидания результата
Ограничение количества потоков для вызова внешнего сервиса
Выполнение Scheduled задач
Параллельное обращение к разным источникам, с последующим объединением результатов
Контекст
Ожидание асинхронного ответа
Запуск асинхронного процесса по сигналу
Пожалуй это наиболее простой паттерн с которого стоит начать
Сигналом может выступать любое событие, которое не дожидается результата выполнения сценария, а лишь запускает его, например:
HTTP сообщение — приходит HTTP запрос на составление аналитического отчета, сам отчет будет формироваться несколько минут, при этом клиентский сервис не хочет ждать так долго, ему нужно лишь запустить процесс
Сообщение из брокера (Kafka, IBM MQ, RabbitMQ) — мы не всегда хотим обрабатывать сообщение в том же потоке, в котором оно поступает, при высоких нагрузках лучше переключиться на отдельный пул потоков, чтобы не блокировать чтение новых сообщений (в той же kafka количество читающих потоков равно количеству партиций, на которые подписался экземпляр приложения, а их, как правило, мало)
Запуск события по времени — например выполнить перерасчет данных в бд
Для запуска асинхронного выполнения какого-то процесса необходимо его передать пулу потоков, сделать это можно несколькими способами:
public void runWithStraightExecuteMethod() {
runAsyncTasksElasticExecutor.execute(() -> {
executeLongOperation();
});
}
@Async("runAsyncTasksElasticExecutor")
public void runWithAnnotation() {
executeLongOperation();
}
public void runWithCompletableFuture() {
CompletableFuture.runAsync(() -> {
executeLongOperation();
},
runAsyncTasksElasticExecutor);
}
Все три вышеприведенных куска кода идентичны в плане поведе
Если запустить любой из них, то вызывающий поток будет отпущен, а выполнение метода executeLongOperation()
начнется в потоке executorService«а
Параллельное выполнение задач без ожидания результата
Данный паттерн является альтернативным вариантом предыдущего, но все же заслуживает рассмотрения, т.к. часто встречается на практике
Допустим вам поступило сообщение, которое нужно отправить в разные внешние источники по HTTP или другому протоколу. При этом нам не важен порядок в котором будет выполняться отправка, а также результаты каждой отправки не зависят от результатов предыдущих (независимы)
Это означает, что данные задачи могут выполняться в параллельных потоках
Для начала создадим интерфейс с единственным методом, назначением которого будет отправить сообщение во внешний сервис:
public interface MessageSender {
void send(Message message);
}
При этом может быть сколько угодно сервисов, реализующих данный интерфейс
Демонстрационный пример такого сервиса:
@Slf4j
@Service
public class MessageSender1implements MessageSender {
@Override
public void send(Message message) {
// Выполнение операции отправки
MentoringUtil.sleep(1500);
log.info("Сообщение отправлено в источник #1");
}
}
Напишем класс, который будет передавать обработку сразу нескольким MessageSender«ам в разных потоках:
@Service
@RequiredArgsConstructor
public class SendInParallelExample {
private final List messageSenders;
private final ExecutorService runParallelTasksElasticExecutor;
public void sendMessageToSeveralTargets(Message message) {
messageSenders.forEach(messageSender ->
CompletableFuture.runAsync(() ->
messageSender.send(message), runParallelTasksElasticExecutor)
);
}
}
Пояснения к коду:
Импортируем коллекцию бинов, реализующих интерфейс MessageSender и бин ExecutorService, в котором будет происходить выполнение кода отправщиков
Итерируемся по бинам MessageSender«ам, и для каждого выполняем асинхронный вызов
CompletableFuture.runAsync(…)
, без ожидания результата, вторым параметром передаем ExecutorService, чтобы выполнение происходило в его потоках
Таким способом мы можем запустить параллельные задачи. При этом мы можем пользоваться и другими подходами, приведенными в предыдущем паттерне (прямой вызов executorService.execute(…)
или аннотация @Async)
Пару слов про parallelStream. Мы могли бы сделать то же самое через .parallelStream ().forEach (…), без использования CompletableFuture, но данный подход не подойдет для параллельного выполнения блокирующих операций (таких как внешние Rest вызовы), поскольку parallelStream использует ForkJoinPool, количество рабочих потоков которого ограниченно количеством ядер процессора, что может приводить к низкой производительности
Ограничение количества потоков для вызова внешнего сервиса
В микросервисной архитектуре желательно ограничивать количество одновременных вызовов внешних микросервисов, чтобы предотвратить возможные перегрузки. Если внешний сервис начинает деградировать, то и вызывающие сервисы могут начать деградировать вслед за ним, поскольку заканчиваются их потоки
Особенно остро данная проблема проявляется при работе с WebFlux, когда нет явного ограничения количеством потоков и ничто не мешает сделать одновременно десятки тысяч вызовов
Такая ситуация называется Chain Of Failures, когда деградация одного микросервиса приводит к каскадной деградации остальных, и ограничение количества вызовов позволяет предотвратить падение
Дополнительным решением данной проблемы является паттерн Circuit Breaker, но он скорее относится к паттернам микросервисов и не имеет отношения к многопоточности, поэтому здесь рассмотрен не будет
Варианты ограничения потоков в Java:
Semaphore
Исторически, для ограничения количества потоков, применяется класс Semaphore
Пример использования:
private final Semaphore semaphore = new Semaphore(10);
public String parsePageWithSemaphoreRateLimiting() {
boolean isAcquired;
try {
isAcquired = semaphore.tryAcquire(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new CustomAppException(e);
}
if (!isAcquired) {
log.info("Не удалось захватить поток семафора");
return "fallback result";
}
try {
return marketPlaceParser.parsePage();
} catch (Exception e) {
return "fallback result";
} finally {
semaphore.release();
}
}
Пояснения к коду:
Создаем семафор на 10 разрешений
В течение двух секунд поток пытается захватить разрешение семафора
Если захватить не удалось, то прекращаем выполнение метода, возвращая заготовленный ответ (так называемый «fallback»)
Если удалось захватить, то выполняем внешний вызов с помощью метода
marketPlaceParser.parsePage()
Обязательно отпускаем разрешение семафора в блоке finally
Также в случае возникновения исключений при обращении к внешнему сервису возвращаем fallback
У данного решения есть несколько недостатков:
код выглядит громоздко
выполнение и ожидание происходит в вызывающих потоках, а значит мы можем заставить ждать все вызывающие потоки, сильно ухудшая производительность системы
нет гибкости при ожидании результата выполнения, которую нам дает CompletableFuture
CompletableFuture с выделенным пулом потоков
В качестве более продвинутого подхода можно использовать CompletableFuture с выделенным пулом потоков (мы его уже видели ранее)
Благодаря CompletableFuture вы можете более гибко управлять результатом, дожидаясь его при необходимости, выставляя таймаут и обрабатывая ошибки
Решение:
public CompletableFuture parsePageWithRateLimitingCompletableFuture() {
try {
return CompletableFuture.supplyAsync(marketPlaceParser::parsePage, parserElasticExecutor)
.orTimeout(4, TimeUnit.SECONDS)
.exceptionally(e -> {
log.warn("Ошибка при вызове внешнего сервиса", e);
return "fallback result";
});
} catch (RejectedExecutionException e) {
log.warn("Переполнена очередь выполонения задач на вызов внешнего сервиса");
return CompletableFuture.completedFuture("fallback result");
}
}
Пояснения к коду:
Передаем вызов внешнего сервиса выделенному пулу потоков, с помощью явного указания пула вторым параметром в методе
CompletableFuture.supplyAsync(…, parserElasticExecutor)
Выставляем таймаут в 4 секунды на ожидание ответа
В операторе
.exceptionally(…)
отлавливаем возможные исключения при вызове сервиса и возвращаем fallbackОборачиваем весь код в try-catch, поскольку если очередь задач parserElasticExecutor«а будет переполнена, то при попытке вызвать
supplyAsync(..)
будет выброшено исключение RejectedExecutionException, в таком случае мы также возвращаем fallback
Данный подход лучше семафора по следующим параметрам:
есть таймаут на вызов внешнего сервиса
fallback результат в случае ошибок
вызовы внешнего сервиса происходят в потоках выделенного executorService«а, в котором мы можем регулировать как максимальное количество потоков, так и глубину очереди задач
Благодаря этому мы получаем ожидаемое поведение и можем гибко контроллировать подаваемую нагрузку, однако всё еще код выглядит громоздко
Готовые инструменты библиотеки resilience4j: @Bulkhead и @RateLimiter
Для использования resilience4j добавим зависимость:
implementation "io.github.resilience4j:resilience4j-spring-boot2:2.2.0"
Bulkhead
Имеет две реализации SEMAPHORE (по умолчанию) и THREADPOOL, которые по сути предлагают те же самые решения, что были продемонстрированы выше
Bulkhead semaphore
Добавим конфигурацию в application.yml:
resilience4j:
bulkhead:
instances:
parserSemaphore:
maxConcurrentCalls: 10
maxWaitDuration: 5s
parserSemaphore — название конфигурации
maxConcurrentCalls — количество разрешений семафора
maxWaitDuration — время в течение которого поток будет ждать разрешения от семафора, если таймаут превышен, то будет выброшено исключение BulkheadFullException
Решение:
@Bulkhead(name = "parserSemaphore", fallbackMethod = "fallbackMethod")
public String parsePageWithSemaphoreBulkheadResilience4j() {
return marketPlaceParser.parsePage();
}
public String fallbackMethod(Exception e) {
log.warn("Ошибка при вызове внешнего сервиса", e);
return "fallback result";
}
Пояснения к коду:
Используем аннотацию @Bulkhead, в поле name указываем наименование конфигурации, а также указываем наименование fallback метода
Вызываем внешний сервис
Объявляем fallback метод — он должен быть публичным, принимать исключение в качестве аргумента и возвращать тот же тип данных в результате, что и основной метод
Все исключения при выполнении метода будут отловлены с помощью fallback-метода
По сути это более удобная запись того что было продемонстрировано ранее с классическим семафором, поэтому остается проблема с ожиданием разрешений семафора в вызывающих потоках
Bulkhead theadpool
Опишем конфигурацию:
resilience4j:
bulkhead:
instances:
parserThreadPool:
coreThreadPoolSize: 10
maxThreadPoolSize: 10
queueCapacity: 100
Конфигурация представляет собой стандартные настройки, которые используются для создания ThreadPoolExecutor:
coreThreadPoolSize — количество core потоков пула
maxThreadPoolSize — максимальное число потоков пула
queueCapacity — глубина очереди задач
Также нам понадобиться @TimeLimiter для корректной работы таймаута:
resilience4j:
...
timelimiter:
instances:
parserThreadPool:
timeoutDuration: 3s
Решение:
@TimeLimiter(name = "parserThreadPool", fallbackMethod = "fallbackMethodCompletableFuture")
@Bulkhead(name = "parserThreadPool", fallbackMethod = "fallbackMethodCompletableFuture", type = Bulkhead.Type.THREADPOOL)
public CompletableFuture parsePageWithThreadPoolBulkheadAndTimeLimiterResilience4j() {
String response = marketPlaceParser.parsePage();
return CompletableFuture.completedFuture(response);
}
public CompletableFuture fallbackMethodCompletableFuture(Exception e) {
log.warn("Ошибка при вызове внешнего сервиса", e);
return CompletableFuture.completedFuture("fallback result");
}
Пояснения к коду:
В аннотации @Bulkhead указываем тип Bulkhead.Type.THREADPOOL
Указываем @TimeLimiter
Вызываем внешний сервис внутри метода, при этом выполнение будет происходить в пуле потоков, созданном в @Bulkhead
Возвращаем CompletableFuture.completedFuture (…), во-первых возврат CompletableFuture в результате является обязательным для аннотаций @Bulkhead (с типом THREADPOOL) и @TimeLimiter, во-вторых это позволяет нам гибко управлять ожиданием результата
Объявляем fallback метод
При вышеуказанном подходе, мы получаем то же самое поведение, что было описано выше с использованием выделенного пула потоков и CompletableFuture
Здесь также добавлен TimeLimiter, поскольку если указать оператор .orTimeout(…)
в CompletableFuture, то таймаут будет работать некорректно — все потоки, ожидающие в очереди executorService«а bulkhead«а не увидят оператор .orTimeout()
, поэтому будут ожидать своей очереди бесконечно долго
Будьте внимательны. Стандартный threadPool, который создается внутри кода resilience4j, будет использовать core потоки до тех пор, пока очередь задач не заполнится до конца, и только потом начнет увеличивать количество потоков до значения maxThreadPoolSize, что может приводить к нежелательному поведению. Для пресечения такого поведения я рекомендую создавать количество core потоков равное maxThreadPoolSize, и при этом их должно быть немного, поскольку они не будут завершаться
RateLimiter
Bulkhead паттерн ограничивает максимальное количество одновременно вызывающих потоков, однако этого не всегда достаточно. Представим что внешний сервис отвечает очень быстро — за 10 мс каждый ответ, и мы использовали простое ограничение количества одновременных вызовов 10-ю штуками, тогда мы столкнемся с проблемой, что за секунду может выполняться до 1000 запросов (каждый поток по 100 вызовов в секунду), тем самым ограничение вроде бы есть, но нагрузки все равно могут быть высокими
Эту проблему решает RateLimiter. Он похож на Bulkhead, но при этом добавляет «скользящее окно», с помощью которого ограничивается количество вызовов за определенное время, например не более 100 запросов за последние 10 секунд
Т.е. если в первую секунду выполниться 100 вызовов, то в остальные 9 секунд все дальнейшие запросы будут возвращать fallback (или обрабатываться с ошибкой RequestNotPermitted, если fallback не настроен)
Получается что данный паттерн дает нам более правильное поведение для сдерживания подаваемой нагрузки на внешний сервис
Реализовывать такой паттерн самостоятельно будет сложно, поэтому воспользуемся готовым решением от resilience4j
Конфигурация:
resilience4j:
rateLimiter:
instances:
parser:
timeoutDuration: 5s
limitRefreshPeriod: 1s
limitForPeriod: 10
timeoutDuration — время в течение которого поток ждет разрешения
limitRefreshPeriod — период за который ограничивается число вызовов
limitForPeriod — предельное число вызовов за время указанное в limitRefreshPeriod
Решение:
@RateLimiter(name = "parser", fallbackMethod = "fallbackMethodCompletableFuture")
public CompletableFuture parsePageWithRateLimitingResilience4jCompletableFuture() {
return CompletableFuture.supplyAsync(marketPlaceParser::parsePage, parserElasticExecutor)
.orTimeout(2, TimeUnit.SECONDS);
}
public CompletableFuture fallbackMethodCompletableFuture(Exception e) {
log.warn("Ошибка при вызове внешнего сервиса", e);
return CompletableFuture.completedFuture("fallback result");
}
Пояснения к коду:
Используем аннотацию @RateLimiter, указываем имя конфигурации и fallback метод
Передаем вызовы внешнего сервиса отдельному executorService«у с помощью метода
CompletableFuture.supplyAsync(…)
Выставляем таймаут
Объявляем fallback метод
Что мы получаем:
код выглядит лаконично
есть таймаут на вызов внешнего сервиса
возвращаем fallback результат в случае ошибок
вызовы внешнего сервиса происходят в потоках выделенного executorService«а
в ответе возвращается CompletableFuture, благодаря чему мы можем гибко управлять результатом
Таким образом, мы получаем сплошные преимущества от такого подхода, поэтому я рекомендую применять именно RateLimiter для ограничения количества вызовов внешнего сервиса
Выполнение Scheduled задач
Данное решение я разобрал в другой статье, с более подробными примерами, здесь изложу подход более кратко
По умолчанию аннотация @Scheduled использует один и тот же поток для выполнения всех методов, помеченных данной аннотацией. В связи с этим могут возникать конфликты, когда один из @Scheduled методов еще не закончил свое выполнение, и пришло время выполнять другой @Scheduled метод, в таком случае второй метод просто не будет запущен, т.к. поток занят
Для решения следует использоваться связку аннотаций @Scheduled + @Async, тогда при запуске @Scheduled задачи она будет передаваться на выполнение пулу потоков, указанному в @Async, а единственный @Scheduled поток будет отпущен
При этом чаще всего нам подойдет поведение, когда в аннотации @Async используется executorService на 1 поток, но для разных @Scheduled методов используются разные executorService’ы. Тогда мы получаем следующее поведение — каждый @Scheduled метод будет выполняться в своем потоке, при этом повторные запуски метода будут игнорироваться, если предыдущий запуск не завершил свою работу
Создадим executorService на один поток:
@Bean(destroyMethod = "shutdown")
public ExecutorService refreshCacheSingleExecutor() {
return Executors.newSingleThreadExecutor();
}
Обратите внимание. Executors.newSingleThreadExecutor () имеет важный недостаток — его очередь задач неограниченна, что может приводить к утечкам памяти, в случаях когда задачи в него поступают чаще, чем успевают выполняться
Пример правильного создания executorService«а на один поток:
@Bean(destroyMethod = "shutdown")
public ExecutorService correctSingleExecutor() {
return new ThreadPoolExecutor(
1, 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
}
При попытке добавить новую задачу в такой executorService, она будет проигнорирована, если он занят другой задачей, поскольку мы использовали политику переполнения DiscardPolicy, которая тихо игнорирует добавление новой задачи в executorService. Если же вы хотите явно видеть ошибку о том, что пул потоков занят на момент добавления новой задачи, используйте политику AbortPolicy
Применим executorService с помощью аннотации @Async к @Scheduled задаче:
@Async("refreshCacheSingleExecutor")
@Scheduled(cron = "*/2 * * * * *")
public void refreshCache() {
// долгая операция
log.info("Кеш обновлен");
}
Таким образом мы получаем поведение, которое и ожидали:
плановая задача запускается всегда в нужное время при наличии конфликтов
и в то же время она не будет запущена, если предыдущий запуск этой же задачи не завершил свое выполнение
Параллельное обращение к разным источникам, с последующим объединением результатов
Часто встречается ситуация, когда необходимо обратиться в несколько микросервисов, полученные ответы объединить и в таком виде вернуть результат
Для наглядности допустим внешние сервисы довольно медлительны, первый возвращает ответ за 2 секунды, второй за 3 секунды
Напишем простой код с последовательным вызовом внешних сервисов:
public ProductPageResponse sequentialExecution(UUID productId) {
ProductInfo productInfo = productInfoClient.fetchProductInfo(productId);
List feedbacks = feedbacksClient.fetchFeedbacks(productId);
ProductPageResponse productPage = productMapper.toPage(productInfo, feedbacks);
log.info("Страница продукта: {}", productPage);
return productPage;
}
Пояснения к коду:
productInfoClient.fetchProductInfo(…)
— вызывает сервис получения информации по товару, выполняется 2 секундыfeedbacksClient.fetchFeedbacks(…)
— вызывает сервис получения отзывов о товаре, выполняется 3 секундыproductMapper.toPage(productInfo, feedbacks)
— объединяет ответы в один результат
Запустим код:
: Время выполнения: 6132
Результат превышает суммарное время вызовов внешних микросервисов, но мы хотим сэкономить время и распараллелить их
Перепишем данный метод с использованием CompletableFuture:
public CompletableFuture parallelExecution(UUID productId) {
CompletableFuture productInfoCompletableFuture = CompletableFuture.supplyAsync(() -> productInfoClient.fetchProductInfo(productId), productInfoElasticExecutor);
CompletableFuture> feedbacksCompletableFuture = CompletableFuture.supplyAsync(() -> feedbacksClient.fetchFeedbacks(productId), feedbacksElasticExecutor);
return CompletableFuture.allOf(productInfoCompletableFuture, feedbacksCompletableFuture)
.thenApply(ignore -> {
ProductInfo productInfo = productInfoCompletableFuture.getNow(new ProductInfo());
List feedbacks = feedbacksCompletableFuture.getNow(List.of());
ProductPageResponse productPage = productMapper.toPage(productInfo, feedbacks);
log.info("Страница продукта: {}", productPage);
return productPage;
});
}
Пояснения к коду:
Запускаем параллельные вызовы сервисов при помощи
CompletableFuture.supplyAsync(…, elasticExecutor)
— вызовы клиентов начнутся в тот же момент, когда мы создаем CompletableFutureОбъединяем созданные CompletableFuture с помощью оператора
.allOf(...)
К сожалению нет такого оператора, который бы позволял сразу получить результаты выполнения CompletableFuture после оператора
.allOf(…)
, поэтому нам приходится внутри оператора.thenApply()
вызывать методы.getNow()
у исходных CompletableFuture, поскольку они оба уже завершили свое выполнение, а значит их результаты могут быть получены немедленноЗатем объединяем результаты запросов и возвращаем ответ
Обратите внимание, при вызове
.supplyAsync(…, elasticExecutor)
без передачи executorService«а вторым параметром, мы бы запустили выполнение кода в common ForkJoinPool из CompletableFuture. Ранее я уже объяснил почему FJP не подходит для выполнения блокирующих вызовов. Поэтому следует передавать вторым параметром executorService
Запустим код:
: Время выполнения: 4297
Результат в районе 4х секунд, а значит он меньше суммарного времени обращения к внешним микросервисам, тем самым мы получили ожидаемое поведение, при этом объединив результат вызова двух сервисов
Контекст
Бывает двух типов:
Также бывает распределенный контекст (кеш) между несколькими микросервисами (redis, hazelcast), но это уже ближе к паттернам микросервисов
Контекст, ограниченный одним потоком
Контекст в рамках одного потока основывается на механизме ThreadLocal, который позволяет привязать объект к текущему потоку, тем самым данный объект будет невидим для остальных потоков
ThreadLocal
Создадим класс, который будет инкапсулировать в себе ThreadLocal:
@Component
public class ThreadContextHolder {
private final ThreadLocal
Пояснения к коду:
Создаем ThreadLocal, в который помещаем пустую HashMap, в нашем случае достаточно иметь в качестве значения строку, но обычно используется Object, чтобы хранить любые объекты
Создаем методы put и get, для более удобного использования контекста
Создадим web фильтр, в котором будем имитировать парсинг JWT:
@Component
public class ParseJwtThreadLocalFilterextends HttpFilter {
@Override
protected void doFilter(HttpServletRequest request,
HttpServletResponse response,
FilterChain chain) throws IOException, ServletException {
String authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION);
JwtParams jwtParams = parseJwt(authorizationHeader);
ThreadContextHolder.put("username", jwtParams.getUsername());
chain.doFilter(request, response);
}
private JwtParams parseJwt(String authorizationHeader) {
// здесь должен быть парсинг JWT
}
}
Пояснения к коду:
Создаем бин — наследник HttpFilter, который будет вызываться на каждый входящий HTTP запрос
Парсим jwt из заголовка Authorization
Кладем в контекст имя пользователя
Вызываем следующий фильтр по цепочке
Теперь мы можем извлекать данные из контекста в любом месте, где используется данный поток:
@Slf4j
@RestController
@RequestMapping("/api/v1/thread-context")
public class ThreadContextController {
@GetMapping
public String getUsername() {
return "Имя пользователя из контекста потока: " + ThreadContextHolder.get("username");
}
}
Пояснения к коду:
В методе контроллера обращаемся к контексту — поскольку наш HttpFilter будет вызываться в том же потоке что и метод контроллера, мы можем извлечь нужные данные из контекста
Чтобы не писать свою обертку над ThreadLocal можно воспользоваться готовым решением — MDC контекстом
MDC контекст
Данный класс поставляется вместе с библиотеками логгирования, в Spring по умолчанию это logback
Перепишем предыдущее решение с использованием MDC
HttpFilter:
@Component
public class ParseJwtMdcFilterextends HttpFilter {
@Override
protected void doFilter(HttpServletRequest request,
HttpServletResponse response,
FilterChain chain)throws IOException, ServletException {
String authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION);
JwtParams jwtParams = parseJwt(authorizationHeader);
MDC.put("username", jwtParams.getUsername());
chain.doFilter(request, response);
}
private JwtParams parseJwt(String authorizationHeader) {
// здесь должен быть парсинг JWT
}
}
Контроллер:
@Slf4j
@RestController
@RequestMapping("/api/v1/thread-context")
@RequiredArgsConstructor
public class MDCExampleController {
@GetMapping
public String getUsername() {
return "Имя пользователя из контекста потока: " + MDC.get("username");
}
}
Поведение будет полностью идентичным предыдущему примеру
Отличительной чертой MDC контекста является то, что его поля доступны при записи логов. Благодаря нему к логам добавляются разные параметры, например уникальный идентификатор входящего запроса для последующего поиска логов, принадлежащих одному запросу
В продемонстрированных примерах с ThreadLocal и MDC есть важный аспект — потоки томката переиспользуются для выполнения запросов разных клиентов (после выполнения запроса клиента, в этом же потоке начинает выполняться следующий запрос, т.е. типичное поведение ExecutorService«а), из-за чего в ThreadLocal контексте может оставаться информация из предыдущих запросов
Поэтому такой контекст необходимо очищать в момент завершения бизнес логики, либо перед началом его использования
Подход с контекстом в рамках одного потока очень удобен для традиционного синхронного приложения, где обработка каждого запроса клиента происходит в своем потоке. И не подходит для реактивных приложений на webflux, поскольку там разные потоки могут выполнять разные части одного запроса
Контекст, доступный из разных потоков
Возможные варианты реализации контекста:
volatile или Atomic переменная
Конкурентная коллекция — ConcurrentHashMap
Готовые реализации — Caffeine, Spring Cache
Здесь покажу только пример с ручным созданием кеша Caffeine
Работа с конкурентными коллекциями и volatile/Atomic переменными очень проста и почти не отличается от взаимодействия с обычной коллекцией или переменной
А если расписывать применение Spring Cache то статья начинает сильно уходить от основной темы
Кеш с использованием Caffeine
Импортируем библиотеку:
implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'
Решение:
@Slf4j
@Service
public class CaffeineExampleService {
private final Cache cache = Caffeine.newBuilder()
.expireAfterAccess(Duration.ofMinutes(1))
.maximumSize(100)
.build();
public String process(String request) {
String cachedResult = cache.getIfPresent(request);
if (cachedResult != null) {
log.info("Возвращаю закешированный результат");
return cachedResult;
}
MentoringUtil.sleep(1000);
String response = "test response";
cache.put(request, response);
return response;
}
}
Пояснения к коду:
Создаем кеш (данный интерфейс похож на Map), выставляем время жизни и максимальное количество записей в кеше
Реализуем простой метод с кешированием результата выполнения
Благодаря возможности выставлять время жизни и размер кеша, мы предотвращаем возможные утечки памяти
Ожидание асинхронного ответа
Иногда встречается необходимость в ожидании асинхронного ответа от внешнего сервиса, например мы отправляем запрос в один топик kafka, и хотим дождаться ответа из другого топика (прослушивание топика происходит в отдельном потоке)
В качестве решения нам потребуется хранить в контексте callback для каждого отправленного сообщения
Также есть важное условие, мы будем отправлять в запросе уникальный идентификатор, и этот же идентификатор должен быть передан внешним сервисом в ответе, чтобы мы могли сопоставить запрос и ответ
Создадим класс с контекстом:
@Service
public class KafkaMessageContext {
private final Cache> messageContext = Caffeine.newBuilder()
.expireAfterAccess(Duration.ofMinutes(1))
.maximumSize(1000)
.build();
public CompletableFuture createMessageCompletableFuture(String id) {
CompletableFuture completableFuture = new CompletableFuture<>();
messageContext.put(id, completableFuture);
return completableFuture;
}
public CompletableFuture findById(String id) {
return messageContext.getIfPresent(id);
}
public void removeById(String id) {
messageContext.invalidate(id);
}
}
Пояснения к коду:
Создаем кеш, где в качестве ключа будет выступать уникальный идентификатор, а в качестве значения callback в виде CompletableFuture, также указываем время жизни и размер кеша, чтобы предотвратить возможные утечки памяти
Добавляем метод создания callback«а, и добавления его в контекст, callback создаем через
new CompletableFuture<>()
, который позволит нам подписаться на него для ожидания ответаДобавляем методы findById и removeById
Реализуем сервис обмена сообщениями с ожиданием ответа:
@Service
@RequiredArgsConstructor
public class AwaitAsyncRequestService {
private final KafkaProducer kafkaProducer;
private final KafkaMessageContext kafkaMessageContext;
public KafkaResponse sendAndReceive(KafkaRequest request) {
CompletableFuture responseCompletableFuture = kafkaMessageContext.createMessageCompletableFuture(request.getId());
kafkaProducer.send(request);
try {
return responseCompletableFuture
.get(5000, TimeUnit.MILLISECONDS);
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new CustomAppException(e);
} finally {
kafkaMessageContext.removeById(request.getId());
}
}
}
Пояснения к коду:
Вызываем метод создания callback«а
Выполняем отправку в kafka
Ожидаем ответ от callback«а в течение 5 секунд
Обрабатываем возможные исключения, удаляем запись из кеша в блоке finally
Создадим класс слушателя ответного топика:
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumer {
private final JsonUtil jsonUtil;
private final KafkaMessageContext kafkaMessageContext;
@KafkaListener(topics = "${app.kafka.topicOut}", groupId = "test")
public void receiveResponse(ConsumerRecord consumerRecord) {
KafkaResponse response = jsonUtil.fromJson(consumerRecord.value(), KafkaResponse.class);
CompletableFuture responseCompletableFuture = kafkaMessageContext.findById(response.getId());
if (responseCompletableFuture == null) {
log.info("Получено сообщение в ответный топик для которого не найден callback: {}", response);
} else {
responseCompletableFuture.complete(response);
}
}
}
Пояснения к коду:
Слушаем исходящий топик
По идентификатору входящего сообщения пытаемся найти callback в контексте
Если callback найден, то вызываем метод complete, с этого момента продолжается выполнение кода в методе sendAndReceive класса AwaitAsyncRequestService
С помощью данного решения мы можем дождаться любого асинхронного ответа
В случае с kafka есть одна особенность, которую надо обязательно учитывать, если у вашего приложения будет больше одного экземпляра, то необходимо чтобы каждый экземпляр приложения получал все сообщения из ответного топика, поскольку есть вероятность, что ответ будет прочитан другим экземпляром и тогда ваш ответ будет потерян
Итог
В данной статье я постарался показать практические примеры, с которыми сталкивался в работе, не касаясь низкоуровневых инструментов volatile, synchronized, барьеров и др.
Представленные паттерны являются лишь кирпичиками, из которых складывается общая работа приложения
Правильное их применение требует внимательности и тщательного тестирования, обдумывайте и проверяйте возможные корнер кейсы, особенно если ваше приложение работает с повышенными нагрузками
Надеюсь вам неоднократно пригодится материал данной статьи на реальном проекте, спасибо что дочитали