Основы многопоточности в Rust

6fd15121cece85c62a2acaf1c7a4d704.png

Привет!

Думаю, уже всем известно, что многопоточность — это мастхев для большинства приложений.

Rust предлагает хорошие решения к задачам многопоточности. В Rust нет места таким распространенным проблемам, как гонки данных или неправильное управление памятью, благодаря его системе владения и заимствования.

Многопоточность в Rust

В самом начале стоит понять, что такое потоки. Поток — это наименьшая единица обработки, которая может быть выполнена операционной системой. В Rust, потоки можно создавать с помощью стандартной библиотеки, используя std::thread. Это позволяет выполнять одновременно несколько задач.

use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("Привет из потока! {}", i);
            thread::sleep(std::time::Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("Привет из основного потока! {}", i);
        thread::sleep(std::time::Duration::from_millis(1));
    }

    handle.join().unwrap();
}

Создаём новый поток с помощью thread::spawn и синхронизируем с основным потоком через join.

Гонка данных возникает, когда два или более потоков одновременно пытаются получить доступ к одним и тем же данным, и по крайней мере один из потоков изменяет эти данные. В Rust, благодаря системе владения и заимствования, гонки данных можно предотвратить на этапе компиляции:

Mutex, или взаимное исключение,  — это механизм синхронизации, который используется для предотвращения одновременного доступа нескольких потоков к общему ресурсу. Mutex обеспечивает безопасный доступ к данным, которые он содержит: только один поток может владеть мьютексом в определенный момент времени.

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Результат: {}", *counter.lock().unwrap());
}

Атомарные операции — это операции, которые гарантированно выполняются целиком, без возможности прерывания другими потоками. В Rust, атомарные типы, такие как AtomicBool или AtomicUsize, предоставляют примитивы для безопасного выполнения операций в многопоточной среде:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let counter = AtomicUsize::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = &counter;
        let handle = thread::spawn(move || {
            for _ in 0..1000 {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Результат: {}", counter.load(Ordering::SeqCst));
}

RwLock позволяет множеству потоков читать данные одновременно, но только одному потоку — изменять их в любой момент времени:

use std::sync::{RwLock, Arc};
use std::thread;

fn main() {
    let data = Arc::new(RwLock::new(5));

    let reader = thread::spawn({
        let data = Arc::clone(&data);
        move || {
            println!("Значение данных: {}", *data.read().unwrap());
        }
    });

    let writer = thread::spawn({
        let data = Arc::clone(&data);
        move || {
            let mut data = data.write().unwrap();
            *data += 1;
        }
    });

    reader.join().unwrap();
    writer.join().unwrap();
    println!("Новое значение данных: {}", *data.read().unwrap());
}

Когда данные обернуты в Arc, они могут безопасно разделяться между несколькими потоками. Каждый поток увеличивает счетчик ссылок при доступе к данным и уменьшает его при завершении работы.

Иммутабельность данных — если данные не изменяются, то их безопасно разделять между потоками без дополнительной синхронизации. Rust поощряет использование неизменяемых данных там, где это возможно.

Пулы потоков и Rayon

Пул потоков — это коллекция предварительно созданных потоков, которые могут выполнять задачи. Вместо того чтобы при каждой необходимости создавать новый поток, что является ресурсоемкой операцией, задачи могут быть отправлены в пул, где они выполняются доступными потоками

В Раст пулы потоков часто используются для распределенной обработки данных, параллельных вычислений и в сценариях, где задачи могут быть независимо выполнены в параллельной манере. Создание и управление пулом потоков вручную может быть довольно сложной задачей, но, к счастью, в экосистеме Rust существуют библиотеки, которые упрощают этот процесс.

Rayon — это библиотека в экосистеме Rust, предназначенная для упрощения параллельных вычислений. Она позволяет легко применять параллельные итерации и другие параллельные паттерны к данным, значительно упрощая процесс написания многопоточного кода.

Параллельные итераторы — это основа Rayon, они позволяют легко выполнять итерационные операции в параллельном режиме, к примеру в map:

use rayon::prelude::*;

fn main() {
    let nums = vec![1, 2, 3, 4, 5];
    let squares: Vec<_> = nums.par_iter().map(|&i| i * i).collect();
    println!("{:?}", squares);
}

Код берет вектор чисел и создает новый вектор, содержащий квадраты этих чисел, используя параллельный map.

use rayon::prelude::*;

fn main() {
    let nums = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let even_nums: Vec<_> = nums.par_iter().filter(|&&x| x % 2 == 0).collect();
    println!("{:?}", even_nums);
}

Фильтруем вектор чисел, оставляя только четные числа, снова используя параллельные операции.

Rayon также предоставляет функции для параллельных сверток, такие как reduce и fold:

use rayon::prelude::*;

fn main() {
    let nums = vec![1, 2, 3, 4, 5];
    let sum: i32 = nums.par_iter().reduce(|| 0, |a, b| a + b);
    println!("Сумма: {}", sum);
}

Суммируем числа в векторе, используя параллельный reduce.

use rayon::prelude::*;

fn main() {
    let nums = vec![1, 2, 3, 4, 5];
    let sum: i32 = nums.par_iter().fold(|| 0, |a, &b| a + b).sum();
    println!("Сумма: {}", sum);
}

Делаем то же самое, что и в предыдущем примере, но используем fold вместо reduce.

Rayon также предлагает функции для параллельной сортировки массивов и векторов.

use rayon::prelude::*;

fn main() {
    let mut nums = vec![10, 5, 3, 1, 4, 2, 6, 9, 8, 7];
    nums.par_sort();
    println!("{:?}", nums);
}

Сортируем вектор чисел в порядке возрастания, используя параллельную сортировку.

Rayon также позволяет легко распараллеливать выполнение произвольных задач:

use rayon::prelude::*;

fn main() {
    let data = vec![1, 2, 3, 4, 5];

    data.into_par_iter().for_each(|num| {
        println!("Обработка числа: {}", num);
    });
}

Выполняем функцию для каждого элемента вектора параллельно.

Представим, что у нас есть большой вектор целых чисел. Нам нужно выполнить следующие шаги:

  1. Отфильтровать числа, оставив только те, которые делятся на 3.

  2. Каждое отфильтрованное число возвести в квадрат.

  3. Подсчитать сумму всех квадратов.

use rayon::prelude::*;

fn main() {
    // Создаем большой вектор данных
    let data: Vec = (1..=1000000).collect();

    // Выполняем обработку данных в параллельном режиме
    let result: i32 = data
        .par_iter() // Используем параллельный итератор
        .filter(|&&x| x % 3 == 0) // Фильтруем числа, делящиеся на 3
        .map(|&x| x * x) // Возводим каждое число в квадрат
        .sum(); // Суммируем все квадраты

    println!("Сумма квадратов чисел, делящихся на 3: {}", result);
}

Создаем вектор data, содержащий числа от 1 до 1,000,000. Юзаемpar_iter(), для преобразования обычного итератора в параллельный, что позволяет обрабатывать данные в нескольких потоках. С filter отбираем только те числа, которые делятся на 3. map применяется для возведения каждого отфильтрованного числа в квадрат. sum суммирует все полученные квадраты чисел, давая нам итоговый результат.

Немного про асинхронное программирование в Rust

Асинхронное программирование в Rust основано на модели событийного цикла, где программа выполняет код, реагируя на различные события (например, завершение I/O операций). Это позволяет программе одновременно обрабатывать множество задач, не создавая для каждой отдельный поток

Future в Rust — это основной строительный блок асинхронного программирования. Future представляет собой абстракцию над операцией, которая может быть завершена в будущем. Она может находиться в одном из трех состояний:

  1. Pending: операция еще не завершена.

  2. Ready: операция завершена, и результат доступен.

  3. Cancelled: операция была отменена.

Когда Future создается, она начинается в состоянии Pending и переходит в состояние Ready, когда операция завершается. Future сама по себе не выполняет никакой работы; для ее выполнения требуется асинхронный исполнитель (executor).

Асинхронный исполнитель — это компонент, который управляет выполнением Future. Он отвечает за запуск асинхронных задач и продвижение их к завершению. В Rust существуют различные библиотеки, предоставляющие исполнителей, например, tokio и async-std. Это уже конечно совсем отдельная тема про tokio,разберем в будущих статьях. А пока, пару примеров:

Cоздадим две асинхронные функции и соединим их с помощью метода then, который позволяет создать цепочку Future.

use futures::future::FutureExt; // Для доступа к методу `then`
use tokio;

async fn compute_length(input: &str) -> usize {
    input.len()
}

async fn report_length(length: usize) {
    println!("Длина строки: {}", length);
}

#[tokio::main]
async fn main() {
    let input = "Hello, world!";
    compute_length(input)
        .then(|length| report_length(length))
        .await;
}

compute_length сначала вычисляет длину строки, а затем report_length выводит эту длину. then используется для создания цепочки этих двух операций.

Есть в токиоasync move блок, который позволяет захватывать переменные из окружающей среды:

use tokio;

#[tokio::main]
async fn main() {
    let my_string = String::from("Hello, async world!");
    tokio::spawn(async move {
        println!("{}", my_string);
    }).await.unwrap();
}

my_string перемещается в асинхронный блок, и мы используем tokio::spawn для запуска этой асинхронной задачи.

Макрос join! позволяет одновременно ожидать завершения нескольких Future:

use tokio;

async fn task_one() -> String {
    "Результат задачи один".to_string()
}

async fn task_two() -> String {
    "Результат задачи два".to_string()
}

#[tokio::main]
async fn main() {
    let (result_one, result_two) = tokio::join!(task_one(), task_two());
    println!("{}, {}", result_one, result_two);
}

task_one и task_two выполняются параллельно, и join! ожидает завершения обеих задач.

Макрос select! позволяет ждать несколько Future и продолжить выполнение с той, которая завершится первой:

use tokio::time::{delay_for, Duration};
use tokio::select;

async fn task_slow() -> &'static str {
    delay_for(Duration::from_secs(5)).await;
    "Медленная задача завершена"
}

async fn task_fast() -> &'static str {
    delay_for(Duration::from_secs(1)).await;
    "Быстрая задача завершена"
}

#[tokio::main]
async fn main() {
    select! {
        result = task_slow() => {
            println!("{}", result);
        }
        result = task_fast() => {
            println!("{}", result);
        }
    };
}

task_slow и task_fast выполняются одновременно, но select! позволяет продолжить выполнение программы сразу после завершения самой быстрой задачи

В целом, Rust предлагает один из самых безопасных и в то же время интересных способов работы с многопоточностью на мой взгляд, благодаря его системе типов и владений.

В продолжение темы хочу порекомендовать бесплатные вебинары от экспертов рынка про безопасный unsafe Rust и про то,  как Rust побуждает использовать композицию.

© Habrahabr.ru