Реализация подписчиков в c++ — пляшем от печки

b662ee8db8d4e0290eac5c2728b3427a

За время долгой работы в IT непосредственно с кодом, подмечаю одну особенность, что писать приходится всё меньше (в последнее время практически не писать), а ревьювить всё больше. На последнем месте работы за полтора года я изучил уже примерно столько же кода, сколько на прошлом… лет за 8.

Всё чаще видны нагромождения тонн кода, которые по факту не нужны, не вносят никакой дополнительной пользы. Но создают раз за разом головную боль для следующего читающего этот код программиста, который вынужден что-то поправить или дописать в этом коде. По итогу, программист махает рукой на эту чудную «архитектуру»… и пишет ещё один wrapper / adapter над ним. И, таким образом, передаёт пламенный привет последующим коллегам в будущее;).

Попробуем взять и переписать с минимумом кода одну из очень часто встречающихся задач — рассылку уведомлений объектам в коде при возникновении какого-то события. На первый взгляд кажется, что в c++ уже есть все инструменты, чтобы написать этот код в несколько строк: функтор std::function<...> — чтобы сохранить отложенный вызов, контейнер std::vector — чтобы сохранить цепочку отложенных вызовов. По которым нужно просто пробежаться при возникновении события и вызвать сохранённые функторы.

Однако в моей практике приходилось встречаться с разного вида монстрами, которые реализовывали подобную задачу с количеством строк, начиная от тысячи. Там, где через эти тысячи удавалось продраться, чтобы понять заложенную автором идею, оказывалось, что либо идеи никакой дополнительной нет, либо автор пытался навесить в реализацию множество дополнительных «фич», сделать код ответственным за всё, включая сохранение аргументов вызова в БД и запуска космического корабля для доставки сообщения на Марс. Одной из самых монструозных библиотек для этой задачи, на мой взгляд, является библиотека boost: signals2, о которой есть даже статья с разбором — https://rsdn.org/article/cpp/signals2.xml.

Проще некуда

С чего начнём? С названия — пусть будет «оповещатель» (в коде — «notifier»).

У нашего оповещателя есть клиенты, которые предоставляют что-то, что надо вызвать в нужный момент времени, в виде функтора std::function<...> с фиксированным интерфейсом. Никаких «угадай параметр», «параметр по умолчанию, если не задали», «а тут по ссылке, а тут по значению» и т.п. Это проблема клиента. Если это вообще проблема. Пока что вообще сделаем оповещения без параметров.

При обращении клиента для подписки на событие ему хорошо бы выдать какую-то бумажку, билетик то бишь. Без бумажки ты букашка, а с бумажкой… Это про другое, конечно, но «билетик», а точнее, идентификатор подписки, нужен хотя бы для того, чтобы клиент мог отменить подписку, когда захочет.

Как будут доставляться события? Капитан Очевидность подсказывает следующий код — просто пройтись по списку и вызвать функторы.

Что ж, накатим:

class notifier {
public:
    typedef int sub_id_t;

private:
    struct subscription {
        std::function m_callback;
        const sub_id_t m_id;

        subscription(std::function c, sub_id_t id)
            : m_callback(std::move(c)), m_id(id)
        {}   
    };   

public:
    sub_id_t subscribe(std::function callback) {
        m_list.emplace_back(std::move(callback), m_next_id);
        return m_next_id++;
    }

    bool unsubscribe(sub_id_t id) {
        auto it = find_if(m_list.begin(), m_list.end(),
            [id](auto& v){ return v.m_id == id; });
        if (it != m_list.end()) {
            m_list.erase(it);
            return true;
        }
        return false;
    }

    void notify() {
        for (auto& s : m_list)
            s.m_callback();
    }

private:
    sub_id_t m_next_id = 1;
    std::list m_list;
};

Для однопоточного кода вполне себе рабочее решение. Кроме возможности задать аргументы при оповещении, что можно очень просто решить шаблонами. Но об этом ниже.

Трубим сразу в несколько труб

На практике подобные оповещатели используются всё-таки в многопоточном коде. Это и доставка оповещений, которая может быть вызвана сразу из нескольких потоков. Это и подписка / отписка. Самое тривиальное решение для использования однопоточного кода в многопоточном мире — диктатура глобальных блокировок! Зарядим один mutex на всё:

class notifier {
    ... // as before
public:
    sub_id_t subscribe(std::function callback) {
        std::lock_guard l{m_list_mtx};
        ... // as before
    }

    bool unsubscribe(sub_id_t id) {
        std::lock_guard l{m_list_mtx};
        ... // as before
    }

    void notify() {
        std::lock_guard l{m_list_mtx};
        ... // as before
    }

private:
    std::mutex m_list_mtx;
    ... // as before
};

Несмотря на топорность этого решения, такое встречается в промышленном коде! Однако весьма несложно значительно улучшить его, если заметить, что во время выполнения кода подписчика блокировать оповещатель вовсе не обязательно. Давайте снимать блокировку перед вызовом очередного подписчика во время доставки сообщения и устанавливать снова для следующей итерации в цикле по подписчикам.

А как на счёт подписки / отписки во время, пока выполняется доставка оповещений в другом потоке (а может даже и не в одном)? Ну, если гарантировать, что модификации контейнера делаются только в одном потоке в один момент времени, то контейнер мы не испортим. Что касается итерации по нему в notify (), то в момент вызова подписчика контейнер может быть модифицирован — это нужно учитывать при продолжении итерации. Если это std::vector — все его итераторы могут стать недействительными (invalid), и тогда итерацию нужно делать по позиции. И учитывать, что удаление более раннего подписчика нарушит порядок обхода — пропустим один подписчик. В текущем решении мы используем std::list, а значит, недействительным станет только итератор элемента, который удаляют.

Посмотрим на процедуры подписки/отписки со стороны клиентского кода. Очевидно, что этот код ожидает, что после завершения вызова subscribe(...) в предоставленный функтор будут доставляться оповещения. А что там с отпиской? Её могут вызвать и во время доставки оповещений. Разумным было бы гарантировать клиентскому коду, что после возврата из unsubscribe(...) соответствующий функтор уже не вызывается и вызываться уже никогда не будет. Клиент вполне может захотеть удалить, к примеру, связанный с этим функтором объект после этого.

Чтобы реализовать соответствующие гарантии, будем хранить в каждой структуре subscription счётчик, сколько потоков прямо сейчас непосредственно вызвали функтор. И положим рядом condition variable, на котором можно будет организовать ожидание, что никто уже не выполняет этот функтор.

Итого:

class notifier {
    ... // as before
private:
    struct subscription {
        std::function m_callback;
        const sub_id_t m_id;
        // how many notify cycles use this object right now
        unsigned m_refs = 0;
        std::condition_variable m_waiter;

        subscription(std::function c, sub_id_t id)
            : m_callback(std::move(c)), m_id(id)
        {}
    };

public:
    ... // as before

    bool unsubscribe(sub_id_t id) {
        std::unique_lock l{m_list_mtx};

        auto it = find_if(m_list.begin(), m_list.end(),
            [id](auto& v){ return v.m_id == id; });

        if (it != m_list.end()) {
            // if somebody executes this subscription's callback, m_refs != 0.
            // So this line unlocks the global lock and waits for notification
            it->m_waiter.wait(l, [&it](){ return ! it->m_refs; });
            m_list.erase(it);
            return true;
        }
        return false;
    }

    void notify() {
        std::unique_lock l{m_list_mtx};

        for (auto &s : m_list) {
            // mark this subscription as used so nobody can remove it
            ++s.m_refs;
            l.unlock();

            try {
                s.m_callback();
            } catch (...) {
            }

            l.lock();
            if (! --s.m_refs)
                s.m_waiter.notify_all();
        }
    }
    ... // as before
};

Простенько и со вкусом! Вполне себе неплохое решение, за исключением всё ещё отсутствия аргументов для функторов и маленького факта — отписку нельзя вызывать прямо или косвенно из кода подписчика. В противном случае получим взаимоблокировку (dead lock). Исполнение зависнет на ожидании condition variable в unsubscribe (…), пока не останется потоков выполняющих функтор этого подписчика. Но так как в цепочке вызова этого функтора и вызван unsubscribe (…), ожидаемой ситуации никогда не наступит.

Либо нужно писать строгим шрифтом пользователям этого оповещателя такое ограничение на его использование, либо как-то разрешить эту ситуацию в коде. Так как в серьёзном промышленном коде цепочки вызовов настолько длинны и неисповедимы, что очень сложно гарантировать, откуда и когда будет вызван unsubscribe (…), лучше решить это всё-таки в коде оповещателя. И не требовать от клиентов слишком многого. Но это улучшение будем делать в следующей статье.

Как это использовать?

Элементарно!

int main() {
    notifier s;

    // trivial logger streaming into std::cout with locks
    g_sync_logger() << "---- test1 ----";

    // Trivial test for delivering an event on the same thread
    auto id1 = s.subscribe([]{ g_sync_logger() << "subscriber 1 executed"; });
    s.notify();
    verify(s.unsubscribe(id1));

    s.notify(); // nothing to notify

    g_sync_logger() << "---- test2 ----";

    // MT test for unsubscribing while delivering an event
    id1 = s.subscribe([]{
        g_sync_logger() << "subscriber 2 started";
        std::this_thread::sleep_for(std::chrono::seconds(1));
        g_sync_logger() << "subscriber 2 finihed";
    });

    std::thread t{[&s](){ s.notify(); }};

    std::this_thread::sleep_for(std::chrono::milliseconds(200));

    g_sync_logger() << "trying to unsubscribe the subscriber 2";
    verify(s.unsubscribe(id1));
    g_sync_logger() << "finished unsubscription of the subscriber 2";

    t.join();
}

Вывод приложения (в квадратных скобках — ID потока):

$ ./notifs-1-1
75043.766 [44674] ---- test1 ----
75043.766 [44674] subscriber 1 executed
75043.766 [44674] ---- test2 ----
75043.766 [44675] subscriber 2 started
75043.966 [44674] trying to unsubscribe the subscriber 2
75044.766 [44675] subscriber 2 finihed
75044.766 [44674] finished unsubscription of the subscriber 2

Как-то так. Оповещатель занимает всего 80 строк. Дальнейшие шаги продолжим в следующей статье цикла.

Исходный код текущей версии подписчика на Github: https://github.com/Corosan/subscribers/blob/main/src/notifs-1–1.cpp

А в заключение небольшой опрос.

Habrahabr.ru прочитано 2107 раз