Основы многопоточности в Rust
Привет!
Думаю, уже всем известно, что многопоточность — это мастхев для большинства приложений.
Rust предлагает хорошие решения к задачам многопоточности. В Rust нет места таким распространенным проблемам, как гонки данных или неправильное управление памятью, благодаря его системе владения и заимствования.
Многопоточность в Rust
В самом начале стоит понять, что такое потоки. Поток — это наименьшая единица обработки, которая может быть выполнена операционной системой. В Rust, потоки можно создавать с помощью стандартной библиотеки, используя std::thread
. Это позволяет выполнять одновременно несколько задач.
use std::thread;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("Привет из потока! {}", i);
thread::sleep(std::time::Duration::from_millis(1));
}
});
for i in 1..5 {
println!("Привет из основного потока! {}", i);
thread::sleep(std::time::Duration::from_millis(1));
}
handle.join().unwrap();
}
Создаём новый поток с помощью thread::spawn
и синхронизируем с основным потоком через join
.
Гонка данных возникает, когда два или более потоков одновременно пытаются получить доступ к одним и тем же данным, и по крайней мере один из потоков изменяет эти данные. В Rust, благодаря системе владения и заимствования, гонки данных можно предотвратить на этапе компиляции:
Mutex, или взаимное исключение, — это механизм синхронизации, который используется для предотвращения одновременного доступа нескольких потоков к общему ресурсу. Mutex
обеспечивает безопасный доступ к данным, которые он содержит: только один поток может владеть мьютексом в определенный момент времени.
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Результат: {}", *counter.lock().unwrap());
}
Атомарные операции — это операции, которые гарантированно выполняются целиком, без возможности прерывания другими потоками. В Rust, атомарные типы, такие как AtomicBool
или AtomicUsize
, предоставляют примитивы для безопасного выполнения операций в многопоточной среде:
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
fn main() {
let counter = AtomicUsize::new(0);
let mut handles = vec![];
for _ in 0..10 {
let counter = &counter;
let handle = thread::spawn(move || {
for _ in 0..1000 {
counter.fetch_add(1, Ordering::SeqCst);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Результат: {}", counter.load(Ordering::SeqCst));
}
RwLock
позволяет множеству потоков читать данные одновременно, но только одному потоку — изменять их в любой момент времени:
use std::sync::{RwLock, Arc};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(5));
let reader = thread::spawn({
let data = Arc::clone(&data);
move || {
println!("Значение данных: {}", *data.read().unwrap());
}
});
let writer = thread::spawn({
let data = Arc::clone(&data);
move || {
let mut data = data.write().unwrap();
*data += 1;
}
});
reader.join().unwrap();
writer.join().unwrap();
println!("Новое значение данных: {}", *data.read().unwrap());
}
Когда данные обернуты в Arc
, они могут безопасно разделяться между несколькими потоками. Каждый поток увеличивает счетчик ссылок при доступе к данным и уменьшает его при завершении работы.
Иммутабельность данных — если данные не изменяются, то их безопасно разделять между потоками без дополнительной синхронизации. Rust поощряет использование неизменяемых данных там, где это возможно.
Пулы потоков и Rayon
Пул потоков — это коллекция предварительно созданных потоков, которые могут выполнять задачи. Вместо того чтобы при каждой необходимости создавать новый поток, что является ресурсоемкой операцией, задачи могут быть отправлены в пул, где они выполняются доступными потоками
В Раст пулы потоков часто используются для распределенной обработки данных, параллельных вычислений и в сценариях, где задачи могут быть независимо выполнены в параллельной манере. Создание и управление пулом потоков вручную может быть довольно сложной задачей, но, к счастью, в экосистеме Rust существуют библиотеки, которые упрощают этот процесс.
Rayon — это библиотека в экосистеме Rust, предназначенная для упрощения параллельных вычислений. Она позволяет легко применять параллельные итерации и другие параллельные паттерны к данным, значительно упрощая процесс написания многопоточного кода.
Параллельные итераторы — это основа Rayon, они позволяют легко выполнять итерационные операции в параллельном режиме, к примеру в map:
use rayon::prelude::*;
fn main() {
let nums = vec![1, 2, 3, 4, 5];
let squares: Vec<_> = nums.par_iter().map(|&i| i * i).collect();
println!("{:?}", squares);
}
Код берет вектор чисел и создает новый вектор, содержащий квадраты этих чисел, используя параллельный map
.
use rayon::prelude::*;
fn main() {
let nums = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let even_nums: Vec<_> = nums.par_iter().filter(|&&x| x % 2 == 0).collect();
println!("{:?}", even_nums);
}
Фильтруем вектор чисел, оставляя только четные числа, снова используя параллельные операции.
Rayon также предоставляет функции для параллельных сверток, такие как reduce
и fold
:
use rayon::prelude::*;
fn main() {
let nums = vec![1, 2, 3, 4, 5];
let sum: i32 = nums.par_iter().reduce(|| 0, |a, b| a + b);
println!("Сумма: {}", sum);
}
Суммируем числа в векторе, используя параллельный reduce
.
use rayon::prelude::*;
fn main() {
let nums = vec![1, 2, 3, 4, 5];
let sum: i32 = nums.par_iter().fold(|| 0, |a, &b| a + b).sum();
println!("Сумма: {}", sum);
}
Делаем то же самое, что и в предыдущем примере, но используем fold
вместо reduce
.
Rayon также предлагает функции для параллельной сортировки массивов и векторов.
use rayon::prelude::*;
fn main() {
let mut nums = vec![10, 5, 3, 1, 4, 2, 6, 9, 8, 7];
nums.par_sort();
println!("{:?}", nums);
}
Сортируем вектор чисел в порядке возрастания, используя параллельную сортировку.
Rayon также позволяет легко распараллеливать выполнение произвольных задач:
use rayon::prelude::*;
fn main() {
let data = vec![1, 2, 3, 4, 5];
data.into_par_iter().for_each(|num| {
println!("Обработка числа: {}", num);
});
}
Выполняем функцию для каждого элемента вектора параллельно.
Представим, что у нас есть большой вектор целых чисел. Нам нужно выполнить следующие шаги:
Отфильтровать числа, оставив только те, которые делятся на 3.
Каждое отфильтрованное число возвести в квадрат.
Подсчитать сумму всех квадратов.
use rayon::prelude::*;
fn main() {
// Создаем большой вектор данных
let data: Vec = (1..=1000000).collect();
// Выполняем обработку данных в параллельном режиме
let result: i32 = data
.par_iter() // Используем параллельный итератор
.filter(|&&x| x % 3 == 0) // Фильтруем числа, делящиеся на 3
.map(|&x| x * x) // Возводим каждое число в квадрат
.sum(); // Суммируем все квадраты
println!("Сумма квадратов чисел, делящихся на 3: {}", result);
}
Создаем вектор data
, содержащий числа от 1 до 1,000,000. Юзаемpar_iter()
, для преобразования обычного итератора в параллельный, что позволяет обрабатывать данные в нескольких потоках. С filter
отбираем только те числа, которые делятся на 3. map
применяется для возведения каждого отфильтрованного числа в квадрат. sum
суммирует все полученные квадраты чисел, давая нам итоговый результат.
Немного про асинхронное программирование в Rust
Асинхронное программирование в Rust основано на модели событийного цикла, где программа выполняет код, реагируя на различные события (например, завершение I/O операций). Это позволяет программе одновременно обрабатывать множество задач, не создавая для каждой отдельный поток
Future
в Rust — это основной строительный блок асинхронного программирования. Future
представляет собой абстракцию над операцией, которая может быть завершена в будущем. Она может находиться в одном из трех состояний:
Pending: операция еще не завершена.
Ready: операция завершена, и результат доступен.
Cancelled: операция была отменена.
Когда Future
создается, она начинается в состоянии Pending
и переходит в состояние Ready
, когда операция завершается. Future
сама по себе не выполняет никакой работы; для ее выполнения требуется асинхронный исполнитель (executor).
Асинхронный исполнитель — это компонент, который управляет выполнением Future
. Он отвечает за запуск асинхронных задач и продвижение их к завершению. В Rust существуют различные библиотеки, предоставляющие исполнителей, например, tokio
и async-std
. Это уже конечно совсем отдельная тема про tokio,
разберем в будущих статьях. А пока, пару примеров:
Cоздадим две асинхронные функции и соединим их с помощью метода then
, который позволяет создать цепочку Future
.
use futures::future::FutureExt; // Для доступа к методу `then`
use tokio;
async fn compute_length(input: &str) -> usize {
input.len()
}
async fn report_length(length: usize) {
println!("Длина строки: {}", length);
}
#[tokio::main]
async fn main() {
let input = "Hello, world!";
compute_length(input)
.then(|length| report_length(length))
.await;
}
compute_length
сначала вычисляет длину строки, а затем report_length
выводит эту длину. then
используется для создания цепочки этих двух операций.
Есть в токиоasync move
блок, который позволяет захватывать переменные из окружающей среды:
use tokio;
#[tokio::main]
async fn main() {
let my_string = String::from("Hello, async world!");
tokio::spawn(async move {
println!("{}", my_string);
}).await.unwrap();
}
my_string
перемещается в асинхронный блок, и мы используем tokio::spawn
для запуска этой асинхронной задачи.
Макрос join!
позволяет одновременно ожидать завершения нескольких Future
:
use tokio;
async fn task_one() -> String {
"Результат задачи один".to_string()
}
async fn task_two() -> String {
"Результат задачи два".to_string()
}
#[tokio::main]
async fn main() {
let (result_one, result_two) = tokio::join!(task_one(), task_two());
println!("{}, {}", result_one, result_two);
}
task_one
и task_two
выполняются параллельно, и join!
ожидает завершения обеих задач.
Макрос select!
позволяет ждать несколько Future
и продолжить выполнение с той, которая завершится первой:
use tokio::time::{delay_for, Duration};
use tokio::select;
async fn task_slow() -> &'static str {
delay_for(Duration::from_secs(5)).await;
"Медленная задача завершена"
}
async fn task_fast() -> &'static str {
delay_for(Duration::from_secs(1)).await;
"Быстрая задача завершена"
}
#[tokio::main]
async fn main() {
select! {
result = task_slow() => {
println!("{}", result);
}
result = task_fast() => {
println!("{}", result);
}
};
}
task_slow
и task_fast
выполняются одновременно, но select!
позволяет продолжить выполнение программы сразу после завершения самой быстрой задачи
В целом, Rust предлагает один из самых безопасных и в то же время интересных способов работы с многопоточностью на мой взгляд, благодаря его системе типов и владений.
В продолжение темы хочу порекомендовать бесплатные вебинары от экспертов рынка про безопасный unsafe Rust и про то, как Rust побуждает использовать композицию.