Асинхронные задачи в С++11

Доброго времени суток, хотел бы поделиться с сообществом своей небольшой библиотектой.Я программирую на С/C++, и, к сожалению, в рабочих проектах не могу использовать стандарт C++11. Но вот пришли майские праздники, появилось свободное время и я решил поэкспериментировать и по-изучать этот запретный плод. Самое лучшее для изучения чего либо — это практика. Чтение статей о языке программирования научит максимум лучше читать, поэтому я решил написать маленькую библиотеку для асинхронного выполнения функций.Сразу оговорюсь, что я знаю, что существует std: future, std: async и тп. Мне было интересно реализовать самому нечто подобное и окунуться в мир лямбда-функций, потоков и мьютексов с головой. Праздники — отличное время для велопрогулок.Итак начнем Я решил что моя библиотека будет функционировать следующим образом.Существует некоторый пул, с фиксированным количеством потоков.В него добавляются задачи, используя синтаксис лямбда функций.Из самой задачи можно извлечь результат ее выполнения, или просто дождаться окончания ее работы.Забегая вперед, выглядит это примерно так: … act: control control (N_THREADS); auto some_task = act: make_task ([](std: vector:: const_iterator begin, std: vector:: const_iterator end) { double sum = 0; for (auto i = begin; i!= end; ++i) { sum+=(*i); } return sum; } , data.begin (), data.end ()); control << some_task; cout << some_task->get () << endl; ... Класс задачи Для начала необходимо создать класс, описывающий задачу: template class task : public task { };

template class task { protected: const ClassType &m_func; std: tuple m_vars; ReturnType m_return; public: task (const ClassType &v, Args… args): m_func (v), m_vars (args …) {} virtual ~task () {} private: }; Как известно, лямба функция раскрывается в класс-функтор с оператором operator ().Наш класс задачи шаблонный, его тип извлекается из типа оператора функтора &T: operator ().Класс хранит в себе указатель на функтор, аргументы функции в виде std: tuple и возвращаемое значение.

Итак теперь мы можем хранить в объекте лямбда-функцию с параметрами, теперь надо научиться ее вызывать.Для этого необходимо вызвать opertator () у m_func с параметрами, хранящимися в m_vars.С начала я не знал как это сделать, но усиленное использования гугла и переход по второй ссылке принесло результат:

template struct seq { };

template struct gens: gens { };

template struct gens<0, S...> { typedef seq type; }; С помощью этой конструкции можно добавить в класс следующие функции:

… public: void invoke () { ReturnType r = caller (typename gens:: type ()); } private: template ReturnType caller (seq) const { return m_func (std: get(m_vars) …); } … Базовый класс задачи Теперь реализуем базовый класс задачи:: class abstract_task { protected: mutable std: mutex m_mutex; mutable std: condition_variable m_cond_var; mutable bool m_complete; public: abstract_task (): m_complete (false) {} virtual ~abstract_task () {} virtual void invoke () = 0; virtual void wait () const { std: unique_lock lock (m_mutex); while (! m_complete) { m_cond_var.wait (lock); } } }; Класс содержит в себе мьютекс и переменную состояния, сигнализирующую о завершении задачи. Соотвественно наш класс задачи притерпит некоторые изменения, которые я опущую, так как исходный код доступен на гитхабе.Создание задач Сделаем функцию-обертку для создания задач: template std: shared_ptr> make_task (T func, Args … args) { return std: shared_ptr>(new task(func, args …)); } Так как у нас класс виртуальный, логично использовать указатель и мы это сделаем, и не просто указатель, а умный указатель.Класс управления Теперь реализуем сущность для исполнения задач в фоновых потоках.Приведу лишь часть кода: … class control { std: deque> m_tasks; std: vector m_pool; std: mutex m_mutex; std: condition_variable m_cond_var; std: condition_variable m_empty_cond; std: atomic m_run; std: vector m_active; public: control (std: size_t pool_size = 2) { m_run.store (true, std: memory_order_relaxed); auto func = [this](int n) { while (m_run.load (std: memory_order_relaxed)) { std: unique_lock lock (m_mutex); m_active[n] = true; if (m_tasks.empty ()) { m_empty_cond.notify_all (); m_active[n] = false; m_cond_var.wait (lock); } else { std: shared_ptr t = m_tasks.front (); m_tasks.pop_front (); lock.unlock (); t→invoke (); lock.lock (); m_active[n] = false; } } }; pool_size = pool_size > 0? pool_size: 1; m_active.resize (pool_size, false); for (std: size_t i = 0; i < pool_size; ++i) { m_pool.emplace_back(func, i); } } ... Для интереса, я использовал все фичерсы нового стандарта, применение которых я хоть как то мог обосновать.Данный класс создает массив потоков и массив переменных состояния активности для мониторинга выполнения заданий дочерними потоками.Главный цикл дочернего потока контролируется атомарной переменной (по идее достаточно было объявить ее volatile, так как тут нет состояния гонки, главный поток в нее только пишет, а дочерние только читают)

Производительность Я бы не стал писать эту статью скорее всего, если бы не проведенный мной тест производительности данного решения по сравнению с std: async.Конфигурация: Intel® Core i7–2600 CPU @ 3.40GHz$gcc --versiongcc (Debian 4.8.2–21) 4.8.2

Тест заключается в параллельном сложении массивов, а затем асинхронном сложении результатов всех сложений. Результатом операции будет:

res = sum (array)*N_P

Числа указаны в миллисекундах.

Тест 1 Оптимизация выключена, количество элементов в массиве 100000000, количество порождаемых задач 73, Количество потоков в пуле 6Результаты: test_act 16775 OKtest_async 16028 OK

Производительность сравнима.Тест 2 Оптимизация включена, количество элементов в массиве 100000000, количество порождаемых задач 73, Количество потоков в пуле 6Результаты: test_act 1597.6 OKtest_async 2530.5 OK

Моя реализация быстрее в полтора раза.Тест 3 Оптимизация включена, количество элементов в массиве 100000000, количество порождаемых задач 73, Количество потоков в пуле 7Результаты: test_act 1313.1 OKtest_async 2503.7 OK

Тест 4 Оптимизация включена, количество элементов в массиве 100000000, количество порождаемых задач 73, Количество потоков в пуле 8Результаты: test_act 1402 OKtest_async 2492.2 OK

Тест 5 Оптимизация включена, количество элементов в массиве 100000000, количество порождаемых задач 173, Количество потоков в пуле 8Результаты: test_act 4435.7 OKtest_async 5789.4 OK

Выводы и баги Данные результаты скорее всего связаны с тем, что async порождает для каждой задачи свой поток, в моей же реализации количество потоков фиксировано и накладные расходы на их создание отсутствуют.Баг — захват переменных области видимости (через []) в лямбда функции вызывает SIGSEGV. Хотя передача их же через параметры работает прекрасно.Не знаю насколько ли полезна данная статья и сама библиотека, но, по крайней мере, я применил некоторые возможности нового стандарта на своей практике.Исходный код

© Habrahabr.ru