[Из песочницы] Использование Boost.Asio с Coroutines TS
Введение
Использование функций обратного вызова (callback) — популярный подход к построению сетевых приложений с использованием библиотеки Boost.Asio (и не только ее). Проблемой этого подхода является ухудшение читабельности и поддерживаемости кода при усложнении логики протокола обмена данными [1].
Как альтернатива коллбекам, сопрограммы (coroutines) можно применить для написания асинхронного кода, уровень читабельности которого будет близок к читабельности синхронного кода. Boost.Asio поддерживает такой подход, предоставляя возможность использования библиотеки Boost.Coroutine для обработки коллбеков.
Boost.Coroutine реализует сопрограммы с помощью сохранения контекста выполнения текущего потока. Этот подход конкурировал за включение в следующую редакцию стандарта C++ с предложением от Microsoft, которое вводит новые ключевые слова co_return, co_yield и co_await. Предложение Microsoft получило статус Technical Specification (TS) [2] и имеет высокие шансы стать стандартом.
Статья [3] демонстрирует использование Boost.Asio с Coroutines TS и boost: future. В своей статье я хочу показать, как можно обойтись без boost: future. Мы возьмем за основу пример асинхронного TCP эхо-сервера из Boost.Asio и будем его модифицировать, используя сопрограммы из Coroutines TS.
На момент написания статьи Coroutines TS реализована в компиляторах Visual C++ 2017 и clang 5.0. Мы будем использовать clang. Необходимо установить флаги компилятора для включения экспериментальной поддержки стандарта C++ 20 (-std=c++2a) и Coroutines TS (-fcoroutines-ts). Также нужно включить заголовочный файл
Сопрограмма для чтения из сокета
В оригинальном примере функция для чтения из сокета выглядит так:
void do_read() {
auto self(shared_from_this());
socket_.async_read_some(
boost::asio::buffer(data_, max_length),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
do_write(length);
}
});
}
Мы инициируем асинхронное чтение из сокета и задаем коллбек, который будет вызван при получении данных и инициирует их отсылку обратно. Функция записи в оригинале выглядит так:
void do_write(std::size_t length) {
auto self(shared_from_this());
boost::asio::async_write(
socket_, boost::asio::buffer(data_, length),
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec) {
do_read();
}
});
}
При успешной записи данных в сокет мы снова инициируем асинхронное чтение. По сути, логика программы сводится к циклу (псевдокод):
while (!ec)
{
ec = read(buffer);
if (!ec)
{
ec = write(buffer);
}
}
Было бы удобно закодировать это в виде явного цикла, однако в таком случае нам пришлось бы сделать чтение и запись синхронными операциями. Нам это не подходит, поскольку мы хотим обслуживать несколько клиентских сессий в одном потоке выполнения одновременно. На помощь приходят сопрограммы. Перепишем функцию do_read () в следующем виде:
void do_read() {
auto self(shared_from_this());
const auto[ec, length] = co_await async_read_some(
socket_, boost::asio::buffer(data_, max_length));
if (!ec) {
do_write(length);
}
}
Использование ключевого слова co_await (а также co_yield и co_return) превращает функцию в сопрограмму. Такая функция имеет несколько точек (suspension point), где ее выполнение приостанавливается (suspend) с сохранением состояния (значений локальных переменных). Позже выполнение сопрограммы может быть возобновлено (resume), начиная с последней остановки. Ключевое слово co_await в нашей функции создает suspension point: после того, как асинхронное чтение инициировано, выполнение сопрограммы do_read () будет приостановлено до завершения чтения. Возврата из функции при этом не происходит, но выполнение программы продолжается, начиная с точки вызова сопрограммы. Когда клиент подключается, вызывается session: start (), где do_read () вызывается первый раз для этой сессии. После начала асинхронного чтения продолжается выполнение функции start (), происходит возврат из нее и инициируется прием следующего соединения. Далее продолжает выполнение код из Asio, который вызвал обработчик-аргумент async_accept ().
Для того, чтобы магия co_await работала, его выражение — в нашем случае функция async_read_some () — должен возвращать объект класса, который соответствует определенному контракту. Реализация async_read_some () взята из комментария к статье [3].
template
auto async_read_some(SyncReadStream &s, DynamicBuffer &&buffers) {
struct Awaiter {
SyncReadStream &s;
DynamicBuffer &&buffers;
std::error_code ec;
size_t sz;
bool await_ready() { return false; }
void await_suspend(std::experimental::coroutine_handle<> coro) {
s.async_read_some(std::move(buffers),
[this, coro](auto ec, auto sz) mutable {
this->ec = ec;
this->sz = sz;
coro.resume();
});
}
auto await_resume() { return std::make_pair(ec, sz); }
};
return Awaiter{s, std::forward(buffers)};
}
async_read_some () возвращает объект класса Awaiter, который реализует контракт, требуемый co_await:
- await_ready () вызывается в начале ожидания для проверки, готов ли уже результат асинхронной операции. Поскольку для получения результата нам всегда нужно дождаться, пока данные будут прочитаны, мы возвращаем false.
- await_suspend () вызывается перед тем, как вызывающая сопрограмма будет приостановлена. Здесь мы инициируем асинхронное чтение и передаем обработчик, который сохранит результаты выполнения асинхронной операции в переменных-членах класса Awaiter и возобновит сопрограмму.
- await_resume () — возвращаемое значение этой функции будет результатом выполнения co_await. Просто возвращаем сохраненные ранее результаты выполнения асинхронной операции.
Если теперь попытаться собрать нашу программу, то получим ошибку компиляции:
error: this function cannot be a coroutine: 'std::experimental::coroutines_v1::coroutine_traits' has no member named 'promise_type'
void do_read() {
^
Причина в том, что компилятор требует, чтобы для сопрограммы тоже был реализован некоторый контракт. Это делается с помощью специализации шаблона std: experimental: coroutine_traits:
template
struct std::experimental::coroutine_traits {
struct promise_type {
void get_return_object() {}
std::experimental::suspend_never initial_suspend() { return {}; }
std::experimental::suspend_never final_suspend() { return {}; }
void return_void() {}
void unhandled_exception() { std::terminate(); }
};
};
Мы специализировали coroutine_traits для сопрограмм с возращаемым значением типа void и любым количеством и типами параметров. Сопрограмма do_read () подходит под это описание. Специализация шаблона содержит тип promise_type с следующими функциями:
- get_return_object () вызывается для создания объекта, который сопрограмма будет впоследствии заполнять и возвращать. В нашем случае ничего создавать не нужно, так как do_read () ничего не возвращает.
- initial_suspend () определяет, будет ли сопрограмма приостановлена перед первым вызовом. Аналогия — запуск приостановленного потока в Windows. Нам нужно, чтобы do_read () выполнялась без начальной остановки, поэтому возвращаем suspend_never.
- final_suspend () определяет, будет ли сопрограмма приостановлена перед возвратом значения и завершением. Возвращаем suspend_never.
- return_void () указывает компилятору, что сопрограмма ничего не возвращает.
- unhandled_exception () вызывается, если внутри сопрограммы было сгенерировано исключение, и оно не было обработано внутри сопрограммы. В этом случае аварийно завершаем программу.
Теперь можно запустить сервер и проверить его работоспособность, открыв несколько подключений с помощью telnet.
Сопрограмма для записи в сокет
Функция записи do_write () все еще основывается на использовании коллбека. Исправим это. Перепишем do_write () в следующем виде:
auto do_write(std::size_t length) {
auto self(shared_from_this());
struct Awaiter {
std::shared_ptr ssn;
std::size_t length;
std::error_code ec;
bool await_ready() { return false; }
auto await_resume() { return ec; }
void await_suspend(std::experimental::coroutine_handle<> coro) {
const auto[ec, sz] = co_await async_write(
ssn->socket_, boost::asio::buffer(ssn->data_, length));
this->ec = ec;
coro.resume();
}
};
return Awaiter{self, length};
}
Напишем awaitable-обертку для записи в сокет:
template
auto async_write(SyncReadStream &s, DynamicBuffer &&buffers) {
struct Awaiter {
SyncReadStream &s;
DynamicBuffer &&buffers;
std::error_code ec;
size_t sz;
bool await_ready() { return false; }
auto await_resume() { return std::make_pair(ec, sz); }
void await_suspend(std::experimental::coroutine_handle<> coro) {
boost::asio::async_write(
s, std::move(buffers), [this, coro](auto ec, auto sz) mutable {
this->ec = ec;
this->sz = sz;
coro.resume();
});
}
};
return Awaiter{s, std::forward(buffers)};
}
Последний шаг — перепишем do_read () в виде явного цикла:
void do_read() {
auto self(shared_from_this());
while (true) {
const auto[ec, sz] = co_await async_read_some(
socket_, boost::asio::buffer(data_, max_length));
if (!ec) {
auto ec = co_await do_write(sz);
if (ec) {
std::cout << "Error writing to socket: " << ec << std::endl;
break;
}
} else {
std::cout << "Error reading from socket: " << ec << std::endl;
break;
}
}
}
Логика программы теперь записана в виде, близком к синхронному коду, однако она выполняется асинхронно. Ложкой дёгтя является то, что нам пришлось написать дополнительный awaitable-класс для возвращаемого значения do_write (). Это иллюстрирует один из недостатков Coroutines TS — распространение co_await вверх по стеку асинхронных вызовов [4].
Переделку функции server: do_accept () в сопрограмму оставим в качестве упражнения. Полный текст программы можно найти на GitHub.
Заключение
Мы рассмотрели использование Boost.Asio с Coroutines TS для программирования асинхронных сетевых приложений. Преимущество такого подхода — улучшение читабельности кода, поскольку он становится близок по форме к синхронному. Недостаток — необходимость в написании дополнительных оберток для поддержки модели сопрограмм, реализованной в Coroutines TS.
Ссылки
- Асинхронность: назад в будущее
- Working Draft, Technical Specification for C++ Extensions for Coroutines
- Using C++ Coroutines with Boost C++ Libraries
- Возражения против принятия Coroutines с await в C++17