Асинхронщина в Rust: Стандартная библиотека и async/.await
Перед вами руководство по специфике асинхронного программирования на языке Rust — точка входа в экосистему библиотек, справочник, на который можно опираться при проектировании системы и решении нетривиальных задач. К прочтению рекомендую и опытным разработчикам, и новичкам в экосистеме Rust, только решивших окунуться в эту кроличью нору.
Вас ждёт целый цикл статей разного уровня сложности и погружения, затрагивающий не только асинхронное программирование, но и полезные шаблоны проектирования в Rust, такие как перенос инвариантов бизнес-логики на уровень системы типов, индуктивные вычисления на типах и декларативное программирование посредством комбинаторов.
Сегодня мы рассмотрим API стандартной библиотеки для асинхронного программирования и сам долгожданный синтаксис async/.await
.
Данное пособие несёт исключительно практический характер. За теорией обращайтесь к следующим источникам (по порядку):
- «Programming Paradigms for Dummies: What Every Programmer Should Know»
- Хабр
- «Асинхронность: назад в будущее»
- Wikipedia
- «Asynchronous I/O»
- «Event Loop»
- «Reactor pattern» & «Proactor pattern»
- «The C10K problem»
Стандартная библиотека недавно обзавелась обобщённым интерфейсом для асинхронного программирования, а именно — трейт Future
и модуль std::task
. Эти сущности связывают программы и библиотеки с разными средами асинхронного исполнения (пример такой среды — Tokio, речь о которой пойдёт в следующих статьях), тем самым достигается частичная независимость пользовательского кода от конкретных асинхронных сред.
Точно также, как замыкание представляет собой комбинируемую порцию работы синхронного, последовательного кода, трейт Future
представляет собой комбинируемую порцию работы кода асинхронного (одну асинхронную операцию, футуру, фьючерс). Комбинируемость означает, что точно также, как и замыкание способно вызывать другие замыкания (состоять из них, комбинироваться), асинхронная операция способна вызывать другие асинхронные операции (помимо обычных замыканий/функций).
Рассмотрим в общих чертах принцип работы футуры:
В методе poll
несложно разглядеть модель кооперативной многозадачности, при которой операционная система не осуществляет переключение контекста; вместо этого задачи сами передают контроль экзекьютору (вызывающему коду, планировщику), чтобы тот, в свою очередь, смог эффективно распределять работу задач на доступные физические исполнители (например, ядра процессора). Чаще всего, футуры исполняются на пуле потоков, но возможны и другие сценарии, например, однопоточное исполнение, а в общем случае и более экзотические варианты. Подробнее об этом в следующей статье, посвящённой Tokio.
Принадлежность асинхронных операций потокам ОС и потоков ОС процессорным ядрам неустойчивая, т.е. одна футура может спокойно путешествовать по потокам ОС, а потоки ОС мигрировать по ядрам.
Правильнее будет назвать не «футуры», а зелёные потоки, или асинхронные операции верхнего уровня, таски, задачи.
Как было сказано выше, трейт не накладывает никаких требований на вызов метода poll
, следующего после вызова, вернувшего Poll::Ready(Output)
: он может запаниковать, войти в бесконечный цикл и создавать множество других проблем. Неопределённое поведение, тем не менее, запрещено (нарушение целостности данных, неправильное использование небезопасных функций), вне зависимости от состояния футуры, потому что сама сигнатура poll
ключевым словом unsafe
не помечена. Не следует полагаться на конкретные реализации, лишённые непредвиденного поведения.
Объект типа, реализующего Future
, — это, прежде всего, обычный объект (значение, переменная). Его можно хранить в динамическом массиве, передавать в функции, возвращать из функций, другими словами, делать с ним всё то, что позволено делать с другими объектами. Название такому явлению — сущность (программный компонент) первого класса, а программирование с использованием асинхронных операций в Rust — программирование высшего порядка.
Иллюстрация выше также содержит Waker
. Подробно об этом в следующей секции.
Для лучшего понимания реализуем свою асинхронною операцию WriteFuture
, которая завершится после того, как данные будут отправлены по неблокирующему TCP сокету.
[https://gist.github.com/2f2040d1639bebf723924a73aaa262e7]
use std::{
future::Future,
io::{self, Read, Write},
net::{TcpListener, TcpStream},
pin::Pin,
task::{Context, Poll},
thread::{self, JoinHandle},
};
use tokio::runtime::Runtime;
struct WriteFuture<'a> {
socket: TcpStream,
data: &'a [u8],
}
impl<'a> WriteFuture<'a> {
#[allow(dead_code)]
fn new(socket: TcpStream, data: &'a [u8]) -> Self {
socket.set_nonblocking(true).unwrap();
Self { socket, data }
}
}
impl Future for WriteFuture<'_> {
type Output = io::Result;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll {
let data = self.data;
match self.socket.write(data) {
Ok(length) => Poll::Ready(Ok(length)),
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
cx.waker().wake_by_ref();
Poll::Pending
}
Err(err) => Poll::Ready(Err(err)),
}
}
}
fn main() {
let server = run_server();
let client = run_client();
server.join().unwrap();
client.join().unwrap();
}
const ADDR: &'static str = "127.0.0.1:18373";
fn run_server() -> JoinHandle<()> {
thread::spawn(|| {
let listener = TcpListener::bind(ADDR).unwrap();
let (mut client_accepted, _addr) = listener.accept().unwrap();
let mut message = String::new();
client_accepted.read_to_string(&mut message).unwrap();
dbg!(message);
})
}
fn run_client() -> JoinHandle<()> {
thread::spawn(|| {
let client = TcpStream::connect(ADDR).unwrap();
let mut rt = Runtime::new().unwrap();
rt.block_on(WriteFuture::new(client, b"Hello, world!")).unwrap();
})
}
Наша WriteFuture
имеет один фатальный недостаток: системный вызов отправки внутри self.socket.write(data)
совершается каждый раз в при входе в WriteFuture::poll
. Как это исправить — в следующей статье.
Вывод:
[src/main.rs:60] message = "Hello, world!"
Внимание на метод WriteFuture::poll
. Рассмотрим подробно сопоставление с образом self.socket.write(data)
:
Ветка первая. Данные успешно записаны, возвращаем
Poll::Ready(Ok(length))
. Асинхронная операция теперь считается завершённой.Ветка вторая. Попытка записи данных вернула ошибку
io::ErrorKind::WouldBlock
. Это значит, что данные не могут быть записаны мгновенно, без прерывания нашего приложения на продолжительное время. В этом случае мы даём понять экзекьютору, что наша футура требует ещё хотя бы одного вызова, чтобы стать завершённой.Ветка третья. Попытка записи данных вернула ошибку, отличную от
io::ErrorKind::WouldBlock
. Это может быть потеря соединения, недостаточные привилегии. В этом случае возвращаем эту ошибку, после чего асинхронная операция считается завершённой.
Упрощённо у планировщика задач есть множество футур, каждая из которых в данный момент либо готова сделать прогресс, либо ещё не готова. Работа планировщика заключается в том, чтобы опросить как можно большее количество асинхронных операций, готовых сделать прогресс, за единицу времени.
По-умолчанию новая футура считается готовой совершить прогресс (разморожена), но после первого вызова poll
, если, конечно, асинхронная операция уже не завершилась, она «замораживается». Waker
— это ручка от экзекьютора с одной обязанностью: вовремя размораживать футуру. Для этого достаточно вызвать .wake_by_ref()
/.wake()
на ассоциированном с футурой Waker
, который можно получить, вызвав метод cx.waker()
.
Другими словами, poll
на каждой асинхронной операции экзекьютор как минимум один раз вызовет, а дальше уже всё зависит от возвращаемого значения poll
и контекста. Такой дизайн позволяет не тратить время ЦПУ на бесполезный опрос асинхронных операций, ведь, например, настоящий асинхронный сетевой сокет (а не наша пародия) будет готов принять/отправить данные лишь после того, как очередь событий операционной системы (epoll, kqueue, …) возвратит соответствующее событие.
Без разницы кто и откуда уведомляет экзекьютора о готовности футуры совершить прогресс; достаточным условием является обладать её контекстом. Например, разумно будет в самом цикле событий (или где-то рядом с ним) контроллировать контексты асинхронных сетевых сокетов, потому что их готовность напрямую зависит от поступления события от ОС.
Внутри fn run_client
мы запускаем нашу футуру WriteFuture
:
let mut rt = Runtime::new().unwrap();
rt.block_on(WriteFuture::new(client, b"Hello, world!"));
В первой строчке создаётся сам рантайм Tokio
, а после этого выполнение потока ОС (напомню, что тело fn run_client
находится в thread::spawn
) блокируется до того момента, когда завершится WriteFuture
. Метод block_on
— это точка входа в рантайм, главная асинхронная операция, которая запускает все другие (об этом в следующей статье).
Как написать асинхронную операцию, которая принимает на вход три другие асинхронные операции, отображает их результаты в объекты иного типа и вычисляет их сумму? Три асинхронные операции могут быть, например, отправкой/считыванием данных на/из сервера, функция отображения переводит отправленные/считанные байты в статистику, затем три статистики складываются в одну.
Существует три способа реализовать это:
- Инкапсуляция футур и ручное управление ими в [
Future::poll
]; - Адапторы;
- Синтаксис
async
/.await
.
Вариант №1 (ручное управление)
use std::{
future::Future,
ops::AddAssign,
pin::Pin,
task::{Context, Poll},
};
use pin_project::pin_project;
#[pin_project]
struct CompoundFuture {
fut1: Option,
fut2: Option,
fut3: Fut3,
f: F,
result: Option,
}
impl CompoundFuture
where
Fut1: Future
Вариант №2 (адапторы)
use std::ops::AddAssign;
use futures::Future;
fn compound_future<'a, Fut1, Fut2, Fut3, T, U, F>(
fut1: Fut1,
fut2: Fut2,
fut3: Fut3,
mut f: F,
) -> impl Future- + 'a
where
Fut1: Future
- + 'a,
Fut2: Future
- + 'a,
Fut3: Future
- + 'a,
F: FnMut(T) -> U + 'a,
U: AddAssign,
{
fut1.join3(fut2, fut3).map(move |(a, b, c)| {
let mut result = f(a);
result += f(b);
result += f(c);
result
})
}
В коде выше используется версия futures = "0.1"
, поэтому сигнатура трейта Future
немного другая, но пониманию ситуации это никак не мешает.
Вариант №3 (async/.await)
use std::{future::Future, ops::AddAssign};
async fn compound_future(
fut1: Fut1,
fut2: Fut2,
fut3: Fut3,
mut f: F,
) -> U
where
Fut1: Future,
Fut2: Future,
Fut3: Future,
F: FnMut(T) -> U,
U: AddAssign,
{
let mut result = f(fut1.await);
result += f(fut2.await);
result += f(fut3.await);
result
}
Анализ
В первом варианте мы просто инкапсулировали асинхронные операции в CompoundFuture
и в каждом вызове poll
делаем прогресс текущей операции вручную. Не сложно догадаться, что это чревато ошибками, ведь в конечном итоге придётся писать своё подобие конечного автомата из Future
(что мы и сделали) и много-много повторяющегося кода. Данный подход почти невозможно встретить в пользовательском коде, но полностью исключать его не следует, т.к. он всё ещё используется во внутренностях основополагающих библиотек (futures, Tokio и других).
Второй подход уместился всего в 24 строки, что в 3.5 раза меньше решения с ручным управлением, и, как следствие, понижается риск ошибки, повышается читабельность и поддерживаемость. Он использует понятие адапторов (комбинаторов). Дадим нестрогое определение адапторам:
Адапторы в контексте асинхронных операций — это методы, определённые на типеFut1
, отображающие (возможно, с дополнительными аргументами, влияющими на поведение)Fut1
вFut2<..., Fut1, ...>
, гдеFut1: Future, Fut2: Future
.
Путём последовательного вызова адапторов генерируются новые асинхронные операции (форма ленивых вычислений), каждая из которых обладает собственным поведением. Например, метод futures::future::Future::join3
возвращает футуру, вычисляющую результаты трёх переданных асинхронных операций, а futures::future::Future::map
возвращает футуру, результатом которой есть значение нового типа, полученное заданным отображением результата переданной футуры.
С данным подходом всё гладко ровно до тех пор, пока компилятор не встретит несовпадение типов. Вот пример вывода компилятора в ответ на ошибку в типах в продолжительной цепочки вызова адапторов:
= note: expected type ()
found type futures::future::and_then::AndThen, futures::future::or_else::OrElse, [closure@src\main.rs:139:22: 144:14]>, std::result::Result<(contract::User, std::string::String, i64, i64), http::response::Response>, [closure@src\main.rs:145:23: 162:14]>, futures::future::map_err::MapErr>, [closure@src\main.rs:211:51: 224:46 telegram_client:_, chat_id:_, text:_]>, futures::future::result_::FutureResult<(), telegram_client::TelegramClientError>>, [closure@src\main.rs:173:90: 230:30 file_id:_, ext:_, user:_, message_id:_, dbs:_, chat_id:_, telegram_client:_]>, futures::future::result_::FutureResult<(), telegram_client::TelegramClientError>>, [closure@src\main.rs:166:31: 235:22 user:_, chat_id:_, message_id:_, telegram_client:_, file_id:_, dbs:_]>, [closure@src\main.rs:236:30: 242:22]>, [closure@src\main.rs:163:23: 243:14 telegram_client:_, dbs:_]>, [closure@src\main.rs:245:18: 248:14]>, std::result::Result, hyper::error::Error>, fn(http::response::Response) -> std::result::Result, hyper::error::Error> {std::result::Result, hyper::error::Error>::Ok}>, [closure@src\main.rs:136:54: 250:6 telegram_client:_, dbs:_]>
[Взято отсюда]
Это происходит, потому что адапторы порождают глубоковложенные обобщённые типы: каждый новый адаптор — это новый уровень вложенности. Компилятору зачастую не остаётся ничего лучше, кроме как вывести их целиком на экран, вместе с типами сгенерированных замыканий, лайфтаймами и полными путями (что и демонстрирует сообщение об ошибке выше).
Проблемы предыдущих подходов решает синтаксис async
/.await
, силами которого можно писать асинхронный код в синхронном стиле. Асинхронный участок кода (async fn
, async { ... }
или async move { ... }
) компилятором трансформируется в объект, реализующий Future
, а fut.await
внутри этих участков асинхронно ждёт выполнения fut
. Это означает, что в итоговом методе poll
код, расположенный после fut.await
, выполняться не будет, пока fut
не вернёт Poll::Ready(Output)
. Все приведённые выше реализации одинаковы по смыслу (но итоговый код может разный сгенерироваться). Асинхронный участок кода порождает объект с неявным типом в противовес комбинаторам, которые наслаивают их друг на друга.
Ещё одно неочевидное преимущество async
/.await
над адапторным/комбинаторным подходом — возможность занимать данные между «вызовами» .await
— устраняется нужда в клонировании или использовании сырых (небезопасных) указателей.
Как и с любым нетривиальным решением в дизайне языка, особенно совмещающего системное программирование со сравнительно мощной системой типов, существуют несколько противоположных мнений. Вот некоторые аргументы против async
/.await
:
Слишком дорогое нововведение. Попытки реализовать
async
/.await
начались ещё в 2018 году (если не раньше), в то время как можно было обойтись старым дизайном с адапторами/комбинаторами (как это сделано во многих других языках, например, Scala, Java, …) и потратить время на более важные вещи, например, на систему типов, при условии, что компилятор научится более доходчиво объяснять несовпадения в типах, встречающиеся в том числе в ряде других мест (те же итераторы, стримы (речь о них в следующих статьях)).Неконсистентность синтаксиса.
Future
можно было бы выразить монадой, что позволило бы нам обращаться с асинхронными операциями при помощи do-нотации, в прочем, как и сResult
,Option
,Either
и так далее. Однако в текущей стабильной версии Rust это оказывается невозможным сделать по-человечески без полной поддержки HKT — Higher-Kinded Types. Более подробную аргументацию смотреть в статье Алексея Жуковского «Монады как паттерн переиспользования кода». Если заинтересовала тема HKT, также будет интересно к прочтению
Наличие разницы между async move { ... }
и async { ... }
может сперва показаться неочевидной, ведь у версии простого блока ({ ... }
) не существует move-аналога. Дело в том, что первый асинхронный вариант (с move
) овладевает захваченными переменными, а второй вариант (без move
) — сначала пытается заимствовать среду иммутабельно, если не получилось — заимствовать мутабельно, если и это не получилось — овладевает (вспомним, что асинхронные блоки преобразуются компилятором в анонимные типы, реализующие трейт Future
).
Вот простой пример, демонстрирующий различие. Функция foo()
два раза выводит строку с abc
на экран:
async fn foo() {
let string = String::from("abc");
async { dbg!(&string); }.await;
async { dbg!(&string); }.await;
}
То есть два асинхронных блока заняли string
. Если же добавить ключевое слово move
после и первого, и второго async
, то получим ошибку компиляции, как и следовало ожидать, ведь async move { ... }
пытается сразу овладеть переменной:
error[E0382]: use of moved value: `string`
--> src/lib.rs:5:16
|
2 | let string = String::from("abc");
| ------ move occurs because `string` has type `std::string::String`, which does not implement the `Copy` trait
3 |
4 | async move { dbg!(&string); }.await;
| ------------------
| | |
| | variable moved due to use in generator
| value moved here
5 | async move { dbg!(&string); }.await;
| ^^^^^^^^------^^^^
| | |
| | use occurs due to use in generator
| value used here after move
Рассмотрим ещё один пример:
async fn foo() -> String {
let string = String::from("abc");
async { string }.await
}
Здесь асинхронный блок не смог заимствовать string
иммутабельно и мутабельно, и совершил свою последнюю (удачную) попытку — овладел ей. Если переписать код с использованием move
, то результат останется тем же.
И наконец, рассмотрим пример с мутабельным заимствованием:
async fn foo() {
let mut string = String::from("abc");
async { string.push_str("def"); }.await;
dbg!(string);
}
Заимствовать иммутабельно не вышло, ведь сигнатура у String::push_str
требует &mut self
, значит заимствуем мутабельно — компилируется успешно. Если же присоединить move
, то, как вы уже догадались, произойдёт ошибка компиляции, ведь на последней строке функции располжен вызов макроса dbg!
, требующего владения string
:
error[E0382]: use of moved value: `string`
--> src/lib.rs:6:10
|
2 | let mut string = String::from("abc");
| ---------- move occurs because `string` has type `std::string::String`, which does not implement the `Copy` trait
3 |
4 | async move { string.push_str("def"); }.await;
| ---------------------------
| | |
| | variable moved due to use in generator
| value moved here
5 |
6 | dbg!(string);
| ^^^^^^ value used here after move
Попробуем скомпилировать следующий код:
fn main() {
let closure = async || {
dbg!();
};
}
Вывод:
error[E0658]: async closures are unstable
--> src/main.rs:2:19
|
2 | let closure = async || {
| ^^^^^
|
= note: see issue #62290 for more information
То есть асинхронные замыкания ещё не стабилизировались (RFC). Не путайте асинхронные замыкания и замыкания, возвращающие Future
:
- Первое —
async [move] |...| { ... }
(нестабилизированное) - Второе —
|...| async [move] { ... }
(стабилизированное)
Как вы уже могли заметить, в сигнатуре метода Future::poll
присутствует self: Pin<&mut Self>
. Что это значит? Это даёт гарантию компилятору, что асинхронная операция не переместится в памяти во время выполнения. Зачем это нужно? Представим ситуацию, где на вход компилятору подаётся такая конструкция (взято отсюда):
async {
let mut x = [0; 128];
let read_into_buf_fut = read_into_buf(&mut x);
read_into_buf_fut.await;
println!("{:?}", x);
}
Должно быть сгенерировано следующее:
struct ReadIntoBuf<'a> {
buf: &'a mut [u8], // Указывает на `x` снизу
}
struct AsyncFuture {
x: [u8; 128],
read_into_buf_fut: ReadIntoBuf<'self>,
}
Лайфтайм 'self
не разрешён в пользовательском коде, но в целях демонстрации он присутствует в read_into_buf_fut: ReadIntoBuf<'self>
. Это означает, что ссылка живёт столько же, сколько и сама структура AsyncFuture
.
Если объект AsyncFuture
переместится в памяти, то указатель в read_into_buf_fut.buf
инвалидируется (станет указывать на неправильное значение), что вызовет UB. Решение простое — запретить перемещение в памяти асинхронных операций во время их выполнения, что и делает self: Pin<&mut Self>
.
Pin
многие считают сложной темой в Rust, которую стороной обойти не получиться, если вы собираетесь вплотную заниматься асинхронным программированием. Советую прямо сейчас внимательно прочесть соответствующую страницу в документации.
Future
— трейт для асинхронных (конкурентных) вычислений с poll-based кооперативной моделью исполнения. Объекты Future
являются программными компонентами первого класса: мы можем с ними обращаться как нам вздумается, а сами по себе они ничего не делают — чтобы их выполнить, необходимо вызывать метод Future::poll
до того момента, когда он вернёт Poll::Ready(Output)
. Обычно этим занимается отдельный модуль, называемый экзекьютором (или планировщиком).
async
/.await
в Rust — синтаксис для построения анонимных асинхронных операций. Код которых смотрится так, будто выполнен в синхронном стиле.
Ключевое слово async
, после которого следуют либо фигурные скобки, либо остальная сигнатура функции, обозначает начало асинхронного блока, т.е. нечто, вычисляющее анонимную асинхронную операцию.
Ключевое слово await
, которое синтаксически ведёт себя как поле, асинхронно ожидает футуру fut
(fut.await
). Остальное содержимое асинхронного блока будет выполнено лишь после завершения fut
. Комбинаторный аналог — futures::Future::and_then
(futures 0.1.x).
Далее — про то, что позволяет безжизненные футуры превратить в работающий код: про асинхронную среду исполнения Tokio.
За ревью спасибо @blandger, также другим людям за поддержку в русскоязычном Телеграм-чате @rust_async, куда вы тоже можете задавать вопросы, связанные с асинхронным программированием в Rust.