[Из песочницы] Использование Boost.Asio с Coroutines TS

habr.png

Введение

Использование функций обратного вызова (callback) — популярный подход к построению сетевых приложений с использованием библиотеки Boost.Asio (и не только ее). Проблемой этого подхода является ухудшение читабельности и поддерживаемости кода при усложнении логики протокола обмена данными [1].


Как альтернатива коллбекам, сопрограммы (coroutines) можно применить для написания асинхронного кода, уровень читабельности которого будет близок к читабельности синхронного кода. Boost.Asio поддерживает такой подход, предоставляя возможность использования библиотеки Boost.Coroutine для обработки коллбеков.


Boost.Coroutine реализует сопрограммы с помощью сохранения контекста выполнения текущего потока. Этот подход конкурировал за включение в следующую редакцию стандарта C++ с предложением от Microsoft, которое вводит новые ключевые слова co_return, co_yield и co_await. Предложение Microsoft получило статус Technical Specification (TS) [2] и имеет высокие шансы стать стандартом.


Статья [3] демонстрирует использование Boost.Asio с Coroutines TS и boost: future. В своей статье я хочу показать, как можно обойтись без boost: future. Мы возьмем за основу пример асинхронного TCP эхо-сервера из Boost.Asio и будем его модифицировать, используя сопрограммы из Coroutines TS.



На момент написания статьи Coroutines TS реализована в компиляторах Visual C++ 2017 и clang 5.0. Мы будем использовать clang. Необходимо установить флаги компилятора для включения экспериментальной поддержки стандарта C++ 20 (-std=c++2a) и Coroutines TS (-fcoroutines-ts). Также нужно включить заголовочный файл .

Сопрограмма для чтения из сокета

В оригинальном примере функция для чтения из сокета выглядит так:


void do_read() {
    auto self(shared_from_this());
    socket_.async_read_some(
        boost::asio::buffer(data_, max_length),
        [this, self](boost::system::error_code ec, std::size_t length) {
            if (!ec) {
                do_write(length);
            }
        });
}


Мы инициируем асинхронное чтение из сокета и задаем коллбек, который будет вызван при получении данных и инициирует их отсылку обратно. Функция записи в оригинале выглядит так:


void do_write(std::size_t length) {
    auto self(shared_from_this());
    boost::asio::async_write(
        socket_, boost::asio::buffer(data_, length),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
            if (!ec) {
                do_read();
            }
        });
}


При успешной записи данных в сокет мы снова инициируем асинхронное чтение. По сути, логика программы сводится к циклу (псевдокод):

while (!ec)
{
	ec = read(buffer);
	if (!ec)
	{
		ec = write(buffer);
	}
}


Было бы удобно закодировать это в виде явного цикла, однако в таком случае нам пришлось бы сделать чтение и запись синхронными операциями. Нам это не подходит, поскольку мы хотим обслуживать несколько клиентских сессий в одном потоке выполнения одновременно. На помощь приходят сопрограммы. Перепишем функцию do_read () в следующем виде:

void do_read() {
    auto self(shared_from_this());
    const auto[ec, length] = co_await async_read_some(
        socket_, boost::asio::buffer(data_, max_length));

    if (!ec) {
        do_write(length);
    }
}


Использование ключевого слова co_await (а также co_yield и co_return) превращает функцию в сопрограмму. Такая функция имеет несколько точек (suspension point), где ее выполнение приостанавливается (suspend) с сохранением состояния (значений локальных переменных). Позже выполнение сопрограммы может быть возобновлено (resume), начиная с последней остановки. Ключевое слово co_await в нашей функции создает suspension point: после того, как асинхронное чтение инициировано, выполнение сопрограммы do_read () будет приостановлено до завершения чтения. Возврата из функции при этом не происходит, но выполнение программы продолжается, начиная с точки вызова сопрограммы. Когда клиент подключается, вызывается session: start (), где do_read () вызывается первый раз для этой сессии. После начала асинхронного чтения продолжается выполнение функции start (), происходит возврат из нее и инициируется прием следующего соединения. Далее продолжает выполнение код из Asio, который вызвал обработчик-аргумент async_accept ().


Для того, чтобы магия co_await работала, его выражение — в нашем случае функция async_read_some () — должен возвращать объект класса, который соответствует определенному контракту. Реализация async_read_some () взята из комментария к статье [3].

template 
auto async_read_some(SyncReadStream &s, DynamicBuffer &&buffers) {
    struct Awaiter {
        SyncReadStream &s;
        DynamicBuffer &&buffers;

        std::error_code ec;
        size_t sz;

        bool await_ready() { return false; }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            s.async_read_some(std::move(buffers),
                              [this, coro](auto ec, auto sz) mutable {
                                  this->ec = ec;
                                  this->sz = sz;
                                  coro.resume();
                              });
        }
        auto await_resume() { return std::make_pair(ec, sz); }
    };
    return Awaiter{s, std::forward(buffers)};
}


async_read_some () возвращает объект класса Awaiter, который реализует контракт, требуемый co_await:


  • await_ready () вызывается в начале ожидания для проверки, готов ли уже результат асинхронной операции. Поскольку для получения результата нам всегда нужно дождаться, пока данные будут прочитаны, мы возвращаем false.
  • await_suspend () вызывается перед тем, как вызывающая сопрограмма будет приостановлена. Здесь мы инициируем асинхронное чтение и передаем обработчик, который сохранит результаты выполнения асинхронной операции в переменных-членах класса Awaiter и возобновит сопрограмму.
  • await_resume () — возвращаемое значение этой функции будет результатом выполнения co_await. Просто возвращаем сохраненные ранее результаты выполнения асинхронной операции.


Если теперь попытаться собрать нашу программу, то получим ошибку компиляции:

error: this function cannot be a coroutine: 'std::experimental::coroutines_v1::coroutine_traits' has no member named 'promise_type'
    void do_read() {
         ^


Причина в том, что компилятор требует, чтобы для сопрограммы тоже был реализован некоторый контракт. Это делается с помощью специализации шаблона std: experimental: coroutine_traits:

template 
struct std::experimental::coroutine_traits {
    struct promise_type {
        void get_return_object() {}
        std::experimental::suspend_never initial_suspend() { return {}; }
        std::experimental::suspend_never final_suspend() { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
};


Мы специализировали coroutine_traits для сопрограмм с возращаемым значением типа void и любым количеством и типами параметров. Сопрограмма do_read () подходит под это описание. Специализация шаблона содержит тип promise_type с следующими функциями:


  • get_return_object () вызывается для создания объекта, который сопрограмма будет впоследствии заполнять и возвращать. В нашем случае ничего создавать не нужно, так как do_read () ничего не возвращает.
  • initial_suspend () определяет, будет ли сопрограмма приостановлена перед первым вызовом. Аналогия — запуск приостановленного потока в Windows. Нам нужно, чтобы do_read () выполнялась без начальной остановки, поэтому возвращаем suspend_never.
  • final_suspend () определяет, будет ли сопрограмма приостановлена перед возвратом значения и завершением. Возвращаем suspend_never.
  • return_void () указывает компилятору, что сопрограмма ничего не возвращает.
  • unhandled_exception () вызывается, если внутри сопрограммы было сгенерировано исключение, и оно не было обработано внутри сопрограммы. В этом случае аварийно завершаем программу.


Теперь можно запустить сервер и проверить его работоспособность, открыв несколько подключений с помощью telnet.

Сопрограмма для записи в сокет

Функция записи do_write () все еще основывается на использовании коллбека. Исправим это. Перепишем do_write () в следующем виде:

auto do_write(std::size_t length) {
    auto self(shared_from_this());
    struct Awaiter {
        std::shared_ptr ssn;
        std::size_t length;
        std::error_code ec;

        bool await_ready() { return false; }
        auto await_resume() { return ec; }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            const auto[ec, sz] = co_await async_write(
                ssn->socket_, boost::asio::buffer(ssn->data_, length));
            this->ec = ec;
            coro.resume();
        }
    };
    return Awaiter{self, length};
}


Напишем awaitable-обертку для записи в сокет:

template 
auto async_write(SyncReadStream &s, DynamicBuffer &&buffers) {
    struct Awaiter {
        SyncReadStream &s;
        DynamicBuffer &&buffers;

        std::error_code ec;
        size_t sz;

        bool await_ready() { return false; }
        auto await_resume() { return std::make_pair(ec, sz); }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            boost::asio::async_write(
                s, std::move(buffers), [this, coro](auto ec, auto sz) mutable {
                    this->ec = ec;
                    this->sz = sz;
                    coro.resume();
                });
        }
    };
    return Awaiter{s, std::forward(buffers)};
}


Последний шаг — перепишем do_read () в виде явного цикла:

void do_read() {
    auto self(shared_from_this());
    while (true) {
        const auto[ec, sz] = co_await async_read_some(
            socket_, boost::asio::buffer(data_, max_length));
        if (!ec) {
            auto ec = co_await do_write(sz);
            if (ec) {
                std::cout << "Error writing to socket: " << ec << std::endl;
                break;
            }
        } else {
            std::cout << "Error reading from socket: " << ec << std::endl;
            break;
        }
    }
}


Логика программы теперь записана в виде, близком к синхронному коду, однако она выполняется асинхронно. Ложкой дёгтя является то, что нам пришлось написать дополнительный awaitable-класс для возвращаемого значения do_write (). Это иллюстрирует один из недостатков Coroutines TS — распространение co_await вверх по стеку асинхронных вызовов [4].


Переделку функции server: do_accept () в сопрограмму оставим в качестве упражнения. Полный текст программы можно найти на GitHub.

Заключение

Мы рассмотрели использование Boost.Asio с Coroutines TS для программирования асинхронных сетевых приложений. Преимущество такого подхода — улучшение читабельности кода, поскольку он становится близок по форме к синхронному. Недостаток — необходимость в написании дополнительных оберток для поддержки модели сопрограмм, реализованной в Coroutines TS.

Ссылки

  1. Асинхронность: назад в будущее
  2. Working Draft, Technical Specification for C++ Extensions for Coroutines
  3. Using C++ Coroutines with Boost C++ Libraries
  4. Возражения против принятия Coroutines с await в C++17

© Habrahabr.ru