Миграция на инфраструктуру async-await в Rust

birds migration
img source

На прошлой неделе для Rust комьюнити случилось огромное событие — вышла версия компилятора 1.39, а в месте с ней и стабилизация async-await фичи. В этом посте я постараюсь резюмировать все релевантные изменения в компиляторе и экосистеме, а также предоставить инструкции по миграции на async-await парадигму. Детального разбора асинхронности в Rust я делать не буду, есть всё ещё актуальные статьи на хабре, которые помогут войти в тему:

Помимо указанных статей можно также обратиться к документациям стандартной библиотеке и нужных крейтов, а также почитать async-book (на англ.).

Все примеры, рассматриваемые в статье работают на стабильном компиляторе 1.38 и должны работать на всех последующих версиях. Конечный код доступен на github.

Для реализации асинхронного кода использовалась библиотека futures-0.1. Она предоставляет базовые типажи futures::Future и futures::Stream для работы с отложенными вычислениями. Они оперируют с типами Result<..> и предоставляют набор комбинаторов. Помимо этого, библиотека предоставляет каналы для общения между задачами (task), различные интерфейсы для работы с экзекьютором и его системой задач и прочее.

Рассмотрим пример, который генерирует числовой ряд из старших 32 бит чисел Фибоначчи и отправляет их в Sink:

// futures 0.1.29
use futures::prelude::*;
use futures::{stream, futures};

fn sink_fibb_series(
    sink: impl Sink,
) -> impl Future {
    stream::unfold((1u32, 1), |(mut fact, n)| {
        while fact.checked_mul(n).is_none() {
            fact >>= 1;
        }
        fact *= n;
        Some(future::ok((fact, (fact, n + 1))))
    })
    .forward(sink)
    .map(|_v| ())
}

Зам.: считать CPU-bound задачи на корутинах не самое лучшее применение, зато пример самодостаточен и прост.

Как можно заметить, код выглядит достаточно громоздко: необходимо указывать возвращаемое значение, несмотря на то, что никакого полезного значения в нем нет. В futures 0.3 код становится немного проще:

// futures 0.3.1
use futures::prelude::*;
use futures::stream;

async fn sink_fibb_series(sink: impl Sink) {
    stream::unfold((1u32, 1), |(mut fact, n)| {
        async move {
            while fact.checked_mul(n).is_none() {
                fact >>= 1;
            }
            fact *= n;
            Some((fact, (fact, n + 1)))
        }
    })
    .map(Ok)
    .forward(sink)
    .map(|_v| ())
    .await;
}

Здесь у функции добавляется ключевое слово async, которое оборачивает возвращаемое значение функции в Future. Поскольку в нашем случае это кортеж нулевого размера, то его можно попросту опустить, как и в обычных функциях.

Для ожидания выполнения в конце цепочки вызовов используется ключевое слово await. Этот вызов приостанавливает выполнение в текущем async-контексте и передает управление планировщику до тех пор, пока ожидаемое Future значение не будет готово. Затем выполнение возобновляется с последнего await (в нашем примере завершая функцию), т.е. поток управления становится нелинейным по сравнению с аналогичным синхронным кодом.

Ещё одно значительное различие — наличие async-блока в теле замыкания внутри stream::unfold. Эта обёртка является полным аналогом объявлением новой async-функции с таким же телом и вызовом вместо async-блока.


#[feature (async_closure)

Возможно это замыкание в скором времени написать с помощью фичи async_closure, но увы, она пока не реализована:

async |(mut fact, n)| {
    while fact.checked_mul(n).is_none() {
        fact >>= 1;
    }
    fact *= n;
    Some((fact, (fact, n + 1)))
}

Как можно заметить, новый типаж Stream работает не только с элементами типа Result<..>, как это было ранее. Аналогичные изменения коснулись типажа Future, определения по версиям следующие:

// futures 0.1
trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Result, Self::Error>;
}

enum Async {
    Ready(T),
    NotReady
}

// futures 0.3
trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll;
}

enum Poll {
    Ready(T),
    Pending
}

Помимо того, что тип возвращаемого значения может быть произвольным, также поменялись и входные параметры для Future::poll. Появился новый параметр Context, который предоставляет явный интерфейс для пробуждения текущей задачи. Ранее то же самое можно было достигнуть через глобальные переменные конкретного экзекьютора (например через вызов tokio::prelude::task::current().notify()).

Более фундаментальное отличие интерфейса в том, что ссылку на себя требуется оборачивать в Pin. Эта обертка над указателем гарантирует «неподвижность» данных в памяти (более подробное описание Pin есть 1.33 релизе компилятора на хабре, либо на английском, в документации стандартной библиотеки std: pin).

Попробуем теперь запустить наш пример. В качестве Sink возьмем половину канала из futures и на выходной стороне будем печатать результат с некоторой задержкой между итерациями. На futures-0.1 такой код можно написать следующим образом:

use std::time::{Duration, Instant};

// futures 0.1.29
use futures::prelude::*;
use futures::sync::mpsc;
// tokio 0.1.22
use tokio::runtime::Runtime;
use tokio::timer::Delay;

fn main() {
    let mut rt = Runtime::new().unwrap();

    let (tx, rx) = mpsc::channel(32);
    rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ()))));

    let fut = rx.take(100).for_each(|val| {
        println!("{}", val);
        Delay::new(Instant::now() + Duration::from_millis(50))
            .map(|_| ())
            .map_err(|_| ())
    });
    rt.spawn(Box::new(fut));

    rt.shutdown_on_idle().wait().unwrap();
}

Аналогичный код с новым tokio (который на момент написания ещё alpha) и futures-0.3 может выглядеть так:

use std::time::Duration;

// futures 0.3.1
use futures::channel::mpsc;
use futures::prelude::*;
// tokio 0.2.0-alpha.5
use tokio::timer;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(32);

    tokio::spawn(sink_fibb_series(tx));

    rx.take(100)
        .for_each(|val| {
            println!("{}", val);
            timer::delay_for(Duration::from_millis(50))
        })
        .await;
}

Как можно заметить, код с новыми футурами стал значительно короче. По опыту автора, количество строк всегда выходит ощутимо меньше (порой даже при переписывании синхронного кода). Но как мне кажется, куда более весомое отличие в читабельности и отсутствия мешанины вызовов map/map_err, которые были необходимы из-за вариативности ошибок у стандартных типов в Result<..>.

Комбинаторы над элементами типа Result<..> тем не менее остались и находятся в отдельных типажах, некоторые со слегка обновленным названием. Теперь они разбиты по двум разным типажам; те, которые реализованы для:

Чуть более сложным оказывается реализация типажей Future и Stream. Для примера попробуем реализовать Stream для уже рассмотренного числового ряда. Общий тип для обеих версий футур будет следующий:

struct FactStream {
    fact: u32,
    n: u32,
}

impl FactStream {
    fn new() -> Self {
        Self { fact: 1, n: 1 }
    }
}

Для futures-0.1 реализация будет следующая:

impl Stream for FactStream {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll, Self::Error> {
        while self.fact.checked_mul(self.n).is_none() {
            self.fact >>= 1;
        }
        self.fact *= self.n;
        self.n += 1;
        Ok(Async::Ready(Some(self.fact)))
    }
}

В этом примере реализация Stream::poll фактически является полной копией замыкания stream::unfold. В случае с futures-0.3 реализация оказывается эквивалентной:

impl Stream for FactStream {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> {
        while self.fact.checked_mul(self.n).is_none() {
            self.fact >>= 1;
        }
        self.fact *= self.n;
        self.n += 1;

        Poll::Ready(Some(self.fact))
    }
}

Однако, если тип какого-нибудь поля структуры не реализует Unpin, то std::ops::DerefMut не будет реализовать на Pin<&mut T> и тем самым не будет мутабельного доступа ко всем полям:

use std::marker::PhantomPinned;

struct Fact {
    inner: u32,
    // маркер убирает реализацию Unpin у структуры
    _pin: PhantomPinned,
}

struct FactStream {
    fact: Fact,
    n: u32,
}

impl Stream for FactStream {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> {
        while self.fact.inner.checked_mul(self.n).is_none() {
            self.fact.inner >>= 1; // <- ошибка компиляции
                                   // trait `DerefMut` is required to modify
                                   // through a dereference, but it is not
                                   // implemented for `std::pin::Pin<&mut FactStream>`
        }
        self.fact.inner *= self.n;  // <- тут аналогично
        self.n += 1;                // <-

        Poll::Ready(Some(self.fact.inner))
    }
}

В таком случае, в том или ином виде придется воспользоваться unsafe функциями Pin::get_unchecked_mut и Pin::map_unchecked_mut для того, чтобы получить «проекцию» !Unpin поля (в документации есть более развернутое описание). К счастью, для таких случаев существует безопасная обёртка реализованная в крейте pin_project (детали реализации можно найти в документации библиотеки).

use pin_project::pin_project;

#[pin_project]
struct FactStream {
    fact: Fact,
    n: u32,
}

impl Stream for FactStream {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> {
        let mut this = self.project();

        while this.fact.inner.checked_mul(*this.n).is_none() {
            this.fact.inner >>= 1;
        }
        this.fact.inner *= *this.n;
        *this.n += 1;

        Poll::Ready(Some(this.fact.inner))
    }
}

Последний момент, который хотелось бы освятить это интеропреабельность между типажами разных версий. Для этого существует модуль futures: compat, который позволяет конвертировать из старых типов в новые и наоборот. К примеру можно проитерироваться по Stream из futures-0.1 с помощью async-await:

use std::fmt::Display;

// futures 0.3
use new_futures::compat::Compat01As03 as Compat;
use new_futures::StreamExt as _;
// futures 0.1
use old_futures::Stream as OldStream;

async fn stream_iterate(
    old_stream: impl OldStream,
) -> Result<(), E> {
    let stream = Compat::new(old_stream);
    let mut stream = Box::pin(stream);

    while let Some(item) = stream.as_mut().next().await.transpose()? {
        println!("{}", item);
    }

    Ok(())
}

Примечание: в статье рассматривается только экзекьютор tokio, как наиболее долгоживущий и распространенный. Тем не менее на нём мир не заканчивается, например существует альтернативный async-std, который помимо этого предоставляет футурные обертки для типов стандартной библиотеки, а также ThreadPool и LocalPool из рассмотренной библиотеки futures-0.3.

© Habrahabr.ru