SObjectizer: от простого к сложному. Часть I

В первой статье мы рассказали о том, что такое SObjectizer и почему он получился именно таким. Во второй — попробуем показать, как может выглядеть более-менее реальный код на SObjectizer. С демонстрацией того, в какую сторону этот код обычно эволюционирует. Ибо первоначально, когда у разработчика появляется возможность работать с Actor Model, он начинает этой возможностью злоупотреблять, создавая проблемы и себе, и тем, кто будет эксплуатировать программный продукт, написанный в стиле «актор на каждый чих». Только спустя некоторое время и некоторое количество набитых шишек приходит понимание того, что прелесть модели акторов вовсе не в возможности создавать их десятками тысяч или даже просто тысячами. Но давайте пойдем последовательно, не опережая события.

Для демонстрации выдумаем себе такую абстрактную задачу: есть имя файла с email-ом (грубо говоря, в этот файл сохранили все, что пришло по POP3 протоколу, включая заголовки, тело письма, аттачи и пр). Нужно выдать результат оценки подозрительности содержимого этого файла: выглядит ли письмо безопасно или подозрительно, или же при попытке оценить его содержимое возникла какая-то проблема и актуальную оценку выдать нельзя. Задача абстрактная, любые совпадения с чем-либо похожим из реальной жизни являются непреднамеренной случайностью.

Естественно, таких имен с файлами email-ов у нас будет не одно и не два. Будет некий поток этих имен, с которым нужно разбираться. Желательно, используя возможности современного многоядерного железа, т. е. запуская обработку нескольких email-ов в параллель.

Схематично покажем, как эта задача может быть решена на SObjectizer-е «в лоб». После чего укажем проблемы выбранного подхода, сделаем следующую итерацию и т.д. Дабы в итоге на примерах подвести читателя к тому пониманию «удобного использования модели акторов на C++» которое у нас сложилось за десять лет работы с SObjectizer-ом в реальных проектах.

Для начала определимся с тем, как выдаются запросы на проверку файлов с email-ами и как возвращаются результаты проверок. Используем для этих целей простые сообщения:

// Избавляемся от необходимости указывать префиксы so_5 и std.
// В последующих примерах эти using-и дублировать не будем,
// подразумевая, что они уже выполнены.
using namespace so_5;
using namespace std;
using namespace chrono_literals;

// Сообщение для проверки одного файла с email-ом.
struct check_request {
  // Имя проверяемого файла.
  string email_file_;
  // Кому нужно отослать результат проверки.
  mbox_t reply_to_;
};

// Статус проверки, который будет возвращен в ответном сообщении.
enum class check_status {
  safe,
  suspicious,
  dangerous,
  check_failure,
  check_timedout
};

// Сообщение с результатом проверки одного файла с email.
// Содержит не только статус проверки, но и имя проверяемого файла.
// Это имя нужно лишь для того, чтобы облегчить сопоставление
// получаемых результатов проверки.
struct check_result {
  string email_file_;
  check_status status_;
};

Получается, что когда нам нужно проверить email, мы отсылает сообщение check_request на некий mbox. В этом сообщении передается имя файла и обратный адрес, куда должен быть отослан результат проверки. Соответственно, следующим шагом нам нужно определить, куда же именно будут отсылаться сообщения check_request.

Можно, конечно же, создать одного агента, который бы получал все сообщения check_request и обрабатывал бы их самостоятельно. Но такой агент очень быстро стал бы узким местом. Поэтому мы сделаем так, что у нас будет один агент-менеджер, который получает сообщения check_request и под каждое полученное сообщение создает агента-анализатора. Именно агент-анализатор будет заниматься проверкой email-а, а агент-мендежер будет выполнять роль фабрики агентов-анализаторов.

Сходу можно написать самую простую версию агента-менеджера:

// Агент, который будет играть роль менеджера агентов email_analyzer.
class analyzer_manager final : public agent_t {
public :
  analyzer_manager( context_t ctx ) : agent_t( ctx )
  {
    // Класс объявлен как final, поэтому подписки агента можно сделать
    // прямо в конструкторе. Если бы final не было, подписки лучше было
    // бы вынести в метод so_define_agent(), что упростило бы разработку
    // производных классов.
    so_subscribe_self()
      // В этом случае тип сообщения, на который идет подписка,
      // выводится автоматически.
      .event( &analyzer_manager::on_new_check_request );
  }

private :
  void on_new_check_request( const check_request & msg ) {
    // Создаем кооперацию с единственным агентом внутри.
    // Эта кооперация будет дочерней для кооперации с агентом-менеджером.
    // Т.е. SObjectizer Environment проконтролирует, чтобы кооперация с
    // агентом-анализатором завершила свою работу перед тем, как
    // завершит свою работу кооперация с агентом-менеджером.
    introduce_child_coop( *this, [&]( coop_t & coop ) {
        // В кооперацию будет входить всего один агент.
        coop.make_agent< email_analyzer >( msg.email_file_, msg.reply_to_ );
      } );
  }
};

Для обработки email-ов нам нужно будет зарегистрировать в SObjectizer Environment экземпляр агента типа analyzer_manager и каким-то образом сделать его персональный mbox (т.н. direct_mbox) доступным для всех. Тот, кому нужно проверить email, отошлет на этот mbox сообщение check_request, сообщение дойдет до analyzer_manager, будет создан агент email_analyzer ну и дальше все, как и задумывалось…

Теперь нужно реализовать агента email_analyzer, который и будет производить анализ email-ов. Самое простое, что приходит в голову — это агент, который сам выполняет все операции: т.е. загружает содержимое из файла, парсит это содержимое на составные части (заголовки, тело, аттачи), анализирует все это и выдает заключение.

Фактически, агенту email_analyzer нужно будет определить только свою реализацию метода so_evt_start (), которая автоматически вызывается у каждого агента после того, как агент успешно регистрируется внутри SObjectizer Environment. Посему агент email_analyzer будет выглядеть очень просто:

// Агент для анализа содержимого одного email-а.
// Получает все нужные ему параметры в конструкторе,
// выполняет все свои действия в единственном методе so_evt_start.
class email_analyzer : public agent_t {
public :
  email_analyzer( context_t ctx,
    // Имя файла с email для анализа.
    string email_file,
    // Куда нужно отослать результат анализа.
    mbox_t reply_to )
    : agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
  {}

  virtual void so_evt_start() override {
    try {
      // Стадии обработки обозначаем лишь схематично.
      auto raw_data = load_email_from_file( email_file_ );
      auto parsed_data = parse_email( raw_data );
      auto status = check_headers( parsed_data->headers() );
      if( check_status::safe == status )
        status = check_body( parsed_data->body() );
      if( check_status::safe == status )
        status = check_attachments( parsed_data->attachments() );
      send< check_result >( reply_to_, email_file_, status );
    }
    catch( const exception & ) {
      // В случае какой-либо ошибки отсылаем статус о невозможности
      // проверки файла с email-ом по техническим причинам.
      send< check_result >(
          reply_to_, email_file_, check_status::check_failure );
    }
    // Больше мы не нужны, поэтому дерегистрируем кооперацию,
    // в которой находимся.
    so_deregister_agent_coop_normally();
  }

private :
  const string email_file_;
  const mbox_t reply_to_;
};

Итак, у нас есть очень тривиальные реализации агентов analyzer_manager и email_analyzer. Которые, к сожалению, имеют несколько серьезных проблем.

Первая проблема состоит в том, что агенты email_analyzer не будут работать в параллель. Дело в том, что при их создании не указывается диспетчер, к которому они должны быть привязаны. Поэтому эти агенты автоматически привязываются к дефолтному диспетчеру SObjectizer Environment, а этот дефолтный диспетчер является однопоточным: т.е. у него всего одна рабочая нить, на которой последовательно запускаются события привязанных к диспетчеру агентов.

Поэтому, если мы хотим, чтобы агенты email_analyzer работали независимо друг от друга, нам нужно явно привязывать их к соответствующему типу диспетчера. В данном случае хорошо подойдет диспетчер с пулом рабочих потоков. Соответственно, кто-то должен создать экземпляр такого диспетчера и кто-то должен привязывать email_analyzer-ов к этому экземпляру. Очевидно, что этот кто-то — это агент analyzer_manager:

class analyzer_manager final : public agent_t {
public :
  analyzer_manager( context_t ctx )
    : agent_t( ctx )
    , analyzers_disp_(
        // Нужен приватный, т.е. видимый только нашему менеджеру
        // диспетчер, на котором и будут работать агенты-анализаторы.
        disp::thread_pool::create_private_disp(
          // Указываем, в рамках какого SObjectizer Environment
          // будет работать диспетчер. Это нужно для корректного запуска
          // и останова диспетчера.
          so_environment(),
          // Просто захардкодим количество рабочих потоков для диспетчера.
          // В реальном приложении это количество может быть вычислено
          // на основании, например, thread::hardware_concurrency() или
          // взято из конфигурации.
          16 ) )
  {
    so_subscribe_self()
      .event( &analyzer_manager::on_new_check_request );
  }

private :
  disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;

  void on_new_check_request( const check_request & msg ) {
    introduce_child_coop( *this,
      // Агент из новой кооперации будет автоматически привязан к приватному
      // диспетчеру с пулом рабочих потоков (при привязке будут использоваться
      // параметры по умолчанию).
      analyzers_disp_->binder( disp::thread_pool::bind_params_t() ),
      [&]( coop_t & coop ) {
        // В кооперацию будет входить всего один агент.
        coop.make_agent< email_analyzer >( msg.email_file_, msg.reply_to_ );
      } );
  }
};

Такая несложная модификация analyzer_manager позволила нам избавиться от первой проблемы. Но осталась еще и вторая: неконтролируемое создание неограниченного количества агентов email_analyzer.

Текущая реализация analyzer_manager работает по принципу: получил сообщение check_email с именем файла для проверки, создал агента email_analyzer и забыл про все. Но, очевидно, что для более-менее высокой нагрузки этот вариант не подходит. Если сразу создать 100500 агентов email_analyzer, которые будут работать на пуле из N рабочих потоков, то ничего хорошего кроме лишнего расхода памяти не будет. Лучше сразу ограничивать количество одновременно работающих агентов и создавать новых после того, как завершают работу предыдущие. Плюс хранить очередь заданий на обработку, из которой и будут браться элементы для новых агентов.

Поэтому еще раз модифицируем нашего analyzer_manager-а: добавим в него очередь запросов и ограничение на количество одновременно работающих агентов.

class analyzer_manager final : public agent_t {
  // Этот сигнал нам нужен для того, чтобы мы могли попробовать
  // запустить в работу очередной анализатор.
  struct try_create_next_analyzer : public signal_t {};
  // А этот сигнал будет информировать нас о том, что очередной
  // анализатор завершил свою работу.
  struct analyzer_finished : public signal_t {};

public :
  analyzer_manager( context_t ctx )
    : agent_t( ctx )
    , analyzers_disp_(
        disp::thread_pool::create_private_disp( so_environment(), 16 ) )
  {
    so_subscribe_self()
      .event( &analyzer_manager::on_new_check_request )
      // А в этом случае метод-обработчик не имеет параметров,
      // поэтому тип сигнала-инцидента указывается явно.
      .event< try_create_next_analyzer >( &analyzer_manager::on_create_new_analyzer )
      .event< analyzer_finished >( &analyzer_manager::on_analyzer_finished );
  }

private :
  const size_t max_parallel_analyzers_{ 16 };
  size_t active_analyzers_{ 0 };

  disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;

  list< check_request > pending_requests_;

  void on_new_check_request( const check_request & msg ) {
    // Работаем по очень простой схеме: сперва сохраняем очередной
    // запрос в список ожидания, затем отсылаем себе сигнал для
    // попытки запустить очередного обработчика.
    // И создавать агента-анализатора будем уже при обработке сигнала.
    pending_requests_.push_back( msg );
    // Отсылаем сигнал сами себе.
    send< try_create_next_analyzer >( *this );
  }

  void on_create_new_analyzer() {
    // Запустить новый анализатор можно только если еще не достигнут
    // лимит на их количество.
    if( active_analyzers_ >= max_parallel_analyzers_ )
      return;

    lauch_new_analyzer();

    // Если список не пуст и возможность стартовать анализаторов
    // сохраняется, то продолжим это делать.
    if( !pending_requests_.empty()
        && active_analyzers_ < max_parallel_analyzers_ )
      send< try_create_next_analyzer >( *this );
  }

  void on_analyzer_finished() {
    // Фиксируем факт, что анализаторов стало меньше.
    --active_analyzers_;

    // Если есть, что запускать на обработку, делаем это.
    if( !pending_requests_.empty() )
      lauch_new_analyzer();
  }

  void lauch_new_analyzer() {
    introduce_child_coop( *this,
      analyzers_disp_->binder( disp::thread_pool::bind_params_t() ),
      [this]( coop_t & coop ) {
        coop.make_agent< email_analyzer >(
          pending_requests_.front().email_file_,
          pending_requests_.front().reply_to_ );

        // Нам нужно автоматически получить уведомление, когда эта кооперация
        // перестанет работать. Для чего мы назначаем специальный нотификатор,
        // задачей которого будет отсылка сигнала analyzer_finished.
        coop.add_dereg_notificator(
          // Нотификатор получает ряд параметров, но нам они сейчас не нужны.
          [this]( environment_t &, const string &, const coop_dereg_reason_t & ) {
            send< analyzer_finished >( *this );
          } );
      } );

    // Фиксируем тот факт, что анализаторов стало больше.
    ++active_analyzers_;

    // Соответствующую заявку в списке ожидания больше хранить не нужно.
    pending_requests_.pop_front();
  }
};

В принципе, мы получили более-менее нормальное решение, которое можно было бы счесть удовлетворительным. Если бы не одно «но».

Это «но» состоит в том, что хотя у нас и есть возможность запускать в параллельную работу несколько агентов-анализаторов, распараллеливание получится так себе. Если, скажем, одновременно стартуют пять агентов, то все пятеро сразу же начнут I/O операции и пока эти операции будут выполняться, никто не сможет делать ничего другого. Потом I/O операции закончатся и все пятеро агентов начнут разбор прочитанных с диска данных. Тем самым займут процессор. Этим можно было бы воспользоваться для того, чтобы начать I/O операции для следующих нескольких агентов-анализаторов. Но мы не можем этого сделать, пока первые пять агентов заняты своей работой.

Решить эту проблему можно, если изъять из email_analyzer I/O операцию. Вместо того, чтобы загружать данные из файла самостоятельно, агент email_analyzer может делегировать эту задачу специальному IO-агенту. Т.е. агент email_analyzer стартует, отсылает сообщение IO-агенту и затем получает результат I/O операции в виде ответного сообщения. Тем самым предоставляя возможность другому email_analyzer-у выполнить свою часть работы (отослать сообщение IO-агенту или обработать ответное сообщение от IO-агента). Но разговор о том, как это будет выглядеть и насколько хорошим окажется такое решение мы продолжим в следующей статье.

Пока же можно показать одну важную возможность, которую мы получили в своей текущей реализации агента-менеджера с его списком ожидания: мы можем легко контролировать время ожидания запросов в этом списке.

Действительно, у операции проверки письма наверняка будут какие-то разумные пределы времени ожидания ответа. И если за это время оценить безопасность не удалось, то, скорее всего, и не нужно будет пытаться это делать. Исходя из этого мы можем легко модифицировать агента-менеджера так, чтобы он выбрасывал из списка ожидания те запросы, которые провели в ожидании слишком много времени (например, больше 10 секунд). Для этого задействуем периодическое сообщение, которое будет приходить к менеджеру два раза в секунду. Получив это сообщение менеджер пробежится по списку ожидания и выбросит те запросы, которые ждали больше 10 секунд. Подход, конечно, не очень точный, но зато очень простой и надежный:

class analyzer_manager final : public agent_t {
  struct try_create_next_analyzer : public signal_t {};
  struct analyzer_finished : public signal_t {};

  // Потребуется еще один сигнал для таймера проверки времени жизни
  // заявки в списке ожидания.
  struct check_lifetime : public signal_t {};

  // Кроме того, нам потребуется другая структура для хранения заявки
  // в списке ожидания. Кроме самой заявки нужно будет хранить еще
  // и время поступления в список ожидания.
  using clock = chrono::steady_clock;
  struct pending_request {
    clock::time_point stored_at_;
    check_request request_;
  };

public :
  analyzer_manager( context_t ctx )
    : agent_t( ctx )
    , analyzers_disp_(
        disp::thread_pool::create_private_disp( so_environment(), 16 ) )
  {
    so_subscribe_self()
      .event( &analyzer_manager::on_new_check_request )
      .event< try_create_next_analyzer >( &analyzer_manager::on_create_new_analyzer )
      .event< analyzer_finished >( &analyzer_manager::on_analyzer_finished )
      // Для обработки таймера нам нужен еще одно событие-обработчик.
      .event< check_lifetime >( &analyzer_manager::on_check_lifetime );
  }

  // Используем стартовый метод для того, чтобы запустить периодический таймер.
  virtual void so_evt_start() override {
    // Для периодических таймеров нужно сохранять возвращаемый timer_id,
    // иначе таймер будет автоматически отменен.
    check_lifetime_timer_ = send_periodic< check_lifetime >( *this, 500ms, 500ms );
  }

private :
  const size_t max_parallel_analyzers_{ 16 };
  size_t active_analyzers_{ 0 };

  disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;

  // Ограничение на время пребывания заявки в списке ожидания.
  const chrono::seconds max_lifetime_{ 10 };
  // Идентификатор таймера для периодического сигнала check_lifetime.
  timer_id_t check_lifetime_timer_;

  list< pending_request > pending_requests_;

  void on_new_check_request( const check_request & msg ) {
    // Теперь при сохранении фиксируем время.
    pending_requests_.push_back( pending_request{ clock::now(), msg } );
    send< try_create_next_analyzer >( *this );
  }

  void on_create_new_analyzer() {
    if( active_analyzers_ >= max_parallel_analyzers_ )
      return;

    lauch_new_analyzer();

    if( !pending_requests_.empty()
        && active_analyzers_ < max_parallel_analyzers_ )
      send< try_create_next_analyzer >( *this );
  }

  void on_analyzer_finished() {
    --active_analyzers_;

    if( !pending_requests_.empty() )
      lauch_new_analyzer();
  }

  void on_check_lifetime() {
    // Продолжать просмотр списка можно пока в нем есть элементы, которые
    // подлежат изъятию.
    while( !pending_requests_.empty() &&
      pending_requests_.front().stored_at_ + max_lifetime_ < clock::now() )
    {
      // Отсылаем неудачный результат проверки email-а самостоятельно.
      send< check_result >(
        pending_requests_.front().request_.reply_to_,
        pending_requests_.front().request_.email_file_,
        check_status::check_timedout );
      pending_requests_.pop_front();
    } 
  }
  
  void lauch_new_analyzer() {
    introduce_child_coop( *this,
      analyzers_disp_->binder( disp::thread_pool::bind_params_t() ),
      [this]( coop_t & coop ) {
        coop.make_agent< email_analyzer >(
          pending_requests_.front().request_.email_file_,
          pending_requests_.front().request_.reply_to_ );

        coop.add_dereg_notificator(
          [this]( environment_t &, const string &, const coop_dereg_reason_t & ) {
            send< analyzer_finished >( *this );
          } );
      } );

    ++active_analyzers_;

    pending_requests_.pop_front();
  }
};

Пожалуй, на этом пока можно прерваться дабы сохранить разумный объем статьи. В последующих статьях мы продолжим рассматривать этот пример и опишем более сложные реализации агентов, продемонстрировав некоторые специфические особенности SObjectizer-а.

Пока же можно отметить, что в показанных примерах мы уже вплотную столкнулись с одной из самых важных проблем, с которыми сталкивается разработчик, использующий Actorl Model: защита от перегрузок.

Эта проблема возникает, например, когда в системе оказывается слишком много агентов, для того, чтобы их события можно было нормально диспетчировать. Так, если мы позволяем создавать агентов email_analyzer без ограничения их количества, то в один прекрасный момент мы может оказаться в ситуации, когда несколько тысяч таких агентов ждут своей очереди на обработку события и ждут очень долго (счет может идти на минуты и даже десятки минут в самых патологических случаях). В данной статье мы показали один из самых действенных способов решения этого проявления проблемы перегрузок: ограничение на количество агентов и создание новых агентов только по мере появления подходящих для этого возможностей (по мере уничтожения старых агентов).

У проблемы перегрузки есть и другие проявления. Например, возникновение такого количества сообщений, которое приложение не успеет обработать за разумное время. Это так же очень неприятная проблема и SObjectizer предоставляет некоторые инструменты для борьбы с ней. Но более подробно этот вопрос мы затронем в одной из следующих статей.

Кроме проблемы перегрузок есть и еще одна проблема, присущая построенным на акторах/агентах системам: сложность обозримости происходящего в приложении. Это когда в приложении есть 100500 агентов, каждый из которых, вроде бы, работает правильно, но вот понять, работает ли все приложение должным образом, оказывается непросто. Этот вопрос мы так же затронем, но в последующих статьях.

Пока же мы надеемся на то, что приведенные в данной статье примеры и доводы оказались понятными. Ну, а если что-то осталось непонятным, то с удовольствием ответим на вопросы в комментариях.

Исходные коды к показанным в статье примерам можно найти в этом репозитории.

Комментарии (0)

© Habrahabr.ru