Асинхронность 2: телепортация сквозь порталы
Не прошло и года, как я добрался до продолжения статьи про асинхронность. Эта статья развивает идеи той, самой первой статьи про асинхронность [1]. В ней обсуждается достаточно сложная задача, на примере которой будет раскрыта мощь и гибкость использования сопрограмм в различных нетривиальных сценариях. В заключение будут рассмотрены две задачи на состояние гонки (race-condition), а также небольшой, но очень приятный бонус.За всё это время первая статья уже выбилась в поисковый топ.
Итак, поехали!
Задача Изначальная формулировка незамысловата и звучит так: Получить тяжелый объект по сети и передать его в UI.Будем усложнять задачу, добавив «интересные» требования на UI:
Действие порождается из UI-потока через какое-либо событие. Результат нужно возвратить обратно в UI. Мы не хотим блокировать UI, поэтому операцию необходимо производить асинхронно. Добавим «веселые» условия на получение объекта: Операции с сетью медленные, поэтому объект будем кешировать. Хочется иметь персистентный кеш, чтобы после рестарта объекты сохранялись. Персистентное устройство медленное, поэтому для более быстрой отдачи объектов будем дополнительно кешировать их в памяти. Займемся аспектами производительности: Хочется иметь параллельную, а не последовательную запись в кеши (персистентное хранилище и память). Чтение из кешей должно быть также параллельным, при этом если значение найдено в одном из кешей, то сразу использовать его, не дожидаясь ответа от другого кеша. Сетевые операции не должны никоим образом интерферировать с кешами, то есть если, например, кеши тупят, то это не должно сказываться на сетевых взаимодействиях. Хочется поддерживать большое количество соединений в ограниченном количестве потоков, то есть хочется асинхронного сетевого взаимодействия для более бережного отношения к ресурсам. Усугубим логикой: Нам потребуется отмена операций. При этом если мы получили-таки по сети наш объект, то дальше отмена не должна применяться на последующие операции по обновлению кеша, то есть необходимо реализовать «отмену отмены» на некоторую совокупность действий. Если кому-то показалось недостаточно хардкорно, то добавим еще требований: Необходимо реализовать таймауты на операции. Причем таймауты должны быть как на всю операцию, так и на некоторые части. Например: таймаут на все сетевое взаимодействие: соединение, запрос, ответ; таймаут на всю операцию, включая сетевое взаимодействие и работу с кешами. Планировщики операций могут быть как свои собственные, так и инородные (например, планировщик в UI-потоке). Никакие операции не должны блокировать потоки. Это означает, что запрещено использование мьютексов и других средств синхронизации, так как они будут блокировать наши потоки. Вот теперь достаточно. Если у кого-то сразу появился ответ, как это сделать, то я с удовольствием познакомлюсь с таким решением. Ну, а ниже я предлагаю свое решение: понятно, что в нем акцент будет делаться не на реализацию, например, кешей и персистентности, а на конкретное параллельное и асинхронное взаимодействие с учетом требования к блокировкам и планировщикам.
Решение Для решения будем использовать следующую модель. Опишу суть происходящего:
UI, Mem Cache, Disk Cache, Network суть объекты, которые выполняют соответствующие операции над нашим свежесозданным обработчиком Handler.
Handler выполняет нехитрую последовательность: Параллельно запускает операцию получения данных из кешей объектов Mem Cache и Disk Cache. В случае успеха, то есть при получении ответа с найденным результатом хотя бы из одного кеша, сразу возвращает результат. А в случае неуспеха (как на диаграмме) выполнение продолжается.
После ожидания отсутствия результата от обоих кешей Handler обращается к Network для получения объекта по сети. Для этого происходит подключение к сервису (connect), отправка запроса (send) и получение ответа (receive). Такие операции выполняются асинхронно и не блокируют другие сетевые взаимодействия.
Полученный от компонента Network объект записывается параллельно в оба кеша.
После ожидания завершения записи в кеши происходит возврат значения в UI-поток.
В программе присутствуют следующие планировщики и ассоциированные с ними объекты: UI-поток, который инициирует асинхронную операцию Handler и в который должен вернуться результат;
общий пул потоков, в котором выполняются все основные операции, включая Mem Cache и Disk Cache;
сетевой пул потоков для Network. Создается отдельно от основного пула потоков для того, чтобы загруженность основного пула не влияла на сетевой пул потоков.
Как я уже писал ранее, объекты будем реализовывать простейшим способом, так как для аспектов асинхронности это не имеет особого значения:
// stub: дисковый кеш
struct DiskCache
{
boost: optional
// кеш в памяти: хеш-таблица
struct MemCache
{
boost: optional
void set (const std: string& key, const std: string& val)
{
map[key] = val;
}
private:
std: unordered_map
struct Network { // … // получение объекта по сети std: string get (const std: string& key) { net: Socket socket; JLOG («connecting»); socket.connect (address, port); // первый байт — размер строки Buffer sz (1, char (key.size ())); socket.write (sz); // затем — строка socket.write (key); // получаем размер результата socket.read (sz); Buffer val (size_t (sz[0]), 0); // получаем сам результат socket.read (val); JLOG («val received»); return val; }
private: std: string address; int port; // … };
// UI-объект: взаимодействие с UI struct UI: IScheduler { void schedule (Handler handler) { // запуск операции в UI-потоке // … }
void handleResult (const std: string& key, const std: string& val) { TLOG («UI result inside UI thread:» << key << ";" << val); // TODO: add some actions } }; Как правило, все UI-фрейворки содержат метод, который позволяет запускать в UI-потоке необходимые действия (например, в Android: Activity.runOnUiThread, Ultimate++: PostCallback, Qt: через signal-slot механизм). Эти методы и должны быть использованы в реализации метода UI::schedule.Инициализация всего хозяйства происходит в императивном стиле:
// создаем пул потоков для общих действий ThreadPool cpu (3, «cpu»); // создаем пул потоков для сетевых действий ThreadPool net (2, «net»);
// планировщик для сериализации действий с диском Alone diskStorage (cpu, «disk storage»); // планировщик для сериализации действий с памятью Alone memStorage (cpu, «mem storage»);
// задание планировщика по умолчанию
scheduler
// привязка дискового портала к дисковому планировщику
portal
UI& ui = single
Это дает нам ожидание без блокировки потока: во время вызова goWait мы как бы ставим наши операции в текущей сопрограмме на паузу, а когда завершится переданный обработчик, продолжим исполнение как ни в чем не бывало.
goWait: запуск нескольких асинхронных операций и ожидание их завершения
Теперь реализуем функцию, которая запускает целую пачку асинхронных операций и ожидает их завершения:
void goWait (std: initializer_list
void goWait (std: initializer_list
Пример: рекурсивно-параллельные числа Фибоначчи Для иллюстрации использования рассмотрим следующий пример. Предположим, на нас нашла блажь и нам захотелось посчитать ряд Фибоначчи рекурсивно и параллельно. Нет проблем: int fibo (int v) { if (v < 2) return v; int v1, v2; goWait({ [v, &v1] { v1 = fibo(v-1); }, [v, &v2] { v2 = fibo(v-2); } }); return v1 + v2; } Отмечу, что здесь никогда не будет переполнения стека: каждый вызов функции fibo происходит в своей собственной сопрограмме.Waiter: запуск нескольких асинхронных операций и ожидание их завершения Часто нам необходимо не просто дождаться фиксированного набора обработчиков, но и между делом сделать что-то полезное и только потом дождаться. Иногда мы вообще не знаем, какое количество обработчиков может понадобиться, то есть мы их создаем по ходу выполнения наших операций. Фактически нам необходимо оперировать с группой обработчиков как единым целым. Для этого можно использовать примитив Waiter со следующим интерфейсом: struct Waiter { Waiter& go(Handler); void wait(); }; Тут всего два метода:go: запустить еще один обработчик; wait: дождаться всех запущенных обработчиков. Запускать вышеуказанные методы можно несколько раз за все время жизни объекта Waiter.Идея реализации ровно такая же: необходимо иметь proceeder, который продолжал бы работу нашей сопрограммы. Однако добавляется небольшая тонкость: теперь proceeder разделяется между запущенными сопрограммами и объектом Waiter. Соответственно, в момент вызова метода wait нам необходимо избавиться от копии в самом Waiter. Вот как это можно сделать:
void Waiter: wait ()
{
if (proceeder.unique ())
{
// только Waiter владеет proceeder =>
JLOG («everything done, nothing to do»);
return;
}
defer ([this] {
// перемещаем proceeder в область вне сопрограммы
auto toDestroy = std: move (proceeder);
// разделяемый proceeder удалится либо здесь,
// либо в какой-либо сопрограмме обработчика
});
// proceeder в этот момент был удален,
// восстановим его снова для последующего использования
init0();
}
И опять ничего не нужно делать! Спасибо за это shared_ptr. Аминь!
Пример: рекурсивно-параллельные числа Фибоначчи
Для закрепления материала рассмотрим альтернативную реализацию нашей блажи с использованием Waiter:
int fibo (int v)
{
if (v < 2)
return v;
int v1;
Waiter w;
w.go([v, &v1] { v1 = fibo(v-1); });
int v2 = fibo(v-2);
w.wait();
return v1 + v2;
}
Еще вариант:
int fibo (int v)
{
if (v < 2)
return v;
int v1, v2;
Waiter()
.go([v, &v1] { v1 = fibo (v-1); })
.go([v, &v2] { v2 = fibo (v-2); })
.wait();
return v1 + v2;
}
Выбирай не хочу.goAnyWait: запуск нескольких асинхронных операций и ожидание завершения хотя бы одной
По-прежнему будем запускать несколько операций одновременно. Но ожидать мы будем ровно до тех пор, пока не завершится хотя бы одна операция:
size_t goAnyWait(std::initializer_list
size_t goAnyWait (std: initializer_list
size_t index = static_cast
template
Result result;
deferProceed ([&handlers, &result](Handler proceed) {
std: shared_ptr
for (const auto& handler: handlers) { go ([counter, &handler] { Result result = handler (); if (result) // пытаемся продолжить только при наличии результата counter→tryProceed (std: move (result)); }); } }); return result; } Интрига тут состоит в том, что std: move перемещает результат только тогда, когда условие внутри tryProceed выполняется. А все потому, что std: move не выполняет перемещение как таковое, как бы кому-то этого ни хотелось. Это всего лишь cast-операция над ссылками.С ожиданиями разобрались, переходим к планировщикам и пулам потоков.
Планировщик, пулы, синхронизация Интерфейс планировщика После рассмотрения, так сказать, базовых основ, переходим к десерту.Введем интерфейс планировщика: struct IScheduler: IObject { virtual void schedule (Handler handler) = 0; }; Его задача — исполнять обработчики. Обратите внимание, что у интерфейса планировщика нет ни отмен, ни таймаутов, ни отложенных операций. Интерфейс планировщика должен быть кристально чистым, чтобы его можно было легко состыковать с различными фреймворками (ср. с [2]: тут тебе и отмена, и акторы, и задержки, будет очень удобно скрещивать с UI-планировщиками). Пул потоков Нам потребуется пул потоков для выполнения различных действий, реализующий интерфейс планировщика: typedef boost: asio: io_service IoService; struct IService: IObject { virtual IoService& ioService () = 0; };
struct ThreadPool: IScheduler, IService { ThreadPool (size_t threadCount); void schedule (Handler handler) { service.post (std: move (handler)); }
private: IoService& ioService ();
std: unique_ptr
Класс сопрограммы После введения пула потоков и планировщика настала очередь ввести класс для манипуляций над сопрограммами. Называться он будет, как это ни странно, Journey (почему так, будет ясно позже): struct Journey { void proceed (); Handler proceedHandler (); void defer (Handler handler); void deferProceed (ProceedHandler proceed); static void create (Handler handler, mt: IScheduler& s);
private: Journey (mt: IScheduler& s);
struct CoroGuard { CoroGuard (Journey& j_) : j (j_) { j.onEnter0(); } ~CoroGuard () { j.onExit0(); } coro: Coro* operator→() { return &j.coro; } private: Journey& j; }; void start0(Handler handler); void schedule0(Handler handler); CoroGuard guardedCoro0(); void proceed0(); void onEnter0(); void onExit0(); mt: IScheduler* sched; coro: Coro coro; Handler deferHandler; }; Что тут бросается в глаза? Приватный конструктор. Он вызывается статическим публичным методом create. Journey содержит внутри себя указатель на планировщик sched, саму сопрограмму coro и deferHandler-обработчик, который вызывается внутри defer. CoroGuard — прокси-класс, который при каждой операции над сопрограммой автоматически выполняет действия onEnter0 при входе в нее и onExit0 при выходе. Чтобы понять, как это работает, посмотрим на реализацию нескольких простых методов: void Journey: schedule0(Handler handler) { VERIFY (sched!= nullptr, «Scheduler must be set in journey»); sched→schedule (std: move (handler)); }
void Journey: proceed0() { // используем защитник для продолжения сопрограммы guardedCoro0()→resume (); }
Journey: CoroGuard Journey: guardedCoro0() { return CoroGuard (*this); }
// возврат в сопрограмму можно делать только с использованием планировщика void Journey: proceed () { schedule0([this] { proceed0(); }); }
// тот самый обработчик, который возвращает управление сопрограмме Handler Journey: proceedHandler () { return [this] { proceed (); }; }
// запуск новой сопрограммы // см. также задачу 1 void Journey: start0(Handler handler) { schedule0([handler, this] { // снова используем защитник guardedCoro0()→start ([handler] { JLOG («started»); // не забывает про исключения try { handler (); } catch (std: exception& e) { (void) e; JLOG («exception in coro:» << e.what()); } JLOG("ended"); }); }); } Давайте теперь разберем работу defer: void Journey::defer(Handler handler) { // запоминаем обработчик deferHandler = handler; // и выходим из текущей сопрограммы coro::yield(); }
// deferProceed, используемый ранее void Journey: deferProceed (ProceedHandler proceed) { defer ([this, proceed] { proceed (proceedHandler ()); }); } Все просто! Осталось понять, где же запускаются наши отложенные обработчики deferHandler. TLS Journey* t_journey = nullptr;
void Journey: onEnter0() { t_journey = this; }
// см. также задачу 2 void Journey: onExit0() { if (deferHandler == nullptr) { // нет обработчика => действия завершены, можно самоликвидироваться delete this; } else { // в противном случае выполняем отложенное действие deferHandler (); deferHandler = nullptr; } // восстанавливаем значение, так как теперь находимся вне сопрограммы t_journey = nullptr; } Ну и напоследок рассмотрим реализацию статической функции create: void Journey: create (Handler handler, mt: IScheduler& s) { (new Journey (s))→start0(std: move (handler)); } Стоит отметить, что пользователь не имеет никакой возможности явного создания Journey, то есть он вообще не подозревает о том, что есть такой класс. Но об этом чуть позже, а сейчас…Телепортация Наконец переходим к клубничке! Телепортация… Речь пойдет про примитив, который возможно реализовать только с использованием сопрограмм. А этот примитив настолько мощный и настолько простой, что стоит на нем остановиться поподробнее и посмаковать. Ведь это ж клубничка! Проще всего начать обсуждение с реализации:
void Journey: teleport (mt: IScheduler& s) { if (&s == sched) { JLOG («the same destination, skipping teleport <-> » << s.name()); return; } JLOG("teleport " << sched->name () << " -> » << s.name()); sched = &s; defer(proceedHandler()); } Тут делаются две вещи:Проверяется, отличается ли планировщик сопрограммы от того планировщика, который подали на вход метода. Если он совпадает с ним, то ничего делать не надо, планировщик и так нужный. Если отличается, то происходит смена планировщика сопрограммы и перевхождение в сопрограмму: defer выполняет функцию, которая приводит к выходу из сопрограммы и запуску обработчика для скорейшего продолжения сопрограммы. Однако для возврата будет использоваться новый планировщик, поэтому вход в сопрограмму произойдет уже в новом пуле потоков. Схема ниже поясняет процесс переключения исполнения сопрограммы с Scheduler/Thread на Scheduler2/Thread2:
Что это нам дает? На самом деле это дает переключение между пулами потоков, а также, вообще говоря, между планировщиками. В частности, можно переключаться между UI-потоком и потоками вычислений, чтобы UI, что называется, не тупил:
auto result = someCalculations (); teleport (uiScheduler); showResult (result); teleport (calcScheduler); auto newResult = continueSmartCalculations (result); teleport (uiScheduler); updateResult (newResult); //… То есть, для того чтобы сходить в UI, нужно всего лишь телепортироваться в нужный поток, и все будет потокобезопасно в соответствии с требованиями разработки UI-приложений. Тот же самый прием можно применить, если нам нужно сходить, например, в сетевой пул потоков или пул потоков базы данных — в общем, в любое место, где можно воспользоваться интерфейсом планировщика.Порталы Теперь можно переходить к вишенке. Я бы даже сказал, к сладкой и сочной черешенке. Как вы уже успели заметить, для обновления состояния UI-приложения нам потребовалось сделать телепортацию сначала в UI-планировщик, а затем обратно. Для того чтобы каждый раз этого не делать, создадим портал, который произведет обратное возвращение автоматически, как только необходимое действие завершится.
struct Portal { Portal (mt: IScheduler& destination) : source (journey ().scheduler ()) { JLOG («creating portal » << source.name() << " <=> » << destination.name()); teleport(destination); } ~Portal() { teleport(source); }
private: mt: IScheduler& source; }; То есть в конструкторе мы запоминаем источник (текущий планировщик сопрограммы), а затем телепортируем сопрограмму в известное направление. В деструкторе происходит телепортация в исходный планировщик.Благодаря такой RAII-идиоме не нужно беспокоиться о том, что мы внезапно можем оказаться не там, где ожидалось (например, не будем делать тяжелые вычисления в UI-потоке либо в сетевом пуле потоков), все будет делаться автоматически.
Давайте рассмотрим пример:
ThreadPool tp1(1, «tp1»); ThreadPool tp2(1, «tp2»);
go ([&tp2] { Portal p (tp2); JLOG («throwing exception»); throw std: runtime_error («exception occur»); }, tp1); Сопрограмма стартует в tp1, затем создается портал и происходит переключение в tp2. После генерации исключения вызывается деструктор портала, который фактически замораживает раскрутку исключений, телепортирует в tp1 и продолжает сопрограмму, которая продолжит раскрутку исключения уже в другом потоке. Бесплатно и без СМС! Для того чтобы еще более усугубить использование порталов (хотя, казалось бы, куда уж), навернем:
struct Scheduler { Scheduler (); void attach (mt: IScheduler& s) { scheduler = &s; }
void detach () { scheduler = nullptr; }
operator mt: IScheduler&() const { VERIFY (scheduler!= nullptr, «Scheduler is not attached»); return *scheduler; }
private: mt: IScheduler* scheduler; };
struct DefaultTag;
template
template
template
struct X { void op () {} };
portal
Неблокирующие мьютексы Для работы с разделяемыми ресурсами часто применяют мьютексы. Оно и понятно: такой примитив просто использовать и в большинстве случаев он себя оправдывает.Но что происходит с мьютексом, когда кто-то уже захватил ресурс? В этом случае происходит ожидание на мьютексе до тех пор, пока ресурс не освободится. При этом поток блокируется и перестает выполнять полезную работу.
А чего бы нам хотелось? С точки зрения производительности нам бы хотелось, чтобы потоки были задействованы чуть более чем полностью, а не отвлекались на ожидания. «Будет исполнено», — ответила сопрограмма и самодовольно ухмыльнулась.
Реализовать неблокирующие мьютексы с использованием сопрограмм можно различными способами. Я применю элегантный способ, позволяющий повторно использовать уже имеющийся функционал с минимальным количеством добавлений. Для этого создадим новый планировщик:
struct Alone: mt: IScheduler { Alone (mt: IService& service);
void schedule (Handler handler) { strand.post (std: move (handler)); }
private: boost: asio: io_service: strand strand; }; В конструкторе класса Alone в качестве входного параметра используется интерфейс IService, который позволяет нам корректно инициализировать io_service: strand из boost.asio. Фактически это еще один планировщик boost.asio, который гарантирует, что в одно и то же время будет запущено не более одного обработчика. Это как раз и соответствует нашим представлениям о том, что такое мьютекс (mutual exclusion).Так как идиома Alone реализует интерфейс планировщика, мы без зазрения совести можем использовать всю мощь наших телепортаций и порталов, как будто так и надо.
Для закрепления материала рассмотрим код:
struct MemCache
{
boost: optional
// инициализация
ThreadPool common_pool (3); // общий пул потоков
Alone mem_alone (common_pool); // сериализация действий с памятью
portal
// теперь выполняем необходимые операции
auto value = portal
Нужно не только учитывать изменчивые условия выполнения: необходимо иметь возможность реагировать на сетевые факторы — корректно обрабатывать таймауты. Конечно, хорошо, когда мы смогли получить результат, но если результат получен не вовремя, то он может оказаться не нужным. Раньше надо было! Что толку, что мы выучили предмет сегодня, если экзамен был вчера, а мы на него не явились?
Все эти требования ложатся тяжким бременем на существующие фреймворки. Поэтому, как правило, на эти требования забивают и потом уже огребают на продакшене, когда что-то висит, что-то тупит, ресурсы заняты, а действие уныло продолжается несмотря на бесполезность результата. Давайте попытаемся сделать подход к этой штанге.
Для начала введем типы внешних событий и ассоциированных исключений:
enum EventStatus { ES_NORMAL, ES_CANCELLED, ES_TIMEDOUT, };
struct EventException: std: runtime_error { EventException (EventStatus s); EventStatus status ();
private: EventStatus st; }; Для управления сопрограммой извне (см. отмена) необходим некоторый объект, разделяющий состояние между вызывающим и вызываемым: struct Goer { Goer (); EventStatus reset (); bool cancel (); bool timedout (); private: struct State { State () : status (ES_NORMAL) {} EventStatus status; };
bool setStatus0(EventStatus s); State& state0();
std: shared_ptr
void Journey: handleEvents () { // может быть вызван из деструктора if (! eventsAllowed || std: uncaught_exception ()) return; auto s = gr.reset (); if (s == ES_NORMAL) return; // нет событий throw EventException (s); }
void Journey: disableEvents () { handleEvents (); eventsAllowed = false; }
void Journey: enableEvents () { eventsAllowed = true; handleEvents (); } Тут стоит обратить внимание на добавление флага, нужно ли обрабатывать события или нет. Иногда нам стоит доделать какие-то важные действия, прежде чем мы бросим исключение и раскрутим стек. Для этого предназначен защитник: struct EventsGuard { EventsGuard (); // вызывает disableEvents () ~EventsGuard (); // вызывает enableEvents () }; Возникает вопрос, а когда вызывается тот самый пресловутый handleEvents? А вот когда: void Journey: defer (Handler handler) { // добавляем перед выходом из сопрограммы handleEvents (); defe