Польза создания однородных задач для параллельного вычисления
Как правильно использовать возможности параллельного программирования?
Зачем программистам математика и зачем знать алгоритмы?
Представьте что у вас есть 10 задач. Каждая пронумерована от 1 до 10, а так же каждая задача выполняется секунд, равным номеру задачи. 1я задача — 1 секунда, 10я задача — 10 секунд — она самая «тяжелая».
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Выполняя задачи синхронно, мы получим выполнение всех за 55 секунд. Как мы узнали это число? Есть несколько способов.
Сложив все вместе (1+2+3…+10). Можно использовать цикл fori. Еще это называется «Сумма арифметической прогрессии».
Использовать формулу (n * (n+1)) / 2 (Это к вопросу, «зачем нам нужна математика?»). Она же с побитовым сдвигом (n * (n+1)) >> 1
Так же мы можем заметить, что если сложить первое и последнее числа, мы получим 11. 1 + 10 = 11. Далее 2+9 тоже будет 11, и так до 5+6=11. То есть 11+11+11+11+11 = 11*(10/2) = 11×5 = 55.
Последнее — это одно из решений на Leetcode для нескольких задач, где мы используем алгоритм под названием «Two pointers» или «Два указателя».
Используем многопоточность, что бы ускорить выполнение нашей задачи. У нас есть 5 ядер по одному потоку на каждый. Каждая задача изолирована друг от друга и мы имеем три подхода к решению данной задачи.
Поделим количество задач на количество ядер, и выполним задачи в том порядке, в котором они расположены сейчас. То есть мы получим по 2 задачи на ядро, которые будут выполняться последовательно.
Переиспользуем наши потоки. Освободившиеся потоки будут брать свободные задачи так же расположенные по порядку (Аналог пула).
Как и в первом варианте, поделим задачи между ядрами, но теперь одинаково распределим сложность задач для каждого ядра, используя алгоритм «Two pointers»(Это к вопросу «Зачем программистам знать алгоритмы?»).
Итак, к результатам!
Результаты будут под теми же номерами, что и решения:
Мы поделили задачи между ядрами последовательно и получилось, что на последнем ядре выполняются самые сложные задачи по 9 и 10 секунд, что в сумме 19. Остальные ядра уже закончили свои задачи, и как итог — это решение равняется сумме самых сложных задач на ядро — 19 секунд.
Здесь 5е ядро, после работы в 5 секунд, заберет себе задачу на 10 секунд, итого, задача займет 15 секунд. Неплохо.
5 ядер будут выполнять задачи по 11 секунд (10 + 1, 9 + 2, 8 + 3, 7 + 4, 6 + 5). Как итог — все выполнение займет 11 секунд. Лучший результат!
Очень важная мысль из книги »Java concurrency in action»!
«Реальная отдача от разделения рабочей нагрузки программы на задачи достигается при наличии большого числа независимых, однородных задач, которые могут обрабатываться конкурентно.»
Обычно мы не имеем дел с «Суммой арифметической прогрессии», где все задачи имеют уникальное время последовательной продолжительности, а, вероятнее, имеем задачи повторяющейся продолжительности. Эти задачи будут так же отсортированы по сложности выполнения.
[1, 3, 4, 4, 6, 7, 7, 8, 9, 10]
Здесь нам не помогут предыдущие формулы, но общее количество времени на последовательное исполнение мы посчитать можем. 59 секунд.
Результаты в каждом из подходов:
19 секунд.
16 секунд.
13 секунд.
Опять побеждает решение с равномерным распределением задач по сложности выполнения.
Давайте выразим это в коде. Для удобства я так же использую библиотеку Lombok.
Представьте. Мы работаем в инвестиционной компании, которая недавно открылась. Нам присылают большой пакет необработанных данных, в котором содержатся инвесторы и инвестиции, которые они инвестировали в разные компании в течении последних 10 лет.
Наша задача состоит в том, что бы получить эти данные и отправить их обрабатываться в другой сервис со своей бизнес логикой. Сейчас, возможно, звучит непонятно, но дальше все станет ясно.
Оставляю ссылку на гитхаб со всем написанным кодом. Так что если не хочется все писать самостоятельно, можете просто скачать здесь.
Для начала создадим три модели данных. Investor, Investment, ExecutionMethod. Последняя нужна просто для удобства.
@Data
public class ExecutionMethod {
private String name;
private long time;
private String description;
public ExecutionMethod(String name, String description) {
this.name = name;
this.description = description;
}
public void represent() {
System.out.println(name + ". " + description); // TODO Delete
}
public void showExecutionTime() {
System.out.println(name + " " + time + " ms."); // TODO Delete
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Investor {
private String name;
private List investments;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Investment {
private String from;
private String to;
private Long price;
}
Так же нам понадобятся два класса помощника. BusinessLogic (будет симулировать бизнес-логику) и MyPhaser (Поможет нам с синхронизацией).
public class BusinessLogic {
public static void handle(List investors) {
try {
System.out.println("Инвесторов: " + investors.size() + " с порядком инвестиций: " + investors.stream().map(x -> x.getInvestments().size()).collect(Collectors.toList())); // TODO Delete
for (Investor investor : investors)
for (int j = 0; j < investor.getInvestments().size(); j++)
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public static List getUnhandledInvestors(int investorsNum, int investmentNum) {
List investors = new ArrayList<>();
for (int i = 0; i < investorsNum; i++) {
List investments = new ArrayList<>();
Investor investor = new Investor("Some Investor", investments);
int investmentsNumber = (int) ((Math.random()) * investmentNum);
for (int j = 0; j < investmentsNumber; j++)
investments.add(new Investment("Good company", "somebody", 1000L));
investors.add(investor);
}
return investors;
}
}
public class MyPhaser {
private static Phaser phaser = new Phaser();
public static void awaitAdvance() {
phaser.arriveAndAwaitAdvance();
}
public static void deregister() {
phaser.arriveAndDeregister();
}
public static void register() {
if (phaser.isTerminated())
phaser = new Phaser(0);
phaser.register();
}
}
Класс бизнес-логики будет отдавать нам необработанные данные об инвесторах методом getUnhandledInvestors (…), которые мы должны отправить на обработку в метод handle (…).
В методе handle (…) мы сделали задержку 1 мс для симуляции какой-либо деятельности.
Для начала попробуем последовательный метод. У нас будет 1024 инвестора, которые совершили от 0 до 120 инвестиций за 10 последних лет. (120 — это 12 месяцев на 10 лет. По одному платежу в месяц.)
Использование фазера, на данный момент, не имеет какой-либо практической ценности, но и не мешает выполнению. Он понадобится нам позже, потому добавим сразу.
public static void main(String[] args) {
List investors = BusinessLogic.getUnhandledInvestors(1024, 120);
MyPhaser.register();
ExecutionMethod syncHandle = new ExecutionMethod("Последовательное выполнение со случайным порядком инвестиций", "Пример: [4,1,10,6,2,9,3,8,7,5]");
syncHandle.represent();
syncHandle.setTime(syncHandle(investors));
MyPhaser.deregister();
}
private static long syncHandle(List investors) {
long start = System.currentTimeMillis(); // TODO Delete
BusinessLogic.handle(investors);
long end = System.currentTimeMillis(); // TODO Delete
System.out.println("Result: " + (end - start) + " ms\n"); // TODO Delete
return (end - start);
}
Не слишком хороший результат. Если возрастет количество инвесторов или инвестиций, ждать придется еще дольше.
Мы знаем что инвесторы и инвестиции изолированы от других инвесторов и инвестиций. Они не связаны друг с другом и никак не пересекаются.
Потому применим ForkJoin Framework для того, чтобы достичь параллельности в наших вычислениях. Главным отличием ForkJoin от многопоточного выполнения является то, что он задействует ядра компьютера, для распараллеливания и распределения задач по ядрам, в то время как многопоточный подход использует метод квантования времени выполнения, а то есть полезное использование простоев одних потоков другими.
Для начала создадим менеджера, который будет управлять нашими задачами. На моем компьютере больше 5 ядер, потому я выбрал 5. Узнать количество процессоров вы можете методом Runtime.getRuntime ().availableProcessors ().
public class TaskManager {
private final List tasks;
private final int CORES = 5;
private final ForkJoinPool pool = new ForkJoinPool(CORES);
public TaskManager(List tasks) {
this.tasks = new ArrayList<>(tasks);
}
public void orderedExec(int thresholdSize, int parts) {
MyPhaser.register();
pool.submit(new AsyncOrderedTask(tasks, Math.max(100, thresholdSize), parts));
}
}
В методе orderedExec (…) параметр thresholdSize — отвечает за минимальное количество инвесторов для последовательной обработки.
Параметр parts — показывает на сколько частей нужно делить коллекцию инвесторов.
Создадим задачу, которая будет разбивать инвесторов на одинаковые группы, и отправлять их выполнять нашу бизнес-логику.
public class AsyncOrderedTask extends RecursiveAction {
private final List investors;
private final int threshold;
private final int parts;
public AsyncOrderedTask(List investors, int threshold, int parts) {
this.investors = investors;
this.threshold = threshold;
this.parts = parts;
}
@Override
protected void compute() {
try {
if (investors.size() < threshold)
someBusinessLogic();
else
divide();
} finally {
MyPhaser.deregister();
}
}
private void someBusinessLogic() {
BusinessLogic.handle(investors);
}
private void divide() {
int size = investors.size(), remains = size % parts, step = size / parts + remains;
System.out.printf("All size = %d | Step = %d | Parts = %d\n", size, step, parts); // TODO Delete
for (int i = 0; i < size; i += step) {
MyPhaser.register();
int max = Math.min(size, i + step);
new AsyncOrderedTask(investors.subList(i, max), threshold, parts).fork();
}
}
}
Разберем метод divide (). Для начала мы вычисляем шаг цикла (step). В цикле fori мы регистрируем нового участника фазера и создаем задачу на выполнение.
remains — это остаток от деления, для равномерного распределения значений по ядрам. К примеру 1000 на 5 мы легко поделим, получится по 200 инвесторов на ядро.
Тем не менее 1024 инвесторов мы без остатка поделить не сможем.
Если мы не учтем остаток, то после выполнения первым ядром своей задачи в 204 инвестора, оно возьмет еще 4х остаточных инвесторов на выполнение.
Разбивка без учета остатка будет выглядеть следующим образом:
1 ядро. 204 инвестора
2 ядро. 204 инвестора
3 ядро. 204 инвестора
4 ядро. 204 инвестора
5 ядро. 204 инвестора
1 ядро. 4 инвестора
С учетом же остатка, ядра отработают в следующем порядке.
1 ядро. 208 инвестора
2 ядро. 208 инвестора
3 ядро. 208 инвестора
4 ядро. 208 инвестора
5 ядро. 192 инвестора
Гораздо лучше! Так же вы могли заметить строку «int max = Math.min (size, i + step);». Это для того, что бы не выйти за границы коллекции.»5 ядро. 192 инвестора» как раз нам это демонстрирует.
С разбором закончили, переходим к тестам!
Далее мы больше не будем проверять последовательный подход, потому просто его уберем.
public static void main(String[] args) {
List investors = BusinessLogic.getUnhandledInvestors(1024, 120);
MyPhaser.register();
ExecutionMethod commonMethodRandom = new ExecutionMethod("Метод со случайным порядком инвестиций (Практически равномерное распределение)", "Пример: [4,1,10,6,2,9,3,8,7,5]");
commonMethodRandom.represent();
commonMethodRandom.setTime(commonMethod(investors, 5));
MyPhaser.deregister();
}
private static long commonMethod(List investors, int parts) {
TaskManager manager = new TaskManager(investors);
long start = System.currentTimeMillis();
manager.orderedExec(investors.size() / 4, parts);
MyPhaser.awaitAdvance();
long end = System.currentTimeMillis();
System.out.println("Result: " + (end - start) + " ms\n"); // TODO Delete
return (end - start);
}
Совсем не плохо! Благодаря случайной последовательности мы получаем практически равномерно-распределенное количество инвестиций. Давайте посмотрим, что будет, если мы получим последовательные данные. Для этого добавим в метод main некоторый код.
public static void main(String[] args) {
List investors = BusinessLogic.getUnhandledInvestors(1024, 120);
MyPhaser.register();
ExecutionMethod commonMethodRandom = new ExecutionMethod("Метод со случайным порядком инвестиций (Практически равномерное распределение)", "Пример: [4,1,10,6,2,9,3,8,7,5]");
ExecutionMethod commonMethodOrdered = new ExecutionMethod("Метод с отсортированным порядком инвестиций (Худший вариант)", "Пример: [1,2,3,4,5,6,7,8,9,10]");
commonMethodRandom.represent();
commonMethodRandom.setTime(commonMethod(investors, 5));
investors.sort(Comparator.comparingInt(dividedStep -> dividedStep.getInvestments().size()));
commonMethodOrdered.represent();
commonMethodOrdered.setTime(commonMethod(investors, 5));
MyPhaser.deregister();
}
Уже не так весело, не правда ли?
Вот здесь нам и пригодится знание алгоритмов. В частности мы будем использовать алгоритм «Two pointers».
Создадим задачу, которая будет использовать данный алгоритм для разбития инвесторов на примерно однородные группы.
public class AsyncTwoPointerTask extends RecursiveAction {
private final List investors;
private final int threshold;
private final int parts;
private final boolean asPool;
public AsyncTwoPointerTask(List investors, int threshold, int parts, boolean asPool) {
this.investors = investors;
this.threshold = threshold;
this.parts = parts;
this.asPool = asPool;
}
@Override
protected void compute() {
try {
if (investors.size() < threshold)
someBusinessLogic();
else
divide();
} finally {
MyPhaser.deregister();
}
}
private void someBusinessLogic() {
BusinessLogic.handle(investors);
}
private void divide() {
int size = investors.size(), remains = size % parts, step = size / parts, dividedStep = step / 2, leftEnd = 0, leftStart, rightStart = size, rightEnd;
System.out.printf("All size = %d | Step = %d | Parts = %d\n", size, asPool ? step / 2 : step, asPool ? parts * 2 : parts); // TODO Delete
while (leftEnd < rightStart) {
MyPhaser.register();
leftStart = leftEnd;
rightEnd = rightStart;
leftEnd += Math.min(rightEnd - leftStart - dividedStep, dividedStep + remains);
rightStart -= dividedStep;
if (asPool) {
MyPhaser.register();
new AsyncTwoPointerTask(investors.subList(leftStart, leftEnd), threshold, parts, asPool).fork();
new AsyncTwoPointerTask(investors.subList(rightStart, rightEnd), threshold, parts, asPool).fork();
} else
new AsyncTwoPointerTask(
Stream.of(investors.subList(leftStart, leftEnd), investors.subList(rightStart, rightEnd))
.flatMap(Collection::stream).collect(Collectors.toList()), threshold, parts, asPool).fork();
}
}
}
Метод divide () теперь сужается к центру. Слева он собирает в коллекцию инвесторов с самым маленьким количеством инвестиций, справа — с самым большим.
Потом объединяет их и отдает на исполнение. Таким образом мы достигаем однородности задач на ядро.
Вспомните пример с вычислением первого и последнего числа в примере с «Суммой арифметической прогрессии», о чем говорили ранее. 11+11+11+11+11=55.
Так же претерпели изменения некоторые переменные, и появилась переменная asPool, которую мы вызываем из вне. Таким образом мы делаем подобие пула потоков, где после того как поток заканчивает выполнение одной задачи — берет другую (Переиспользование потоков в пуле). Для того, чтобы симулировать пул в классе AsyncOrderedTask, достаточно увеличить в классе main количество частей, на сколько мы хотим разбить коллекцию инвесторов. Эти методы будут доступны в полном коде, в конце статьи, разбирать мы их не будем.
Добавим метод в TaskManager.
public void twoPointersExec(int thresholdSize, boolean asPool) {
MyPhaser.register();
pool.submit(new AsyncTwoPointerTask(tasks, Math.max(100, thresholdSize), CORES, asPool));
}
Теперь перейдем в метод main и поправим его следующим образом:
public static void main(String[] args) {
List investors = BusinessLogic.getUnhandledInvestors(1024, 120);
MyPhaser.register();
ExecutionMethod commonMethodRandom = new ExecutionMethod("Метод со случайным порядком инвестиций (Практически равномерное распределение)", "Пример: [4,1,10,6,2,9,3,8,7,5]");
ExecutionMethod commonMethodOrdered = new ExecutionMethod("Метод с отсортированным порядком инвестиций (Худший вариант)", "Пример: [1,2,3,4,5,6,7,8,9,10]");
ExecutionMethod twoPointerExecutionRandom = new ExecutionMethod("Метод со случайным порядком инвестиций и применением алгоритма Two Pointers", "Пример: [4,1,10,6,2,9,3,8,7,5]");
ExecutionMethod twoPointerExecutionOrdered = new ExecutionMethod("Метод с отсортированным порядком инвестиций и применением алгоритма Two Pointers", "Пример: [1,2,3,4,5,6,7,8,9,10]");
commonMethodRandom.represent();
commonMethodRandom.setTime(commonMethod(investors, 5));
twoPointerExecutionRandom.represent();
twoPointerExecutionRandom.setTime(twoPointerExecution(investors, false));
investors.sort(Comparator.comparingInt(dividedStep -> dividedStep.getInvestments().size()));
commonMethodOrdered.represent();
commonMethodOrdered.setTime(commonMethod(investors, 5));
twoPointerExecutionOrdered.represent();
twoPointerExecutionOrdered.setTime(twoPointerExecution(investors, false));
MyPhaser.deregister();
}
private static long twoPointerExecution(List investors, boolean asPool) {
TaskManager manager = new TaskManager(investors);
long start = System.currentTimeMillis();
manager.twoPointersExec(investors.size() / 4, asPool);
MyPhaser.awaitAdvance();
long end = System.currentTimeMillis();
System.out.println("Result: " + (end - start) + " ms\n"); // TODO Delete
return (end - start);
}
Мы видим что при случайном порядке инвестиций, при использовании алгоритма «Two pointers», практически нет никакого выигрыша по отношению к обычному последовательному вычислению.
Иногда он может даже проигрывать в скорости, но не значительно. Строго говоря, однородность задач, при случайном порядке инвестиций, мы так же получаем случайно. Потому абсолютно однородными их назвать сложно.
Однако при отсортированном порядке инвестиций мы видим либо, практически, неизменность либо увеличение скорости вычисления.
Обычное последовательное вычисление здесь проигрывает полностью.
Получается что преимуществом однородных задач является не только скорость, но и неизменность скорости вычисления, независимо от порядка и «тяжести» входящих данных.
Мы раскрыли преимущество создания однородных задач, используя математику и алгоритмы!
Так же мы рассматривали варианты использования подобия пула. Для этого поменяем метод main следующим образом.
public static void main(String[] args) {
List investors = BusinessLogic.getUnhandledInvestors(1024, 120);
MyPhaser.register();
ExecutionMethod syncHandle = new ExecutionMethod("Последовательное выполнение со случайным порядком инвестиций", "Пример: [4,1,10,6,2,9,3,8,7,5]");
ExecutionMethod commonMethodRandom = new ExecutionMethod("Метод со случайным порядком инвестиций (Практически равномерное распределение)", "Пример: [4,1,10,6,2,9,3,8,7,5]");
ExecutionMethod commonMethodRandomPool = new ExecutionMethod("Метод со случайным порядком инвестиций по типу пула (Практически равномерное распределение)", "Пример: [4,1,10,6,2,9,3,8,7,5]");
ExecutionMethod twoPointerExecutionRandom = new ExecutionMethod("Метод со случайным порядком инвестиций и применением алгоритма Two Pointers", "Пример: [4,1,10,6,2,9,3,8,7,5]");
ExecutionMethod twoPointerExecutionRandomPool = new ExecutionMethod("Метод со случайным порядком инвестиций и применением алгоритма Two Pointers по типу пула", "Пример: [4,1,10,6,2,9,3,8,7,5]");
ExecutionMethod commonMethodOrdered = new ExecutionMethod("Метод с отсортированным порядком инвестиций (Худший вариант)", "Пример: [1,2,3,4,5,6,7,8,9,10]");
ExecutionMethod commonMethodOrderedPool = new ExecutionMethod("Метод с отсортированным порядком инвестиций по типу пула (Худший вариант с пулом)", "Пример: [1,2,3,4,5,6,7,8,9,10]");
ExecutionMethod twoPointerExecutionOrdered = new ExecutionMethod("Метод с отсортированным порядком инвестиций и применением алгоритма Two Pointers", "Пример: [1,2,3,4,5,6,7,8,9,10]");
ExecutionMethod twoPointerExecutionOrderedPool = new ExecutionMethod("Метод с отсортированным порядком инвестиций и применением алгоритма Two Pointers по типу пула", "Пример: [1,2,3,4,5,6,7,8,9,10]");
syncHandle.represent();
syncHandle.setTime(syncHandle(investors));
commonMethodRandom.represent();
commonMethodRandom.setTime(commonMethod(investors, 5));
commonMethodRandomPool.represent();
commonMethodRandomPool.setTime(commonMethod(investors, 10));
twoPointerExecutionRandom.represent();
twoPointerExecutionRandom.setTime(twoPointerExecution(investors, false));
twoPointerExecutionRandomPool.represent();
twoPointerExecutionRandomPool.setTime(twoPointerExecution(investors, true));
investors.sort(Comparator.comparingInt(dividedStep -> dividedStep.getInvestments().size()));
commonMethodOrdered.represent();
commonMethodOrdered.setTime(commonMethod(investors, 5));
commonMethodOrderedPool.represent();
commonMethodOrderedPool.setTime(commonMethod(investors, 10));
twoPointerExecutionOrdered.represent();
twoPointerExecutionOrdered.setTime(twoPointerExecution(investors, false));
twoPointerExecutionOrderedPool.represent();
twoPointerExecutionOrderedPool.setTime(twoPointerExecution(investors, true));
System.out.println("Результаты:"); // TODO Delete
commonMethodRandom.showExecutionTime();
twoPointerExecutionRandom.showExecutionTime();
twoPointerExecutionRandomPool.showExecutionTime();
commonMethodOrdered.showExecutionTime();
commonMethodOrderedPool.showExecutionTime();
twoPointerExecutionOrdered.showExecutionTime();
twoPointerExecutionOrderedPool.showExecutionTime();
System.out.println("\nНаш победитель:"); // TODO Delete
ExecutionMethod method = Stream.of(commonMethodRandom,
twoPointerExecutionRandom,
twoPointerExecutionRandomPool,
commonMethodOrdered,
commonMethodOrderedPool,
twoPointerExecutionOrdered,
twoPointerExecutionOrderedPool).min(Comparator.comparingLong(ExecutionMethod::getTime))
.orElse(new ExecutionMethod("Пустой", "Пустой"));
method.showExecutionTime();
MyPhaser.deregister();
}
А результаты вы уже сами посмотрите:-)