Настраиваем ExecutorService'ы под свои нужды

6cfed180df96c47f034ceb2f68cae6e0.png

В этой статье хочу рассмотреть что такое ExecutorService в Java, зачем он нужен, варианты создания и в как его настраивать для решения практических задач

Понимаю что есть много подобных статей, но в тех статьях что мне удалось найти не хватало практических примеров, сегодня хочу рассказать вам то, что по-настоящему пригодиться в работе

Также я веду блог в телеграм, где вы сможете найти другие материалы от меня

ExecutorService — это пул потоков, представляет собой специальный объект, управляющий жизненным циклом потоков (Thread’ов), он позволяет ограничить максимальное число создаваемых потоков, а также закрывает их по истечении таймаута если они не выполняют задач

В чем опасность неконтроллируемого создания потоков

Что мешает руками создавать поток через new Thread(() -> {}) на каждую задачу ?

Проблема в том, что создание новых потоков в Java имеет свою цену:

  • количество потоков которые вы можете создать ограничивается системой, например на моем mac’е приложение может создать не более 4000 потоков, после чего падает с OutOfMemoryException, вне зависимости от количества выделяемой памяти (можно увеличить максимальное число потоков, но для этого потребуется специальная настройка приложения)

  • создание и запуск нового потока занимает процессорное время

Эти две проблемы решаются с помощью правильно настроенного ExecutorService’а, который позволяет нам выставить ограничение как на количество потоков в пуле, так и ограничить время их жизни при простое

Для начала рассмотрим ключевые параметры ExecutorService’ов:

  • corePoolSize — количество потоков которые всегда будут запущены в пуле, даже если они простаивают

  • maximumPoolSize — максимальное количество потоков в пуле (тут есть подводные камни, об этом расскажу дальше)

  • keepAliveTime — таймаут, по истечении которого потоки (которые не входят в число core потоков) будут завершены, если они не исполняют задачи

  • unit — единицы времени в которых мы задаем keepAliveTime

  • workQueue — очередь, в которой будут храниться Runnable задачи на исполнение потоками пула

  • threadFactory — фабрика для создания потоков (на практике никогда не пригождается)

  • RejectedExecutionHandler — политика действий при переполнении очереди задач, то есть как будет реагировать ExecutorService на добавление новой задачи, если очередь задач заполнена и все потоки заняты работой

Хотелось бы подробнее коснуться политик действия при переполнении очереди задач (RejectedExecutionHandler):

  • AbortPolicy — не позволяет добавить новую задачу, при этом выбрасывает исключение

  • DiscardPolicy — также как и AbortPolicy не позволяет добавить новую задачу, но не выбрасывает никакого исключения, т.е. делает это тихо

  • CallerRunsPolicy — поток, вызывающий добавление задачи, сам будет выполнять эту задачу

  • При желании вы можете реализовать свою политику, создав экземпляр класса RejectedExecutionHandler (например чтобы добавить собственную метрику переполнения очереди задач)

Важно! При самостоятельном создании ExecutorService планировщик будет вести себя не так как мы того хотим. Мы полагаем что когда все core потоки будут заняты, то в пуле будут создаваться дополнительные потоки до достижения значения maximumPoolSize, но дополнительные потоки начнут создаваться только после заполнения очереди

Об этом сказано в javadoc:

0ac3b257096f63ec8e36ecc25a2d8ed2.png

Чтобы сервис сначала создавал потоки, и только потом ставил новые задачи в очередь, нужно пойти обходным путем — задать количество core потоков равным максимальному, после чего включить специальный флаг allowCoreThreadTimeOut, который позволяет завершать core потоки по истечении таймаута, тем самым мы получим ожидаемое поведение — эластичный пул с ограниченным размером, и очередью, которая будет заполняться только по достижении максимального количества потоков в пуле

Пример самостоятельной настройки:

@Bean
public ExecutorService myExecutorService() {
    BlockingQueue queue = new ArrayBlockingQueue<>(1000);  
    
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1000, 1000, 60L, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.AbortPolicy());
    
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    
    return threadPoolExecutor;
}

Фабрика Executors

Класс Executors позволяет нам создавать готовые варианты планировщиков:

  • Cachable — эластичный пул, в котором потоки добавляются по мере появления новых задач, и завершаются при простое. Не имеет ограничения на количество создаваемых потоков

  • FixedThreadPool — создает фиксированное число потоков, количество core потоков равно максимальному количеству потоков, политика переполнения AbortPolicy

  • Single — это FixedThreadPool с одним потоком

  • Scheduled — пул с возможность запланировать выполнение задачи по таймеру, не имеет ограничения по количеству создаваемых потоков, реализуется за счет специальной блокирующей очереди с отложенным взятием задач в работу

  • WorkStealing — это ForkJoinPool, создает количество потоков равное количеству ядер процессора, не гарантирует порядок выполнения задач (используется по умолчанию в .parallelStream () и CompletableFuture)

  • Virtual — это нововведение Java 21, называется Project Loom, данный планировщик позволяет выполнять одновременно невероятно большое количество задач, допустим вы можете параллельно запустить миллиард задач, при этом использует всего несколько системных потоков

Когда нужны Executors планировщики:

  • Cachable — не рекомендую его использовать на коммерческих проектах, т.к. не имеет ограничения максимального количества потоков, что может привести к возникновению OutOfMemoryException под высокой нагрузкой

  • WorkStealing — из-за непредсказуемого порядка выполнения задач, его стоит использовать только для параллельных вычислений, где важна скорость, а не порядок, при этом важно чтобы данные потоки не выполняли блокирующих операций

  • FixedThreadPool — пригождается, когда мы хотим ограничить параллельное выполнение операций, однако из-за политики AbortPolicy с ним нужно быть осторожным, т.к. при переполнении наша задача не выполнится

  • Single — хорошо сочетается с выполнением задач по времени, когда мы хотим чтобы одновременно выполнялась только одна задача (скажем формирование аналитического отчета, которое может продолжаться очень долго) и мы не хотим чтобы в это время была запущена еще одна такая задача, однако учитывайте, что из-за AbortPolicy в логи будут попадать ошибки

  • Scheduled — создан для специфических типов задач, например когда мы хотим ограничить время выполнения цепочки CompletableFuture, это можно сделать с помощью вызова оператора .completeExceptionally () в отдельной задаче, которая будет выполнена по истечении заданного времени, пример кода можно найти здесь

  • Virtual — хороший вариант для большинства задач, поскольку крайне низка вероятность переполнить память, однако на текущий момент имеет определенные ограничения, его стоит рассмотреть в отдельной статье

Таким образом, большинство планировщиков фабрики Executors имеют либо узкую область применения, либо обладают недостатками и зачастую их лучше не использовать на коммерческих проектах

В таком случае нам необходимо самим настраивать планировщики под каждую ситуацию

Практические примеры собственной настройки

Поскольку большинство коммерческих бэкенд приложений работают по принципу перекладывания данных из одного места в другое (из http запроса в бд, из бд в другой сервис и т.д.), то есть две ключевые ситуации, когда требуется использование пула потоков:

Также популярной ситуацией является выполнение задач по времени:

Реже встречается ситуация на:

Сразу хочу отметить, что если ваше приложение не работает с высокими нагрузками, и не должно обеспечивать 100% отказоустойчивость, то и смысла в использовании планировщиков нет

Передача обработки входящих сообщений выделенному планировщику

Важно понимать, что библиотеки для работы с очередями сообщений (например с kafka) и контейнер сервлетов (например tomcat) имеют встроенные пулы потоков для обработки входящих запросов

Чтение из MQ (Message Queue)

Пул потоков, читающих из очереди сообщений, как правило, сильно имеет маленьких размер

Данный встроенный пул можно сконфигурировать, увеличить количество потоков, но тогда возникает ситуация, что если все они будут заняты, то некому будет читать сообщения из mq, а также очередь будет ждать acknowledment’а, что может влиять на производительность

Лучше разделять логику вычитывания сообщений из очереди и последующей их обработки

Для такого разделения создадим свой планировщик:

@Bean
public ExecutorService kafkaExecutor() {
	BlockingQueue queue = new ArrayBlockingQueue<>(500);  
    
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 60L, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy());
    
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    
    return threadPoolExecutor;
}

Здесь мы создаем пул с максимальным размером в 100 потоков, и очередью в 500 задач, также задана политика переполнения CallerRunsPolicy, при которой вызывающий поток сам будет выполнять. То есть если все потоки пула будут заняты работой и очередь задач будет полностью заполнена, то поток, читающий сообщения из mq сам выполнит обработку

Теперь сам метод:

@Autowired
private ExecutorService kafkaExecutor;

@KafkaListener(id = "foo", topics = "myTopic")
public void processMessage(String data) {
	// save message to db

	CompletableFuture.runAsync(() -> {  
            doSomeLogic();  
        },  
        kafkaExecutor);
}

Обратите внимание, что acknowledgment в mq выполнится сразу после передачи обработки планировщику в методе processMessage (), т.е. сообщение будет помечено как прочитанное в очереди сообщений и повторно вычитано не будет

Поэтому вам на стороне вашего приложения необходимо сохранять данные о вычитанных сообщениях, чтобы потом их обработать отдельно в случае падения приложения, но это уже тема для отдельного материала

Получение REST запросов

Большая часть Java web приложений используют для работы Spring WebMVC, где в качестве контейнера сервлетов применяется Tomcat

Tomcat по умолчанию использует пул на 200 потоков для обработки входящих запросов

Сконфигурировать размер пула можно при помощи настройки:

server:    
  tomcat:  
    threads:  
      max: 1000

Здесь вам не понадобиться создавать отдельный планировщик, достаточно вышеуказанной настройки, поскольку данный пул уже является эластичным и завершает потоки спустя минуту бездействия

Если же вы хотите более гибко управлять своими запросами, например чтобы часть запросов выполнялась в одном планировщике, а другая часть в другом, то здесь можно создать свои пулы, правда на практике с такими ситуациями сталкиваться не приходилось

Если все же решитесь использовать отдельный пул для исполнения запросов, то используйте CallerRunsPolicy

Параллельное выполнение запросов к разным сервисам

Довольно часто нам приходится получать данные из нескольких микросервисов, и затем их агрегировать

Лучше всего для решения таких задач подходят реактивные цепочки из webflux или rxjava, но здесь мы их не рассматриваем

Можно воспользоваться решением через CompletableFuture:

Hidden text

@Bean
public ExecutorService clientOneExecutor() {
	BlockingQueue queue = new ArrayBlockingQueue<>(100);  
    
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 60L, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy());
    
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    
    return threadPoolExecutor;
}

@Bean
public ExecutorService clientTwoExecutor() {
	BlockingQueue queue = new ArrayBlockingQueue<>(100);  
    
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 60L, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy());
    
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    
    return threadPoolExecutor;
}

Класс клиента:

@Async("clientOneExecutor")
public CompletableFuture sendMsOneRequest() {
    // ...
}

@Async("clientTwoExecutor")
public CompletableFuture sendMsTwoRequest() {
    // ...
}

Одновременный вызов клиентов и объединение результатов:

public CompletableFuture simultaneousCalls() {
    CompletableFuture requestOne = client.sendMsOneRequest();
    CompletableFuture requestTwo = client.sendMsTwoRequest();

    return CompletableFuture.allOf(requestOne, requestTwo)
            .thenApply(ignore -> {
                ResponseOne responseOne = requestOne.getNow(new ResponseOne());
                ResponseTwo responseTwo = requestTwo.getNow(new ResponseTwo());

                return combine(responseOne, responseTwo);
            });
}

private CombinedResult combine(ResponseOne responseOne, ResponseTwo responseTwo) {
    // ...
}

Выполнение частых Scheduled задач

В своей предыдущей статье я описал как делегировать выполнение запланированных (Scheduled) задач самостоятельно созданному планировщику, а здесь расскажу какой планировщик лучше использовать

Если ваши Scheduled задачи выполняются раз в несколько часов или даже реже, то вам вряд ли стоит беспокоиться о том что планировщик Scheduled задач переполнится, поэтому можно использовать любой ExecutorService

Но предположим ваши задачи вызываются часто, скажем раз в минуту необходимо искать в бд записи о новых уведомлениях и рассылать их клиентам. Представим что с нашей логикой что-то произошло и выполнение такой задачи вместо 1 минуты станет занимать 2. Если мы используем Cachable Executor, то спустя некоторое время сервис исчерпает доступные для создания потоки и упадёт с OutOfMemoryException

Для предотвращения данной ситуации подойдет Executors.newSingle () планировщик, но из-за AbortPolicy он будет каждый раз бросать в логи ошибку, если при попытке запуска обнаружится что предыдущая задача еще в работе

В нашем примере нам такие ошибки в логах не нужны, поэтому сконфигурируем свой планировщик с одним потоком:

@Bean
public Executor jobExecutor() {      
    return new ThreadPoolExecutor(1, 1, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());  
}  

@Async("jobExecutor")
@Scheduled(cron = "0 */1 * * * *")
public void everyMinute() {
	...
}

Теперь, благодаря DiscardPolicy ошибок в логах не появится, и вся работа будет выполняться всегда в одном потоке

Параллельные вычисления

Поскольку вычисления требуют эффективного использования процессорного времени, то нет смысла в создании пула с большим количеством потоков, их должно быть столько же сколько ядер у процессора, здесь важным становится отсутствие простоев в потоках

Для решения данной проблемы используется Executors.newWorkStealingPool ()

Примером параллельных вычислений могут послужить любые Stream API цепочки преобразований, если например происходит обработка большой коллекции, то использование .parallelStream сильно увеличит скорость работы

Однако надо понимать, что внутри данной Stream цепочки не должно быть блокирующих вызовов (обращений в бд, rest вызовов, записи в mq, работы с файлами), иначе ваши малочисленные потоки будут простаивать и замедлять обработку задач

Вывод

Надеюсь что по итогу прочтения статьи у вас появилось понимание как работать с пулами потоков в Java, а также как они создаются и в каких ситуациях применяются

Мною были охвачены только наиболее популярные сценарии использования ExecutorService’ов, но есть еще много частных случаев когда они могут потребоваться, приводите свои примеры в комментариях

В заключение хочу сказать, что главное это понимание того, как должны работать потоки в вашем приложении, тогда вы сможете решить стоит ли вам самостоятельно настроить планировщик, использовать фабрику или можно вовсе обойтись без них

© Habrahabr.ru