Параллельное выполнение в R
Привет, Хабр!
Параллельные вычисления — подход к проектированию и выполнению программ, который позволяет ускорить обработку данных и вычисления, используя множество процессоров или ядер процессора одновременно.
В ЯП R паралельное выполнение также имеет свои варианты реализации. Рассмотрим их в статье.
Немного о том, что такое параллельное программирование в общих чертах
Существует два основных подхода к параллельному программированию: использование потоков и использование процессов.
На основе потоков задачи разделяются на независимые подзадачи, которые могут выполняться одновременно. Каждая подзадача выполняется в соответствующем потоке, который может работать параллельно с другими потоками. В этом случае все потоки имеют доступ к общей памяти и могут обмениваться данными между собой.
А на основе процессов, задачи разделяются на отдельные процессы, которые выполняются независимо друг от друга. Каждый процесс имеет свою собственную память, и обмен данными между процессами может выполняться с использованием механизмов межпроцессного взаимодействия, таких как сокеты или каналы.
Основные варианты реализации в R и пакеты
parallel
parallel
объединяет функциональность более старых пакетов multicore
и snow
, предоставляя интерфейс для выполнения задач в параллельном режиме. Основные функции:
mclapply()
: аналог функции lapply()
, используемый для параллельной обработки списка элементов. Работает только в юниксо подобных ос:
library(parallel)
result <- mclapply(1:100, function(x) x^2, mc.cores = 2)
parLapply()
: альтернатива lapply()
, подходящая для работы как на одном пк, так и на кластере. Для использования необходимо предварительно создать кластер с помощью makeCluster()
:
cl <- makeCluster(4) # cоздаем кластер из 4 узлов
clusterExport(cl, varlist = "myData") # экспортируем данные в каждый узел кластера
result <- parLapply(cl, 1:100, function(x) x^2)
stopCluster(cl)
makeCluster()
: создает кластер из заданного количества ядер. Кластер затем используется функциями типа parLapply()
:
cl <- makeCluster(4) # 4 ядра
stopCluster(cl)
clusterApply()
: распараллеливает выполнение функции по элементам списка на кластере:
cl <- makeCluster(2)
result <- clusterApply(cl, 1:10, function(x) x^2)
stopCluster(cl)
clusterEvalQ()
и clusterCall()
: позволяют выполнить произвольный код на узлах кластера:
cl <- makeCluster(2)
clusterEvalQ(cl, library(ggplot2)) # ggplot2 на каждом узле
clusterCall(cl, function() Sys.info()['nodename']) # получаем имя узла
stopCluster(cl)
Попробуем распараллелить анализ данных с использованием модели линейной регрессии на различных подвыборках датасета:
library(parallel)
data(mtcars)
cl <- makeCluster(4)
# разделяем данные на подвыборки
subsets <- split(mtcars, mtcars$cyl)
# распараллеливаем вычисления
models <- parLapply(cl, subsets, function(subset) lm(mpg ~ wt, data = subset))
stopCluster(cl)
Код создает параллельный кластер, разделяет набор данных mtcars
на подвыборки по количеству цилиндров, и использует функцию parLapply()
для параллельного подсчета линейных моделей по каждой подвыборке.
foreach
Пакет foreach
предоставляет способ организации циклов в R, который может быть легко адаптирован для параллельной обработки без необходимости изменения основного кода. Основная фича заключается в том, чтобы заменить традиционные циклы R, такие как for
, на конструкцию foreach
, которая затем может использоваться с одним из многих пакетов параллельного бэкенда, к примеруdoParallel
.
Основные функции foreach
:
foreach()
: основная функция, которая используется для определения цикла. Без паралельной обработки:
library(foreach)
result <- foreach(i = 1:100) %do% {
sqrt(i)
}
doParallel
является одним из бэкендов для foreach
, он позволяет выполнять каждую итерацию цикла foreach
параллельно, используя множество процессоров или ядер на компьютере. Основные функции:
registerDoParallel()
: функция для регистрации doParallel
как параллельного бэкенда для foreach
, указывая количество ядер или процессоров, которые должны использоваться для параллельной обработки.
Пример использования обоих пакетов для параллельной обработки:
library(foreach)
library(doParallel)
cl <- makeCluster(4) # создаем кластер из 4 ядер
registerDoParallel(cl) # регистрируем кластер для использования с foreach
result <- foreach(i = 1:100) %dopar% {
sqrt(i)
}
stopCluster(cl)
Здесь создали кластер из четырех ядер с помощью makeCluster
из пакета parallel
на который doParallel
опирается и затем регистрируем этот кластер как бэкенд для foreach
, используя registerDoParallel
. Цикл foreach
автоматически распараллеливается, и каждая итерация выполняется на отдельном ядре.
foreach
также поддерживают различные механизмы контроля за процессом выполнения, например агрегацию результатов с помощью аргумента .combine
, который позволяет определить, как результаты отдельных итераций должны быть объединены в конечный resultd:
result <- foreach(i=1:100, .combine='c') %dopar% {
sqrt(i)
}
Результаты каждой итерации объединяются в один вектор с использованием функции объединения 'c'
.
future
plan()
: функция определяет, как будут выполняться наши future-объекты — локально, в параллельных сессиях, на удаленных машинах и т.д. выбр Можно выбраать стратегию исполнения, например, multisession
, multicore
, sequential
и другие.
future()
: создает future-объект, представляющий значение, которое будет доступно в будущем. Это сердце пакета, позволяющее нам абстрагироваться от деталей параллельного исполнения.
value()
: получает результат выполнения future-объекта, дожидаясь его завершения, если это необходимо.
resolved()
: проверяет, завершилось ли выполнение future-объекта.
future и plan для параллельного вычисления:
library(future)
plan(multisession) # несколько сессий для параллельного исполнения
# future-объект для асинхронного вычисления
fut <- future({
Sys.sleep(5) # имитация длительной операции
sum(1:1000)
})
result <- value(fut)
Инициируем параллельное вычисление суммы чисел от 1 до 1000, при этом основной поток исполнения не блокируется и может продолжать выполнение других задач.
Можно так же реализовать веб.скрапинг к примеру для сбора информации о продуктах с нескольких интернет-магазинов для сравнения цен. можно одновременно отправлять запросы к разным сайтам, уменьшая общее время выполнения скрапинга:
library(future)
plan(multisession)
urls <- c("https://store1.com/product", "https://store2.com/product", "https://store3.com/product")
scrapingFutures <- lapply(urls, function(url) {
future({
scrapeData(url) # псевдофункция для извлечения данных со страницы
})
})
productsData <- lapply(scrapingFutures, value)
RcppParallel
RcppParallel
использует потоки C++ и предлагает простой API для наиболее распространенных операций параллельного программирования в c++.
Допустим есть большой вектор чисел, и мы хотим быстро посчитать его сумму. Используем RcppParallel
для буста процесса:
#include
using namespace RcppParallel;
struct Sum : public Worker
{
// вектор и результат
const RVector input;
double& sum;
// конструктор
Sum(const NumericVector input, double& sum) : input(input), sum(sum) {}
// переопределение оператора ()
void operator()(std::size_t begin, std::size_t end) {
sum += std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
}
};
// [[Rcpp::export]]
double parallelSum(NumericVector x) {
double sum = 0;
Sum sumWorker(x, sum);
parallelReduce(0, x.length(), sumWorker);
return sum;
}
Структура Sum
используется для параллельного вычисления суммы чисел в векторе. parallelReduce
автоматически разбивает данные на части и выполняет их параллельно, используя доступные ядра процессора.
Предположим, нам нужно применить функцию к каждому элементу большого вектора. Вместо того, чтобы делать это последовательно, мы можем выполнить операцию параллельно:
#include
using namespace RcppParallel;
struct Transform : public Worker
{
// исходный и целевой векторы
const RVector input;
RVector output;
Transform(const NumericVector input, NumericVector output) : input(input), output(output) {}
// переопределение оператора ()
void operator()(std::size_t begin, std::size_t end) {
std::transform(input.begin() + begin, input.begin() + end, output.begin() + begin, [](double x) {
return std::sqrt(x); // Например, вычисляем корень квадратный
});
}
};
// [[Rcpp::export]]
NumericVector parallelTransform(NumericVector x) {
NumericVector result(x.size());
Transform transformWorker(x, result);
parallelFor(0, x.size(), transformWorker);
return result;
}
Струтура Transform
применяет некоторую функцию к элементам вектора параллельно.
Прочие примеры применения
Операции с матрицами порой оч. больших вычислительных ресурсов, особенно при работе с большими матрицами:
library(foreach)
library(doParallel)
# задаем количество ядер
cl <- makeCluster(detectCores())
registerDoParallel(cl)
# генерим две большие матрицы
mat1 <- matrix(rnorm(1e6), 1000, 1000)
mat2 <- matrix(rnorm(1e6), 1000, 1000)
# параллельное умножение матриц
result <- foreach(i=1:nrow(mat1), .combine='rbind') %dopar% {
mat1[i, ] %*% mat2
}
stopCluster(cl)
Как можно параллельно обработать большой датасет:
library(foreach)
library(doParallel)
cl <- makeCluster(detectCores())
registerDoParallel(cl)
# представим, что у есть большой датасет 'big_data'
# используем placeholder для демонстрации
big_data <- replicate(100, rnorm(1e4))
# применяем функцию обработки данных к каждому столбцу датасета
processed_data <- foreach(i=1:ncol(big_data), .combine='cbind') %dopar% {
someComplexFunction(big_data[, i])
}
stopCluster(cl)
Как можно параллельно выполнять множество независимых вычислительных задач:
library(foreach)
library(doParallel)
cl <- makeCluster(detectCores())
registerDoParallel(cl)
# выполнение независимых вычислений, например, генерация симуляций
tasks <- 100 # Количество задач
results <- foreach(i=1:tasks, .combine='c') %dopar% {
runSimulation(i) # 'runSimulation' - placeholder для функции симуляции
}
stopCluster(cl)
В целом параллельное вычисление в R реализовано добротно, однако еще далеко до питона.
Статья подготовлена в рамках запуска специализации Системный аналитик и бизнес-аналитик. По ссылкам ниже вы сможете зарегистрироваться на бесплатные вебинары специализации.