SObjectizer: от простого к сложному. Часть I
Для демонстрации выдумаем себе такую абстрактную задачу: есть имя файла с email-ом (грубо говоря, в этот файл сохранили все, что пришло по POP3 протоколу, включая заголовки, тело письма, аттачи и пр). Нужно выдать результат оценки подозрительности содержимого этого файла: выглядит ли письмо безопасно или подозрительно, или же при попытке оценить его содержимое возникла какая-то проблема и актуальную оценку выдать нельзя. Задача абстрактная, любые совпадения с чем-либо похожим из реальной жизни являются непреднамеренной случайностью.
Естественно, таких имен с файлами email-ов у нас будет не одно и не два. Будет некий поток этих имен, с которым нужно разбираться. Желательно, используя возможности современного многоядерного железа, т. е. запуская обработку нескольких email-ов в параллель.
Схематично покажем, как эта задача может быть решена на SObjectizer-е «в лоб». После чего укажем проблемы выбранного подхода, сделаем следующую итерацию и т.д. Дабы в итоге на примерах подвести читателя к тому пониманию «удобного использования модели акторов на C++» которое у нас сложилось за десять лет работы с SObjectizer-ом в реальных проектах.
Для начала определимся с тем, как выдаются запросы на проверку файлов с email-ами и как возвращаются результаты проверок. Используем для этих целей простые сообщения:
// Избавляемся от необходимости указывать префиксы so_5 и std.
// В последующих примерах эти using-и дублировать не будем,
// подразумевая, что они уже выполнены.
using namespace so_5;
using namespace std;
using namespace chrono_literals;
// Сообщение для проверки одного файла с email-ом.
struct check_request {
// Имя проверяемого файла.
string email_file_;
// Кому нужно отослать результат проверки.
mbox_t reply_to_;
};
// Статус проверки, который будет возвращен в ответном сообщении.
enum class check_status {
safe,
suspicious,
dangerous,
check_failure,
check_timedout
};
// Сообщение с результатом проверки одного файла с email.
// Содержит не только статус проверки, но и имя проверяемого файла.
// Это имя нужно лишь для того, чтобы облегчить сопоставление
// получаемых результатов проверки.
struct check_result {
string email_file_;
check_status status_;
};
Получается, что когда нам нужно проверить email, мы отсылает сообщение check_request на некий mbox. В этом сообщении передается имя файла и обратный адрес, куда должен быть отослан результат проверки. Соответственно, следующим шагом нам нужно определить, куда же именно будут отсылаться сообщения check_request.
Можно, конечно же, создать одного агента, который бы получал все сообщения check_request и обрабатывал бы их самостоятельно. Но такой агент очень быстро стал бы узким местом. Поэтому мы сделаем так, что у нас будет один агент-менеджер, который получает сообщения check_request и под каждое полученное сообщение создает агента-анализатора. Именно агент-анализатор будет заниматься проверкой email-а, а агент-мендежер будет выполнять роль фабрики агентов-анализаторов.
Сходу можно написать самую простую версию агента-менеджера:
// Агент, который будет играть роль менеджера агентов email_analyzer.
class analyzer_manager final : public agent_t {
public :
analyzer_manager( context_t ctx ) : agent_t( ctx )
{
// Класс объявлен как final, поэтому подписки агента можно сделать
// прямо в конструкторе. Если бы final не было, подписки лучше было
// бы вынести в метод so_define_agent(), что упростило бы разработку
// производных классов.
so_subscribe_self()
// В этом случае тип сообщения, на который идет подписка,
// выводится автоматически.
.event( &analyzer_manager::on_new_check_request );
}
private :
void on_new_check_request( const check_request & msg ) {
// Создаем кооперацию с единственным агентом внутри.
// Эта кооперация будет дочерней для кооперации с агентом-менеджером.
// Т.е. SObjectizer Environment проконтролирует, чтобы кооперация с
// агентом-анализатором завершила свою работу перед тем, как
// завершит свою работу кооперация с агентом-менеджером.
introduce_child_coop( *this, [&]( coop_t & coop ) {
// В кооперацию будет входить всего один агент.
coop.make_agent< email_analyzer >( msg.email_file_, msg.reply_to_ );
} );
}
};
Для обработки email-ов нам нужно будет зарегистрировать в SObjectizer Environment экземпляр агента типа analyzer_manager и каким-то образом сделать его персональный mbox (т.н. direct_mbox) доступным для всех. Тот, кому нужно проверить email, отошлет на этот mbox сообщение check_request, сообщение дойдет до analyzer_manager, будет создан агент email_analyzer ну и дальше все, как и задумывалось…
Теперь нужно реализовать агента email_analyzer, который и будет производить анализ email-ов. Самое простое, что приходит в голову — это агент, который сам выполняет все операции: т.е. загружает содержимое из файла, парсит это содержимое на составные части (заголовки, тело, аттачи), анализирует все это и выдает заключение.
Фактически, агенту email_analyzer нужно будет определить только свою реализацию метода so_evt_start (), которая автоматически вызывается у каждого агента после того, как агент успешно регистрируется внутри SObjectizer Environment. Посему агент email_analyzer будет выглядеть очень просто:
// Агент для анализа содержимого одного email-а.
// Получает все нужные ему параметры в конструкторе,
// выполняет все свои действия в единственном методе so_evt_start.
class email_analyzer : public agent_t {
public :
email_analyzer( context_t ctx,
// Имя файла с email для анализа.
string email_file,
// Куда нужно отослать результат анализа.
mbox_t reply_to )
: agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
{}
virtual void so_evt_start() override {
try {
// Стадии обработки обозначаем лишь схематично.
auto raw_data = load_email_from_file( email_file_ );
auto parsed_data = parse_email( raw_data );
auto status = check_headers( parsed_data->headers() );
if( check_status::safe == status )
status = check_body( parsed_data->body() );
if( check_status::safe == status )
status = check_attachments( parsed_data->attachments() );
send< check_result >( reply_to_, email_file_, status );
}
catch( const exception & ) {
// В случае какой-либо ошибки отсылаем статус о невозможности
// проверки файла с email-ом по техническим причинам.
send< check_result >(
reply_to_, email_file_, check_status::check_failure );
}
// Больше мы не нужны, поэтому дерегистрируем кооперацию,
// в которой находимся.
so_deregister_agent_coop_normally();
}
private :
const string email_file_;
const mbox_t reply_to_;
};
Итак, у нас есть очень тривиальные реализации агентов analyzer_manager и email_analyzer. Которые, к сожалению, имеют несколько серьезных проблем.
Первая проблема состоит в том, что агенты email_analyzer не будут работать в параллель. Дело в том, что при их создании не указывается диспетчер, к которому они должны быть привязаны. Поэтому эти агенты автоматически привязываются к дефолтному диспетчеру SObjectizer Environment, а этот дефолтный диспетчер является однопоточным: т.е. у него всего одна рабочая нить, на которой последовательно запускаются события привязанных к диспетчеру агентов.
Поэтому, если мы хотим, чтобы агенты email_analyzer работали независимо друг от друга, нам нужно явно привязывать их к соответствующему типу диспетчера. В данном случае хорошо подойдет диспетчер с пулом рабочих потоков. Соответственно, кто-то должен создать экземпляр такого диспетчера и кто-то должен привязывать email_analyzer-ов к этому экземпляру. Очевидно, что этот кто-то — это агент analyzer_manager:
class analyzer_manager final : public agent_t {
public :
analyzer_manager( context_t ctx )
: agent_t( ctx )
, analyzers_disp_(
// Нужен приватный, т.е. видимый только нашему менеджеру
// диспетчер, на котором и будут работать агенты-анализаторы.
disp::thread_pool::create_private_disp(
// Указываем, в рамках какого SObjectizer Environment
// будет работать диспетчер. Это нужно для корректного запуска
// и останова диспетчера.
so_environment(),
// Просто захардкодим количество рабочих потоков для диспетчера.
// В реальном приложении это количество может быть вычислено
// на основании, например, thread::hardware_concurrency() или
// взято из конфигурации.
16 ) )
{
so_subscribe_self()
.event( &analyzer_manager::on_new_check_request );
}
private :
disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;
void on_new_check_request( const check_request & msg ) {
introduce_child_coop( *this,
// Агент из новой кооперации будет автоматически привязан к приватному
// диспетчеру с пулом рабочих потоков (при привязке будут использоваться
// параметры по умолчанию).
analyzers_disp_->binder( disp::thread_pool::bind_params_t() ),
[&]( coop_t & coop ) {
// В кооперацию будет входить всего один агент.
coop.make_agent< email_analyzer >( msg.email_file_, msg.reply_to_ );
} );
}
};
Такая несложная модификация analyzer_manager позволила нам избавиться от первой проблемы. Но осталась еще и вторая: неконтролируемое создание неограниченного количества агентов email_analyzer.
Текущая реализация analyzer_manager работает по принципу: получил сообщение check_email с именем файла для проверки, создал агента email_analyzer и забыл про все. Но, очевидно, что для более-менее высокой нагрузки этот вариант не подходит. Если сразу создать 100500 агентов email_analyzer, которые будут работать на пуле из N рабочих потоков, то ничего хорошего кроме лишнего расхода памяти не будет. Лучше сразу ограничивать количество одновременно работающих агентов и создавать новых после того, как завершают работу предыдущие. Плюс хранить очередь заданий на обработку, из которой и будут браться элементы для новых агентов.
Поэтому еще раз модифицируем нашего analyzer_manager-а: добавим в него очередь запросов и ограничение на количество одновременно работающих агентов.
class analyzer_manager final : public agent_t {
// Этот сигнал нам нужен для того, чтобы мы могли попробовать
// запустить в работу очередной анализатор.
struct try_create_next_analyzer : public signal_t {};
// А этот сигнал будет информировать нас о том, что очередной
// анализатор завершил свою работу.
struct analyzer_finished : public signal_t {};
public :
analyzer_manager( context_t ctx )
: agent_t( ctx )
, analyzers_disp_(
disp::thread_pool::create_private_disp( so_environment(), 16 ) )
{
so_subscribe_self()
.event( &analyzer_manager::on_new_check_request )
// А в этом случае метод-обработчик не имеет параметров,
// поэтому тип сигнала-инцидента указывается явно.
.event< try_create_next_analyzer >( &analyzer_manager::on_create_new_analyzer )
.event< analyzer_finished >( &analyzer_manager::on_analyzer_finished );
}
private :
const size_t max_parallel_analyzers_{ 16 };
size_t active_analyzers_{ 0 };
disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;
list< check_request > pending_requests_;
void on_new_check_request( const check_request & msg ) {
// Работаем по очень простой схеме: сперва сохраняем очередной
// запрос в список ожидания, затем отсылаем себе сигнал для
// попытки запустить очередного обработчика.
// И создавать агента-анализатора будем уже при обработке сигнала.
pending_requests_.push_back( msg );
// Отсылаем сигнал сами себе.
send< try_create_next_analyzer >( *this );
}
void on_create_new_analyzer() {
// Запустить новый анализатор можно только если еще не достигнут
// лимит на их количество.
if( active_analyzers_ >= max_parallel_analyzers_ )
return;
lauch_new_analyzer();
// Если список не пуст и возможность стартовать анализаторов
// сохраняется, то продолжим это делать.
if( !pending_requests_.empty()
&& active_analyzers_ < max_parallel_analyzers_ )
send< try_create_next_analyzer >( *this );
}
void on_analyzer_finished() {
// Фиксируем факт, что анализаторов стало меньше.
--active_analyzers_;
// Если есть, что запускать на обработку, делаем это.
if( !pending_requests_.empty() )
lauch_new_analyzer();
}
void lauch_new_analyzer() {
introduce_child_coop( *this,
analyzers_disp_->binder( disp::thread_pool::bind_params_t() ),
[this]( coop_t & coop ) {
coop.make_agent< email_analyzer >(
pending_requests_.front().email_file_,
pending_requests_.front().reply_to_ );
// Нам нужно автоматически получить уведомление, когда эта кооперация
// перестанет работать. Для чего мы назначаем специальный нотификатор,
// задачей которого будет отсылка сигнала analyzer_finished.
coop.add_dereg_notificator(
// Нотификатор получает ряд параметров, но нам они сейчас не нужны.
[this]( environment_t &, const string &, const coop_dereg_reason_t & ) {
send< analyzer_finished >( *this );
} );
} );
// Фиксируем тот факт, что анализаторов стало больше.
++active_analyzers_;
// Соответствующую заявку в списке ожидания больше хранить не нужно.
pending_requests_.pop_front();
}
};
В принципе, мы получили более-менее нормальное решение, которое можно было бы счесть удовлетворительным. Если бы не одно «но».
Это «но» состоит в том, что хотя у нас и есть возможность запускать в параллельную работу несколько агентов-анализаторов, распараллеливание получится так себе. Если, скажем, одновременно стартуют пять агентов, то все пятеро сразу же начнут I/O операции и пока эти операции будут выполняться, никто не сможет делать ничего другого. Потом I/O операции закончатся и все пятеро агентов начнут разбор прочитанных с диска данных. Тем самым займут процессор. Этим можно было бы воспользоваться для того, чтобы начать I/O операции для следующих нескольких агентов-анализаторов. Но мы не можем этого сделать, пока первые пять агентов заняты своей работой.
Решить эту проблему можно, если изъять из email_analyzer I/O операцию. Вместо того, чтобы загружать данные из файла самостоятельно, агент email_analyzer может делегировать эту задачу специальному IO-агенту. Т.е. агент email_analyzer стартует, отсылает сообщение IO-агенту и затем получает результат I/O операции в виде ответного сообщения. Тем самым предоставляя возможность другому email_analyzer-у выполнить свою часть работы (отослать сообщение IO-агенту или обработать ответное сообщение от IO-агента). Но разговор о том, как это будет выглядеть и насколько хорошим окажется такое решение мы продолжим в следующей статье.
Пока же можно показать одну важную возможность, которую мы получили в своей текущей реализации агента-менеджера с его списком ожидания: мы можем легко контролировать время ожидания запросов в этом списке.
Действительно, у операции проверки письма наверняка будут какие-то разумные пределы времени ожидания ответа. И если за это время оценить безопасность не удалось, то, скорее всего, и не нужно будет пытаться это делать. Исходя из этого мы можем легко модифицировать агента-менеджера так, чтобы он выбрасывал из списка ожидания те запросы, которые провели в ожидании слишком много времени (например, больше 10 секунд). Для этого задействуем периодическое сообщение, которое будет приходить к менеджеру два раза в секунду. Получив это сообщение менеджер пробежится по списку ожидания и выбросит те запросы, которые ждали больше 10 секунд. Подход, конечно, не очень точный, но зато очень простой и надежный:
class analyzer_manager final : public agent_t {
struct try_create_next_analyzer : public signal_t {};
struct analyzer_finished : public signal_t {};
// Потребуется еще один сигнал для таймера проверки времени жизни
// заявки в списке ожидания.
struct check_lifetime : public signal_t {};
// Кроме того, нам потребуется другая структура для хранения заявки
// в списке ожидания. Кроме самой заявки нужно будет хранить еще
// и время поступления в список ожидания.
using clock = chrono::steady_clock;
struct pending_request {
clock::time_point stored_at_;
check_request request_;
};
public :
analyzer_manager( context_t ctx )
: agent_t( ctx )
, analyzers_disp_(
disp::thread_pool::create_private_disp( so_environment(), 16 ) )
{
so_subscribe_self()
.event( &analyzer_manager::on_new_check_request )
.event< try_create_next_analyzer >( &analyzer_manager::on_create_new_analyzer )
.event< analyzer_finished >( &analyzer_manager::on_analyzer_finished )
// Для обработки таймера нам нужен еще одно событие-обработчик.
.event< check_lifetime >( &analyzer_manager::on_check_lifetime );
}
// Используем стартовый метод для того, чтобы запустить периодический таймер.
virtual void so_evt_start() override {
// Для периодических таймеров нужно сохранять возвращаемый timer_id,
// иначе таймер будет автоматически отменен.
check_lifetime_timer_ = send_periodic< check_lifetime >( *this, 500ms, 500ms );
}
private :
const size_t max_parallel_analyzers_{ 16 };
size_t active_analyzers_{ 0 };
disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;
// Ограничение на время пребывания заявки в списке ожидания.
const chrono::seconds max_lifetime_{ 10 };
// Идентификатор таймера для периодического сигнала check_lifetime.
timer_id_t check_lifetime_timer_;
list< pending_request > pending_requests_;
void on_new_check_request( const check_request & msg ) {
// Теперь при сохранении фиксируем время.
pending_requests_.push_back( pending_request{ clock::now(), msg } );
send< try_create_next_analyzer >( *this );
}
void on_create_new_analyzer() {
if( active_analyzers_ >= max_parallel_analyzers_ )
return;
lauch_new_analyzer();
if( !pending_requests_.empty()
&& active_analyzers_ < max_parallel_analyzers_ )
send< try_create_next_analyzer >( *this );
}
void on_analyzer_finished() {
--active_analyzers_;
if( !pending_requests_.empty() )
lauch_new_analyzer();
}
void on_check_lifetime() {
// Продолжать просмотр списка можно пока в нем есть элементы, которые
// подлежат изъятию.
while( !pending_requests_.empty() &&
pending_requests_.front().stored_at_ + max_lifetime_ < clock::now() )
{
// Отсылаем неудачный результат проверки email-а самостоятельно.
send< check_result >(
pending_requests_.front().request_.reply_to_,
pending_requests_.front().request_.email_file_,
check_status::check_timedout );
pending_requests_.pop_front();
}
}
void lauch_new_analyzer() {
introduce_child_coop( *this,
analyzers_disp_->binder( disp::thread_pool::bind_params_t() ),
[this]( coop_t & coop ) {
coop.make_agent< email_analyzer >(
pending_requests_.front().request_.email_file_,
pending_requests_.front().request_.reply_to_ );
coop.add_dereg_notificator(
[this]( environment_t &, const string &, const coop_dereg_reason_t & ) {
send< analyzer_finished >( *this );
} );
} );
++active_analyzers_;
pending_requests_.pop_front();
}
};
Пожалуй, на этом пока можно прерваться дабы сохранить разумный объем статьи. В последующих статьях мы продолжим рассматривать этот пример и опишем более сложные реализации агентов, продемонстрировав некоторые специфические особенности SObjectizer-а.
Пока же можно отметить, что в показанных примерах мы уже вплотную столкнулись с одной из самых важных проблем, с которыми сталкивается разработчик, использующий Actorl Model: защита от перегрузок.
Эта проблема возникает, например, когда в системе оказывается слишком много агентов, для того, чтобы их события можно было нормально диспетчировать. Так, если мы позволяем создавать агентов email_analyzer без ограничения их количества, то в один прекрасный момент мы может оказаться в ситуации, когда несколько тысяч таких агентов ждут своей очереди на обработку события и ждут очень долго (счет может идти на минуты и даже десятки минут в самых патологических случаях). В данной статье мы показали один из самых действенных способов решения этого проявления проблемы перегрузок: ограничение на количество агентов и создание новых агентов только по мере появления подходящих для этого возможностей (по мере уничтожения старых агентов).
У проблемы перегрузки есть и другие проявления. Например, возникновение такого количества сообщений, которое приложение не успеет обработать за разумное время. Это так же очень неприятная проблема и SObjectizer предоставляет некоторые инструменты для борьбы с ней. Но более подробно этот вопрос мы затронем в одной из следующих статей.
Кроме проблемы перегрузок есть и еще одна проблема, присущая построенным на акторах/агентах системам: сложность обозримости происходящего в приложении. Это когда в приложении есть 100500 агентов, каждый из которых, вроде бы, работает правильно, но вот понять, работает ли все приложение должным образом, оказывается непросто. Этот вопрос мы так же затронем, но в последующих статьях.
Пока же мы надеемся на то, что приведенные в данной статье примеры и доводы оказались понятными. Ну, а если что-то осталось непонятным, то с удовольствием ответим на вопросы в комментариях.
Исходные коды к показанным в статье примерам можно найти в этом репозитории.