Stream API & ForkJoinPool

Продолжаем серию полезностей, которыми мы делимся с вами. Теперь уже вновь по Java.

Если вы уже знакомы со Stream API и использовали его, то знаете, что это удобный способ обработки данных. С помощью различных встроенных операций, таких как map, filter, sort и других можно преобразовать входящие данные и получить результат. До появления стримов разработчик был вынужден императивно описывать процесс обработки, то есть создавать цикл for по элементам, затем сравнивать, анализировать и сортировать при необходимости. Stream API позволяет декларативно описать, что требуется получить без необходимости описывать, как это делать. Чем-то это напоминает SQL при работе с базами данных.

59ca65b147d0e843908361.jpeg

Стримы сделали Java-код компактнее и читаемее. Еще одной идеей при создании Stream API было предоставить разработчику простой способ распараллеливания задач, чтобы можно было получить выигрыш в производительности на многоядерных машинах. При этом нужно было избежать сложности, присущей многопоточному программированию. И это удалось сделать, в Stream API есть методы BaseStream: parallel и Collection.parallelStream (), которые возвращают параллельный стрим.

То есть, если у нас был код:

Collection.stream().operation()


то его легко распараллелить, если изменить один вызов

Collection.parallelStream().operation()


либо в общем случае для произвольного stream:

Source.stream().parallel().operation()


Как и за всяким простым API, за parallelStream () скрывается сложный механизм распараллеливания операций. И разработчику придется столкнуться с тем, что использование параллельного стрима может не улучшить производительность, а даже ухудшить её, поэтому важно понимать, что происходит за вызовом parallelStream (). Есть статья Doug Lea о том, в каких случаях использование параллельных стримов даст положительный эффект. Следует обратить внимание на следующие факторы:

F — операция, которая будет применяться к каждому элементу стрима. Она должна быть независимой — то есть не оказывает влияние на другие элементы, кроме текущего и не зависит от других элементов (stateless non-interfering function)

S — источник данных (коллекция) эффективно разделима (efficiently splittable). Например, ArrayList — это эффективно разделимый источник, легко вычислить индексы и интервалы, которые можно обрабатывать параллельно. Также эффективно обрабатывать HashMap. BlockingQueue, LinkedList и большинство IO-источников это плохие кандидаты для параллельной обработки.

Оценка преимущества параллельной обработки. На современных машинах имеет смысл распараллеливать задачи, время выполнения которых превышает 100 микросекунд.

Таким образом, прежде чем использовать этот инструмент, нужно понять, насколько ваша задача укладывается в описанные ограничения.

Экспериментируя с parallel () наткнулись ещё на один интересный момент, связанный с текущей реализацией. Parallel () пытается исполнять ваш код в несколько потоков и становится интересно, кто эти потоки создаёт и как ими управляет.

Попробуем запустить такой код:

void parallel() {
  int result = IntStream.range(0, 3)
       .parallel()
       .peek(it -> System.out.printf("Thread [%s] peek: %d\n", Thread.currentThread().getName(), it))
       .sum();
  System.out.println("sum: " + result);
}


Thread [ForkJoinPool.commonPool-worker-1] peek: 0
Thread [main] peek: 1
Thread [ForkJoinPool.commonPool-worker-0] peek: 2
sum: 3


Уже интересно, оказывается, по умолчанию parallel stream используют ForkJoinPool.commonPool. Этот пул создается статически, то есть при первом обращении к ForkJoinPool, он не реагирует на shutdown ()/shutdownNow () и живет, пока не будет вызван System: exit. Если задачам не указывать конкретный пул, то они будут исполняться в рамках commonPool.

Попробуем выяснить, каков же размер commonPool и посмотрим в исходники jdk1.8.0_111. Для читаемости убраны некоторые вызовы, которые не относятся к parallelism.

ForkJoinPool::makeCommonPool

private static ForkJoinPool makeCommonPool() {
   int parallelism = -1;
   try {  // ignore exceptions in accessing/parsing properties
       String pp = System.getProperty
           ("java.util.concurrent.ForkJoinPool.common.parallelism");
           if (pp != null)
             parallelism = Integer.parseInt(pp);
      } catch (Exception ignore) {
   }
   if (parallelism < 0 && // default 1 less than #cores
       (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
       parallelism = 1;
   if (parallelism > MAX_CAP)
       parallelism = MAX_CAP;
   return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                           "ForkJoinPool.commonPool-worker-");
}


Из того же класса константа:

static final int MAX_CAP      = 0x7fff;        // max #workers - 1

Нас интересует parallelism, который отвечает за количество воркеров в пуле. По-умолчанию, размер пула равен Runtime.getRuntime ().availableProcessors () — 1, то есть на 1 меньше, чем количество доступных ядер. Когда вы создаете кастомный FJPool, то можно установить желаемый уровень параллелизма через конструктор. А для commonPool можно задать уровень через параметры JVM:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=n


Сверху свойство ограничено числом 32767 (0×7fff);

Это может быть полезно, если вы не хотите отдавать все ядра под задачи ForkJoinPool, возможно, ваше приложение в обычном режиме утилизирует 4 из 8 CPU, тогда имеет смысл отдать под FJ оставшиеся 4 ядра.

Появляется вопрос, почему количество воркеров на 1 меньше количества ядер. Ответ можно увидеть в документации к ForkJoinPool.java:

When external threads submit to the common pool, they can
* perform subtask processing (see externalHelpComplete and
* related methods) upon joins. This caller-helps policy makes it
* sensible to set common pool parallelism level to one (or more)
* less than the total number of available cores, or even zero for
* pure caller-runs

То есть, когда некий тред отправляет задачу в common pool, то пул может использовать вызывающий тред (caller-thread) в качестве воркера. Вот почему в выводе программы мы видели main! Разгадка найдена, ForkJoinPool пытается загрузить своими задачами и вызывающий тред. В коде выше это main, но если вызовем код из другого треда, то увидим, что это работает и для произвольного потока:

Thread t = new Thread(() -> {
   parallel();
}, "MyThread");

t.start();
t.join();


Thread [ForkJoinPool.commonPool-worker-1] peek: 0
Thread [MyThread] peek: 1
Thread [ForkJoinPool.commonPool-worker-0] peek: 2
sum: 3


Теперь мы знаем немного больше об устройстве ForkJoinPool и parallel stream. Оказывается, что количество воркеров parallel stream ограничено и эти воркеры общего назначения, то есть могут быть использованы любыми другими задачами, которые запускаются на commonPool. Попробуем понять, чем это чревато для нас при разработке.

Рассмотрим следующий код. Для наглядности запускаем с -Djava.util.concurrent.ForkJoinPool.common.parallelism=2, то есть для FJPool.commonPool доступны 2 воркера и вызывающий поток.

final long ms = System.currentTimeMillis();
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println("Parallelism: " + commonPool.getParallelism());
IntStream.range(0, commonPool.getParallelism() + 1).forEach((it) -> commonPool.submit(() -> {
   try {
       System.out.printf("[%d sec] [%s]: #%d start()\n",
               TimeUnit.SECONDS.convert(System.currentTimeMillis() - ms, TimeUnit.MILLISECONDS),
               Thread.currentThread().getName(), it);
       TimeUnit.SECONDS.sleep(5);
   } catch (Exception e) {e.printStackTrace();}
   System.out.printf("[%d sec] [%s]: #%d finish()\n", TimeUnit.SECONDS.convert(System.currentTimeMillis() - ms, TimeUnit.MILLISECONDS), Thread.currentThread().getName(), it);
}));

int result = IntStream.range(0, 3)
       .parallel()
       .peek(it -> System.out.printf("Thread [%s] peek: %d\n", Thread.currentThread().getName(), it))
       .sum();
System.out.println("sum: " + result);
commonPool.awaitTermination(100, TimeUnit.SECONDS);


Parallelism: 2
[0 sec] [ForkJoinPool.commonPool-worker-1]: #0 start()
Thread [main] peek: 1
[0 sec] [ForkJoinPool.commonPool-worker-0]: #1 start()
Thread [main] peek: 2
Thread [main] peek: 0
sum: 3
[0 sec] [main]: #2 start()
[5 sec] [ForkJoinPool.commonPool-worker-0]: #1 finish()
[5 sec] [ForkJoinPool.commonPool-worker-1]: #0 finish()
[5 sec] [main]: #2 finish()


В коде происходит следующее: мы пытаемся полностью занять пул, отправив туда parallelism + 1 задачу (то есть 3 штуки в данном случае). После этого запускаем параллельную обработку стрима из первого примера. По логам видно, что parallel стрим исполняется в один поток, так как все ресурсы пула исчерпаны. Не зная о такой особенности будет сложно понять, если в вашей программе вырастет время обработки какого то запроса через BaseStream: parallel.

Что же делать, если вы хотите быть уверены, что ваш код действительно будет распараллелен? Есть решение, нужно запустить parallel () на кастомном пуле, для этого нам придётся немного модифицировать код из примера выше и запустить код обработки данных, как Runnable на кастомном FJPool:

ForkJoinPool custom = new ForkJoinPool(2);
custom.submit(() -> {
           int result = IntStream.range(0, 3)
                   .parallel()
                   .peek(it -> System.out.printf("Thread [%s] peek: %d\n", Thread.currentThread().getName(), it))
                   .sum();
           System.out.println("sum: " + result);
       });


Parallelism: 2
[0 sec] [ForkJoinPool.commonPool-worker-1]: #0 start()
Thread [ForkJoinPool-1-worker-0] peek: 0
Thread [ForkJoinPool-1-worker-1] peek: 1
[0 sec] [main]: #2 start()
[0 sec] [ForkJoinPool.commonPool-worker-0]: #1 start()
Thread [ForkJoinPool-1-worker-0] peek: 2
sum: 3
[5 sec] [ForkJoinPool.commonPool-worker-1]: #0 finish()
[5 sec] [ForkJoinPool.commonPool-worker-0]: #1 finish()
[5 sec] [main]: #2 finish()


Окей, теперь мы добились своей цели и уверены, что наши вычисления под контролем и никто не может повлиять на них со стороны.

Прежде чем применять любой, даже самый простой инструмент необходимо выяснить его особенности и ограничения. Для parallel stream таких особенностей много и необходимо учитывать, насколько ваша задача подходит для распараллеливания. Parallel stream хорошо работают, если операции независимы и не хранят состояние, источник данных может быть легко разделен на сегменты для параллельной обработки и задачу действительно имеет смысл выполнять параллельно. Помимо этого нужно учесть особенности реализации и убедиться, что для важных вычислений вы используете отдельный пул потоков, а не делите с общим пулом приложения.

Вопросы и предложения, как всегда приветствуются, т.к. это является частью нашего курса по Java и нам интересно мнение по материалу.

© Habrahabr.ru