[Из песочницы] Решаем проблемы с RAII у std::thread: cancellation_token как альтернатива pthread_cancel и boost::thread::interrupt
Статья рассматривает проблемы в std: thread, попутно разрешая древний спор на тему «что использовать: pthread_cancel, булев флаг или boost: thread: interrupt?»
Проблема
У класса std: thread, который добавили в C++11, есть одна неприятная особенность — он не соответствует с идиоме RAII (Resource Acquisition Is Initialization). Выдержка из стандарта:
30.3.1.3 thread destructor
~thread ();
If joinable () then terminate (), otherwise no effects.
Чем нам грозит такой деструктор? Программист должен быть очень аккуратен, когда речь идёт об разрушении объекта std::thread:
void dangerous_thread()
{
std::thread t([] { do_something(); });
do_another_thing(); // may throw - can cause termination!
t.join();
}Если из функции do_another_thing вылетит исключение, то деструктор std::thread завершит всю программу, вызвав std::terminate. Что с этим можно сделать? Давайте попробуем написать RAII-обёртку вокруг std::thread и посмотрим, куда нас приведёт эта попытка.
Добавляем RAII в std: thread
class thread_wrapper
{
public:
// Constructors
~thread_wrapper()
{ reset(); }
void reset()
{
if (joinable())
{
// ???
}
}
// Other methods
private:
std::thread _impl;
};thread_wrapper копирует интерфейс std::thread и реализует ещё одну дополнительную функцию — reset. Эта функция должна перевести поток в non-joinable состояние. Деструктор вызывает эту функцию, так что после этого _impl разрушится, не вызывая std::terminate.
Для того, чтобы перевести _impl в non-joinable состояние, у reset есть два варианта: detach или join. Проблема с detach в том, что поток продолжит выполняться, сея хаос и нарушая идиому RAII. Так что наш выбор — это join:
thread_wrapper::reset()
{
if (joinable())
join();
}Серьёзная проблема
К сожалению, такая реализация thread_wrapper ничем не лучше, чем обычный std::thread. Почему? Давайте рассмотрим следующий пример использования:
void use_thread()
{
std::atomic alive{true};
thread_wrapper t([&alive] { while(alive) do_something(); });
do_another_thing();
alive = false;
} Если из do_another_thing вылетит исключение, то аварийного завершения не произойдёт. Однако, вызов join из деструктора thread_wrapper зависнет навечно, потому что alive никогда не примет значение false, и поток никогда не завершится.
Всё дело в том, что у объекта thread_wrapper нет способа повлиять на выполняемую функцию, для того чтобы «попросить» её завершиться. Ситуация усложняется ещё и тем, что в функции do_something поток выполнения вполне может «уснуть» на условной переменной или в блокирующем вызове операционной системы.
Таким образом, для решения проблемы с деструктором std::thread необходимо решить более серьёзную проблему:
Как прервать выполнение длительной функции, особенно если в этой функции поток выполнения может «уснуть» на условной переменной или в блокирующем вызове ОС?
Частный случай этой проблемы — это прерывание потока выполнения целиком. Давайте рассмотрим три существующих способа для прерывания потока выполнения: pthread_cancel, boost::thread::interrupt и булев флаг.
Существующие решения
pthread_cancel
Отправляет выбранному потоку запрос на прерывание. Спецификация POSIX содержит особый список прерываемых функций (read, write и т.д.). После вызова pthread_cancel для какого-нибудь потока эти функции в данном потоке начинают кидать исключение особого типа. Это исключение нельзя проигнорировать — catch-блок, поймавший такое исключение, обязан кинуть его дальше, поэтому это исключение полностью разматывает стек потока и завершает его. Поток может на время запретить прерывание своих вызовов с помощью функции pthread_setcancelstate (одно из возможных применений: чтобы избежать исключений из деструкторов, функций логгирования и т.п.).
Плюсы:
- Можно прервать ожидание на условных переменных
- Можно прервать блокирующие вызовы ОС
- Сложно проигнорировать запрос на прерывание
Минусы:
- Большие проблемы с переносимостью: кроме очевидного отсутствия
pthread_cancelв Windows, он также отсутствует в некоторых реализациях libc (например, в bionic, который используется в Android) - Проблемы с
std::condition_variable::waitв C++14 и более поздних стандартах - Может вызвать проблемы в C коде, который использует прерываемые функции (вероятный список спецэффектов: утечки ресурсов, не разблокированные вовремя мьютексы и т.д.)
- Прерываемые функции в деструкторе требуют особых предосторожностей (например,
closeявляется прерываемой функцией) - Нельзя использовать в среде без исключений
- Нельзя применить для прерывания отдельных функций или задач
Проблемы с std::condition_variable::wait появляются из-за того, что в C++14 std::condition_variable::wait получил спецификацию noexcept. Если разрешить прерывания с помощью pthread_setcancelstate, то мы теряем возможность прерывать ожидание на условых переменных, а если прерывания будут разрешены, то у нас нет возможности соответствовать спецификации noexcept, потому что мы не можем «проглотить» это особое исключение.
boost: thread: interrupt
Библиотека Boost.Thread предоставляет опциональный механизм прерывания потоков, чем-то похожий на pthread_cancel. Для того, чтобы прервать поток выполнения, достаточно позвать у соответствующего ему объекта boost::thread метод interrupt. Проверить состояния текущего потока можно с помощью функции boost::this_thread::interruption_point: в прерванном потоке эта функция кидает исключение типа boost::thread_interrupted. В случае, если использование исключений запрещено с помощью BOOST_NO_EXCEPTIONS, то для проверки состояния можно использовать boost::this_thread::interruption_requested. Boost.Thread также позволяет прерывать ожидание в boost::condition_variable::wait. Для реализации этого используется thread-local storage и дополнительный мьютекс внутри условной переменной.
Плюсы:
- Переносимость
- Можно прервать
boost::condition_variable::wait - Можно использовать в среде без исключений
Минусы:
- Привязка к Boost.Thread — данный механизм прерывания нельзя использовать со стандартными условными переменными или потоками
- Требует дополнительного мьютекса внутри
condition_variable - Накладные расходы: добавляет две дополнительных блокировки/разблокировки мьютексов в каждый
condition_variable::wait - Нельзя прервать блокирующие вызовы ОС
- Проблематично применить для прерывания отдельных функций или задач (судя по коду, это можно сделать только при использовании исключений)
- Незначительное нарушение философии исключений — прерывание потока не является исключительной ситуацией в жизненном цикле программы
Булев флаг
Если почитать на StackOverflow вопросы про pthread_cancel (1, 2, 3, 4), то один из самых популярных ответов: «Используйте вместо pthread_cancel булев флаг».
Атомарная переменная alive в нашем примере с исключениями — это и есть булев флаг:
void use_thread()
{
std::atomic alive{true};
thread_wrapper t([&alive] { while(alive) do_something(); });
do_another_thing(); // may throw
alive = false;
} Плюсы:
- Платформно-независимый
- Очевидны точки прерывания выполнения потока
Минусы:
- Дупликация кода
- Мешает декомпозиции — нет простого и эффективного способа написать блокирующую функцию
- Нельзя прервать ожидание на условных переменных (особенно если они находятся вне класса с булевым флагом)
- Нельзя прервать блокирующие вызовы ОС
Cancellation token
Что делать? Давайте возьмём за основу булев флаг и начнём решать связанные с ним проблемы. Дупликация кода? Отлично — давайте завернём булев флаг в отдельный класс. Назовём его cancellation_token.
class cancellation_token
{
public:
explicit operator bool() const
{ return !_cancelled; }
void cancel()
{ _cancelled = true; }
private:
std::atomic _cancelled;
}; Теперь можно положить cancellation_token в наш thread_wrapper:
class thread_wrapper
{
public:
// Constructors
~thread_wrapper()
{ reset(); }
void reset()
{
if (joinable())
{
_token.cancel();
_impl.join();
}
}
// Other methods
private:
std::thread _impl;
cancellation_token _token;
};Отлично, теперь осталось только передать ссылку на токен в ту функцию, которая исполняется в отдельном потоке:
template
thread_wrapper(Function&& f, Args&&... args)
{ _impl = std::thread(f, args..., std::ref(_token)); } Так как thread_wrapper мы пишем для иллюстративных целей, то можно пока не использовать std::forward и, заодно, проигнорировать те проблемы, которые возникнут в с move-конструктором и функцией swap.
Настало время вспомнить пример с use_thread и исключениями:
void use_thread()
{
std::atomic alive{true};
thread_wrapper t([&alive] { while(alive) do_something(); });
do_another_thing();
alive = false;
} Для того, чтобы добавить поддержку cancellation_token, нам достаточно добавить правильный аргумент в лямбду и убрать alive:
void use_thread()
{
thread_wrapper t([] (cancellation_token& token) { while(token) do_something(); });
do_another_thing();
}Замечательно! Даже если из do_another_thing вылетит исключение — деструктор thread_wrapper всё равно вызовёт cancellation_token::cancel и поток завершит своё выполнение. Кроме того, убрав код булева флага в cancellation_token, мы значительно сократили количество кода в нашем примере.
Прерывание ожидания
Настало время научить наши токены прерывать блокирующие вызова, например — ожидание на условных переменных. Чтобы абстрагироваться от конкретных механизмов прерывания, нам понадобится интерфейс cancellation_handler:
struct cancellation_handler
{
virtual void cancel() = 0;
};Хэндлер для прервания ожидания на условной переменной выглядит примерно так:
class cv_handler : public cancellation_handler
{
public:
cv_handler(std::condition_variable& condition, std::unique_lock& lock) :
_condition(condition), _lock(lock)
{ }
virtual void cancel()
{
unique_lock l(_lock.get_mutex());
_condition.notify_all();
}
private:
std::condition_variable& _condition;
std::unique_lock& _lock;
}; Теперь достаточно положить указатель на cancellation_handler в наш cancellation_handler и вызвать cancellation_handler::cancel из cancellation_token::cancel:
class cancellation_token
{
std::mutex _mutex;
std::atomic _cancelled;
cancellation_handler* _handler;
public:
explicit operator bool() const
{ return !_cancelled; }
void cancel()
{
std::unique_lock l(_mutex);
if (_handler)
_handler->cancel();
_cancelled = true;
}
void set_handler(cancellation_handler* handler)
{
std::unique_lock l(_mutex);
_handler = handler;
}
}; Прерываемая версия ожидания на условной переменной выглядит примерно так:
void cancellable_wait(std::condition_variable& cv, std::unique_lock& l, cancellation_token& t)
{
cv_handler handler(cv, l); // implements cancel()
t.set_handler(&handler);
cv.wait(l);
t.set_handler(nullptr);
} Внимание! Приведённая реализация небезопасна как с точки зрения исключений и потокобезопасности. Она здесь только для того, чтобы проиллюстрировать механизм работы cancellation_handler. Ссылки на правильную реализацию можно найти в конце статьи.
Реализовав соответствующий cancellation_handler, можно научить токен прерывать блокирующие вызовы ОС и блокирующие функции из других библиотек (если у этих функций есть хотя бы какой-нибудь механизм для прерывания ожидания).
Библиотека rethread
Описанные токены, хэндлеры и потоки реализованы в виде open-source библиотеки: https://github.com/bo-on-software/rethread, с документацией (на английском), тестами и бенчмарками.
Вот список главных отличий приведённого кода от того, что реализовано в библиотеке:
cancellation_token— это интерфейс с несколькими реализациями. Прерываемые функции получаютcancellation_tokenпо константной ссылке.- Токен использует атомики вместо мьютексов для часто используемых операций
- Обёртка над потоком называется
rethread::thread
Что есть в библиотеке:
- Токены
- RAII-совместимые потоки
- Прерываемое ожидание на любых условных переменных, совместимых по интерфейсу с
std::condition_variable - Прерываемое ожидание в
poll— это позволяет реализовать прерываемые версии многих блокирующих POSIX вызовов (read,write, и т.д.)
Производительность
Измерения проводились на ноутбуке с процессором Intel Core i7–3630QM @ 2.4GHz.
Ниже приведены результаты бенчмарков токенов из rethread.
Измерялась производительность следующих операций:
- Проверка состояния — это цена вызова функции
cancellation_token::is_cancelled()(или эквивалентное этому контекстное приведение к булеву типу) - Вызов прерываемой функции — это накладные расходы на одну прерываемую блокирующую функцию: регистрация хэндлера в токене перед вызовом и «разрегистрация» после завершения вызова
- Создание одного
standalone_cancellation_token
Ubuntu 16.04
| Процессорное время, нс | |
|---|---|
| Проверка состояния токена | 1.7 |
| Вызов прерываемой функции | 15.0 |
| Создание токена | 21.3 |
Windows 10
| Процессорное время, нс | |
|---|---|
| Проверка состояния токена | 2.8 |
| Вызов прерываемой функции | 17.0 |
| Создание токена | 33.0 |
Отрицательный оверхэд
Столь низкие накладные расходы на прерываемость создают интересный эффект:
В некоторых ситуациях прерываемая функция работает быстрее, чем «обычный» подход.
В коде без использования токенов блокирующие функции не могут блокироваться навечно — тогда не получится достичь «нормального» завершения приложения (извращения вроде exit(1); нельзя считать нормой). Для того, чтобы избежать вечной блокировки и регулярно проверять состояние, нам нужен таймаут. Например, такой:
while (alive)
{
_condition.wait_for(lock, std::chrono::milliseconds(100));
// ...
}Во-первых, такой код будет просыпаться каждые 100 миллисекунд только для того, чтобы проверить флаг (значение таймаута можно увеличить, но оно ограниченно сверху «разумным» временем завершения приложения).
Во-вторых, этот код неоптимален даже без таких бессмысленных пробуждений. Дело в том, что вызов condition_variable::wait_for(...) менее эффективен, чем condition_variable::wait(...): как минимум, ему нужно получить текущее время, посчитать время пробуждения, и т.д.
Для доказательства этого утверждения в rethread_testing были написаны два синтетических бенчмарка, в которых сравнивались две примитивных реализации многопоточной очереди: «обычная» (с таймаутом) и прерываемая (с токенами). Измерялось процессорное время, затраченное на то, чтобы дождаться появления в очереди одного объекта.
| Процессорное время, нс | |
|---|---|
| Ubuntu 16.04 & g++ 5.3.1 («обычная» очередь) | 5913 |
| Ubuntu 16.04 & g++ 5.3.1 (прерываемая очередь) | 5824 |
| Windows 10 & MSVS 2015 («обычная» очередь) | 2467 |
| Windows 10 & MSVS 2015 (прерываемая очередь) | 1729 |
Итак, на MSVS 2015 прерываемая версия работает в 1.4 быстрее, чем «обычная» версия с таймаутами. На Ubuntu 16.04 разница не столь заметна, но даже там прерываемая версия явно выигрывает у «обычной».
Заключение
Это не единственное возможное решение изложенной проблемы. Наиболее заманчивая альтернатива — положить токен в thread-local storage и кидать исключение при прерывании. Поведение будет похоже на boost::thread::interrupt, но без дополнительного мьютекса в каждой условной переменной и со значительно меньшими накладными расходами. Основной недостаток такого подхода — уже упомянутое нарушение философии исключений и неочевидность точек прерывания.
Важное достоинство подхода с токенами состоит в том, что можно прерывать не потоки целиком, а отдельные задачи, а если использовать реализованный в библиотеке cancellation_token_source — то и несколько задач одновременно.
Почти весь свои «хотелки» в библиотеке я реализовал. На мой взгляд — не хватает интеграции с блокирующими вызовами системы вроде работы с файлами или сокетами. Написать прерываемые версии для read, write, connect, accept и т.д. не составит особого труда, основные проблемы — нежелание совать токены в стандартные iostream’ы и отсутствие общепринятой альтернативы.
