Параллельное выполнение в R

736fd8138c81fd14c1623fa8d230e6f6.png

Привет, Хабр!

Параллельные вычисления — подход к проектированию и выполнению программ, который позволяет ускорить обработку данных и вычисления, используя множество процессоров или ядер процессора одновременно.

В ЯП R паралельное выполнение также имеет свои варианты реализации. Рассмотрим их в статье.

Немного о том, что такое параллельное программирование в общих чертах

Существует два основных подхода к параллельному программированию: использование потоков и использование процессов.

На основе потоков задачи разделяются на независимые подзадачи, которые могут выполняться одновременно. Каждая подзадача выполняется в соответствующем потоке, который может работать параллельно с другими потоками. В этом случае все потоки имеют доступ к общей памяти и могут обмениваться данными между собой.

А на основе процессов, задачи разделяются на отдельные процессы, которые выполняются независимо друг от друга. Каждый процесс имеет свою собственную память, и обмен данными между процессами может выполняться с использованием механизмов межпроцессного взаимодействия, таких как сокеты или каналы.

Основные варианты реализации в R и пакеты

parallel

parallel объединяет функциональность более старых пакетов multicore и snow, предоставляя интерфейс для выполнения задач в параллельном режиме. Основные функции:

mclapply(): аналог функции lapply(), используемый для параллельной обработки списка элементов. Работает только в юниксо подобных ос:

library(parallel)
result <- mclapply(1:100, function(x) x^2, mc.cores = 2)

parLapply(): альтернатива lapply(), подходящая для работы как на одном пк, так и на кластере. Для использования необходимо предварительно создать кластер с помощью makeCluster():

cl <- makeCluster(4) # cоздаем кластер из 4 узлов
clusterExport(cl, varlist = "myData") # экспортируем данные в каждый узел кластера
result <- parLapply(cl, 1:100, function(x) x^2)
stopCluster(cl)

makeCluster(): создает кластер из заданного количества ядер. Кластер затем используется функциями типа parLapply():

cl <- makeCluster(4) # 4 ядра
stopCluster(cl)

clusterApply(): распараллеливает выполнение функции по элементам списка на кластере:

cl <- makeCluster(2)
result <- clusterApply(cl, 1:10, function(x) x^2)
stopCluster(cl)

clusterEvalQ() и clusterCall(): позволяют выполнить произвольный код на узлах кластера:

cl <- makeCluster(2)
clusterEvalQ(cl, library(ggplot2)) # ggplot2 на каждом узле
clusterCall(cl, function() Sys.info()['nodename']) # получаем имя узла
stopCluster(cl)

Попробуем распараллелить анализ данных с использованием модели линейной регрессии на различных подвыборках датасета:

library(parallel)
data(mtcars)
cl <- makeCluster(4)

# разделяем данные на подвыборки
subsets <- split(mtcars, mtcars$cyl)

# распараллеливаем вычисления
models <- parLapply(cl, subsets, function(subset) lm(mpg ~ wt, data = subset))

stopCluster(cl)

Код создает параллельный кластер, разделяет набор данных mtcars на подвыборки по количеству цилиндров, и использует функцию parLapply() для параллельного подсчета линейных моделей по каждой подвыборке.

foreach

Пакет foreach предоставляет способ организации циклов в R, который может быть легко адаптирован для параллельной обработки без необходимости изменения основного кода. Основная фича заключается в том, чтобы заменить традиционные циклы R, такие как for, на конструкцию foreach, которая затем может использоваться с одним из многих пакетов параллельного бэкенда, к примеруdoParallel.

Основные функции foreach:

foreach(): основная функция, которая используется для определения цикла. Без паралельной обработки:

library(foreach)
result <- foreach(i = 1:100) %do% {
  sqrt(i)
}

doParallel является одним из бэкендов для foreach, он позволяет выполнять каждую итерацию цикла foreach параллельно, используя множество процессоров или ядер на компьютере. Основные функции:

registerDoParallel(): функция для регистрации doParallel как параллельного бэкенда для foreach, указывая количество ядер или процессоров, которые должны использоваться для параллельной обработки.

Пример использования обоих пакетов для параллельной обработки:

library(foreach)
library(doParallel)

cl <- makeCluster(4) # создаем кластер из 4 ядер
registerDoParallel(cl) # регистрируем кластер для использования с foreach

result <- foreach(i = 1:100) %dopar% {
  sqrt(i)
}

stopCluster(cl)

Здесь создали кластер из четырех ядер с помощью makeCluster из пакета parallel на который doParallel опирается и затем регистрируем этот кластер как бэкенд для foreach, используя registerDoParallel. Цикл foreach автоматически распараллеливается, и каждая итерация выполняется на отдельном ядре.

foreach также поддерживают различные механизмы контроля за процессом выполнения, например агрегацию результатов с помощью аргумента .combine, который позволяет определить, как результаты отдельных итераций должны быть объединены в конечный resultd:

result <- foreach(i=1:100, .combine='c') %dopar% {
  sqrt(i)
}

Результаты каждой итерации объединяются в один вектор с использованием функции объединения 'c'.

future

plan(): функция определяет, как будут выполняться наши future-объекты — локально, в параллельных сессиях, на удаленных машинах и т.д. выбр Можно выбраать стратегию исполнения, например, multisession, multicore, sequential и другие.

future(): создает future-объект, представляющий значение, которое будет доступно в будущем. Это сердце пакета, позволяющее нам абстрагироваться от деталей параллельного исполнения.

value(): получает результат выполнения future-объекта, дожидаясь его завершения, если это необходимо.

resolved(): проверяет, завершилось ли выполнение future-объекта.

future и plan для параллельного вычисления:

library(future)
plan(multisession)  # несколько сессий для параллельного исполнения

# future-объект для асинхронного вычисления
fut <- future({
  Sys.sleep(5)  # имитация длительной операции
  sum(1:1000)
})

result <- value(fut)

Инициируем параллельное вычисление суммы чисел от 1 до 1000, при этом основной поток исполнения не блокируется и может продолжать выполнение других задач.

Можно так же реализовать веб.скрапинг к примеру для сбора информации о продуктах с нескольких интернет-магазинов для сравнения цен. можно одновременно отправлять запросы к разным сайтам, уменьшая общее время выполнения скрапинга:

library(future)
plan(multisession)

urls <- c("https://store1.com/product", "https://store2.com/product", "https://store3.com/product")
scrapingFutures <- lapply(urls, function(url) {
  future({
    scrapeData(url)  # псевдофункция для извлечения данных со страницы
  })
})

productsData <- lapply(scrapingFutures, value)

RcppParallel

RcppParallel использует потоки C++ и предлагает простой API для наиболее распространенных операций параллельного программирования в c++.

Допустим есть большой вектор чисел, и мы хотим быстро посчитать его сумму. Используем RcppParallel для буста процесса:

#include 
using namespace RcppParallel;

struct Sum : public Worker
{   
    // вектор и результат
    const RVector input;
    double& sum;
    
    // конструктор
    Sum(const NumericVector input, double& sum) : input(input), sum(sum) {}
    
    // переопределение оператора ()
    void operator()(std::size_t begin, std::size_t end) {
        sum += std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
    }
};

// [[Rcpp::export]]
double parallelSum(NumericVector x) {
    double sum = 0;
    Sum sumWorker(x, sum);
    parallelReduce(0, x.length(), sumWorker);
    return sum;
}

Структура Sum используется для параллельного вычисления суммы чисел в векторе. parallelReduce автоматически разбивает данные на части и выполняет их параллельно, используя доступные ядра процессора.

Предположим, нам нужно применить функцию к каждому элементу большого вектора. Вместо того, чтобы делать это последовательно, мы можем выполнить операцию параллельно:

#include 
using namespace RcppParallel;

struct Transform : public Worker
{
    // исходный и целевой векторы
    const RVector input;
    RVector output;
    
    Transform(const NumericVector input, NumericVector output) : input(input), output(output) {}
    
    // переопределение оператора ()
    void operator()(std::size_t begin, std::size_t end) {
        std::transform(input.begin() + begin, input.begin() + end, output.begin() + begin, [](double x) {
            return std::sqrt(x); // Например, вычисляем корень квадратный
        });
    }
};

// [[Rcpp::export]]
NumericVector parallelTransform(NumericVector x) {
    NumericVector result(x.size());
    Transform transformWorker(x, result);
    parallelFor(0, x.size(), transformWorker);
    return result;
}

Струтура Transform применяет некоторую функцию к элементам вектора параллельно.

Прочие примеры применения

Операции с матрицами порой оч. больших вычислительных ресурсов, особенно при работе с большими матрицами:

library(foreach)
library(doParallel)

# задаем количество ядер
cl <- makeCluster(detectCores())
registerDoParallel(cl)

# генерим две большие матрицы
mat1 <- matrix(rnorm(1e6), 1000, 1000)
mat2 <- matrix(rnorm(1e6), 1000, 1000)

# параллельное умножение матриц
result <- foreach(i=1:nrow(mat1), .combine='rbind') %dopar% {
  mat1[i, ] %*% mat2
}

stopCluster(cl)

Как можно параллельно обработать большой датасет:

library(foreach)
library(doParallel)

cl <- makeCluster(detectCores())
registerDoParallel(cl)

# представим, что у есть большой датасет 'big_data'
# используем placeholder для демонстрации
big_data <- replicate(100, rnorm(1e4))

# применяем функцию обработки данных к каждому столбцу датасета
processed_data <- foreach(i=1:ncol(big_data), .combine='cbind') %dopar% {
  someComplexFunction(big_data[, i])
}

stopCluster(cl)

Как можно параллельно выполнять множество независимых вычислительных задач:

library(foreach)
library(doParallel)

cl <- makeCluster(detectCores())
registerDoParallel(cl)

# выполнение независимых вычислений, например, генерация симуляций
tasks <- 100 # Количество задач

results <- foreach(i=1:tasks, .combine='c') %dopar% {
  runSimulation(i) # 'runSimulation' - placeholder для функции симуляции
}

stopCluster(cl)

В целом параллельное вычисление в R реализовано добротно, однако еще далеко до питона.

Статья подготовлена в рамках запуска специализации Системный аналитик и бизнес-аналитик. По ссылкам ниже вы сможете зарегистрироваться на бесплатные вебинары специализации.

© Habrahabr.ru