SObjectizer: от простого к сложному. Часть I

    В первой статье мы рассказали о том, что такое SObjectizer и почему он получился именно таким. Во второй – попробуем показать, как может выглядеть более-менее реальный код на SObjectizer. С демонстрацией того, в какую сторону этот код обычно эволюционирует. Ибо первоначально, когда у разработчика появляется возможность работать с Actor Model, он начинает этой возможностью злоупотреблять, создавая проблемы и себе, и тем, кто будет эксплуатировать программный продукт, написанный в стиле «актор на каждый чих». Только спустя некоторое время и некоторое количество набитых шишек приходит понимание того, что прелесть модели акторов вовсе не в возможности создавать их десятками тысяч или даже просто тысячами. Но давайте пойдем последовательно, не опережая события.

    Для демонстрации выдумаем себе такую абстрактную задачу: есть имя файла с email-ом (грубо говоря, в этот файл сохранили все, что пришло по POP3 протоколу, включая заголовки, тело письма, аттачи и пр). Нужно выдать результат оценки подозрительности содержимого этого файла: выглядит ли письмо безопасно или подозрительно, или же при попытке оценить его содержимое возникла какая-то проблема и актуальную оценку выдать нельзя. Задача абстрактная, любые совпадения с чем-либо похожим из реальной жизни являются непреднамеренной случайностью.

    Естественно, таких имен с файлами email-ов у нас будет не одно и не два. Будет некий поток этих имен, с которым нужно разбираться. Желательно, используя возможности современного многоядерного железа, т. е. запуская обработку нескольких email-ов в параллель.

    Схематично покажем, как эта задача может быть решена на SObjectizer-е «в лоб». После чего укажем проблемы выбранного подхода, сделаем следующую итерацию и т.д. Дабы в итоге на примерах подвести читателя к тому пониманию «удобного использования модели акторов на C++» которое у нас сложилось за десять лет работы с SObjectizer-ом в реальных проектах.

    Для начала определимся с тем, как выдаются запросы на проверку файлов с email-ами и как возвращаются результаты проверок. Используем для этих целей простые сообщения:

    // Избавляемся от необходимости указывать префиксы so_5 и std.
    // В последующих примерах эти using-и дублировать не будем,
    // подразумевая, что они уже выполнены.
    using namespace so_5;
    using namespace std;
    using namespace chrono_literals;
    
    // Сообщение для проверки одного файла с email-ом.
    struct check_request {
      // Имя проверяемого файла.
      string email_file_;
      // Кому нужно отослать результат проверки.
      mbox_t reply_to_;
    };
    
    // Статус проверки, который будет возвращен в ответном сообщении.
    enum class check_status {
      safe,
      suspicious,
      dangerous,
      check_failure,
      check_timedout
    };
    
    // Сообщение с результатом проверки одного файла с email.
    // Содержит не только статус проверки, но и имя проверяемого файла.
    // Это имя нужно лишь для того, чтобы облегчить сопоставление
    // получаемых результатов проверки.
    struct check_result {
      string email_file_;
      check_status status_;
    };
    

    Получается, что когда нам нужно проверить email, мы отсылает сообщение check_request на некий mbox. В этом сообщении передается имя файла и обратный адрес, куда должен быть отослан результат проверки. Соответственно, следующим шагом нам нужно определить, куда же именно будут отсылаться сообщения check_request.

    Можно, конечно же, создать одного агента, который бы получал все сообщения check_request и обрабатывал бы их самостоятельно. Но такой агент очень быстро стал бы узким местом. Поэтому мы сделаем так, что у нас будет один агент-менеджер, который получает сообщения check_request и под каждое полученное сообщение создает агента-анализатора. Именно агент-анализатор будет заниматься проверкой email-а, а агент-мендежер будет выполнять роль фабрики агентов-анализаторов.

    Сходу можно написать самую простую версию агента-менеджера:
    // Агент, который будет играть роль менеджера агентов email_analyzer.
    class analyzer_manager final : public agent_t {
    public :
      analyzer_manager( context_t ctx ) : agent_t( ctx )
      {
        // Класс объявлен как final, поэтому подписки агента можно сделать
        // прямо в конструкторе. Если бы final не было, подписки лучше было
        // бы вынести в метод so_define_agent(), что упростило бы разработку
        // производных классов.
        so_subscribe_self()
          // В этом случае тип сообщения, на который идет подписка,
          // выводится автоматически.
          .event( &analyzer_manager::on_new_check_request );
      }
    
    private :
      void on_new_check_request( const check_request & msg ) {
        // Создаем кооперацию с единственным агентом внутри.
        // Эта кооперация будет дочерней для кооперации с агентом-менеджером.
        // Т.е. SObjectizer Environment проконтролирует, чтобы кооперация с
        // агентом-анализатором завершила свою работу перед тем, как
        // завершит свою работу кооперация с агентом-менеджером.
        introduce_child_coop( *this, [&]( coop_t & coop ) {
            // В кооперацию будет входить всего один агент.
            coop.make_agent< email_analyzer >( msg.email_file_, msg.reply_to_ );
          } );
      }
    };
    

    Для обработки email-ов нам нужно будет зарегистрировать в SObjectizer Environment экземпляр агента типа analyzer_manager и каким-то образом сделать его персональный mbox (т.н. direct_mbox) доступным для всех. Тот, кому нужно проверить email, отошлет на этот mbox сообщение check_request, сообщение дойдет до analyzer_manager, будет создан агент email_analyzer ну и дальше все, как и задумывалось...

    Теперь нужно реализовать агента email_analyzer, который и будет производить анализ email-ов. Самое простое, что приходит в голову – это агент, который сам выполняет все операции: т.е. загружает содержимое из файла, парсит это содержимое на составные части (заголовки, тело, аттачи), анализирует все это и выдает заключение.

    Фактически, агенту email_analyzer нужно будет определить только свою реализацию метода so_evt_start(), которая автоматически вызывается у каждого агента после того, как агент успешно регистрируется внутри SObjectizer Environment. Посему агент email_analyzer будет выглядеть очень просто:
    // Агент для анализа содержимого одного email-а.
    // Получает все нужные ему параметры в конструкторе,
    // выполняет все свои действия в единственном методе so_evt_start.
    class email_analyzer : public agent_t {
    public :
      email_analyzer( context_t ctx,
        // Имя файла с email для анализа.
        string email_file,
        // Куда нужно отослать результат анализа.
        mbox_t reply_to )
        : agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
      {}
    
      virtual void so_evt_start() override {
        try {
          // Стадии обработки обозначаем лишь схематично.
          auto raw_data = load_email_from_file( email_file_ );
          auto parsed_data = parse_email( raw_data );
          auto status = check_headers( parsed_data->headers() );
          if( check_status::safe == status )
            status = check_body( parsed_data->body() );
          if( check_status::safe == status )
            status = check_attachments( parsed_data->attachments() );
          send< check_result >( reply_to_, email_file_, status );
        }
        catch( const exception & ) {
          // В случае какой-либо ошибки отсылаем статус о невозможности
          // проверки файла с email-ом по техническим причинам.
          send< check_result >(
              reply_to_, email_file_, check_status::check_failure );
        }
        // Больше мы не нужны, поэтому дерегистрируем кооперацию,
        // в которой находимся.
        so_deregister_agent_coop_normally();
      }
    
    private :
      const string email_file_;
      const mbox_t reply_to_;
    };
    

    Итак, у нас есть очень тривиальные реализации агентов analyzer_manager и email_analyzer. Которые, к сожалению, имеют несколько серьезных проблем.

    Первая проблема состоит в том, что агенты email_analyzer не будут работать в параллель. Дело в том, что при их создании не указывается диспетчер, к которому они должны быть привязаны. Поэтому эти агенты автоматически привязываются к дефолтному диспетчеру SObjectizer Environment, а этот дефолтный диспетчер является однопоточным: т.е. у него всего одна рабочая нить, на которой последовательно запускаются события привязанных к диспетчеру агентов.

    Поэтому, если мы хотим, чтобы агенты email_analyzer работали независимо друг от друга, нам нужно явно привязывать их к соответствующему типу диспетчера. В данном случае хорошо подойдет диспетчер с пулом рабочих потоков. Соответственно, кто-то должен создать экземпляр такого диспетчера и кто-то должен привязывать email_analyzer-ов к этому экземпляру. Очевидно, что этот кто-то – это агент analyzer_manager:
    class analyzer_manager final : public agent_t {
    public :
      analyzer_manager( context_t ctx )
        : agent_t( ctx )
        , analyzers_disp_(
            // Нужен приватный, т.е. видимый только нашему менеджеру
            // диспетчер, на котором и будут работать агенты-анализаторы.
            disp::thread_pool::create_private_disp(
              // Указываем, в рамках какого SObjectizer Environment
              // будет работать диспетчер. Это нужно для корректного запуска
              // и останова диспетчера.
              so_environment(),
              // Просто захардкодим количество рабочих потоков для диспетчера.
              // В реальном приложении это количество может быть вычислено
              // на основании, например, thread::hardware_concurrency() или
              // взято из конфигурации.
              16 ) )
      {
        so_subscribe_self()
          .event( &analyzer_manager::on_new_check_request );
      }
    
    private :
      disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;
    
      void on_new_check_request( const check_request & msg ) {
        introduce_child_coop( *this,
          // Агент из новой кооперации будет автоматически привязан к приватному
          // диспетчеру с пулом рабочих потоков (при привязке будут использоваться
          // параметры по умолчанию).
          analyzers_disp_->binder( disp::thread_pool::bind_params_t() ),
          [&]( coop_t & coop ) {
            // В кооперацию будет входить всего один агент.
            coop.make_agent< email_analyzer >( msg.email_file_, msg.reply_to_ );
          } );
      }
    };
    

    Такая несложная модификация analyzer_manager позволила нам избавиться от первой проблемы. Но осталась еще и вторая: неконтролируемое создание неограниченного количества агентов email_analyzer.

    Текущая реализация analyzer_manager работает по принципу: получил сообщение check_email с именем файла для проверки, создал агента email_analyzer и забыл про все. Но, очевидно, что для более-менее высокой нагрузки этот вариант не подходит. Если сразу создать 100500 агентов email_analyzer, которые будут работать на пуле из N рабочих потоков, то ничего хорошего кроме лишнего расхода памяти не будет. Лучше сразу ограничивать количество одновременно работающих агентов и создавать новых после того, как завершают работу предыдущие. Плюс хранить очередь заданий на обработку, из которой и будут браться элементы для новых агентов.

    Поэтому еще раз модифицируем нашего analyzer_manager-а: добавим в него очередь запросов и ограничение на количество одновременно работающих агентов.
    class analyzer_manager final : public agent_t {
      // Этот сигнал нам нужен для того, чтобы мы могли попробовать
      // запустить в работу очередной анализатор.
      struct try_create_next_analyzer : public signal_t {};
      // А этот сигнал будет информировать нас о том, что очередной
      // анализатор завершил свою работу.
      struct analyzer_finished : public signal_t {};
    
    public :
      analyzer_manager( context_t ctx )
        : agent_t( ctx )
        , analyzers_disp_(
            disp::thread_pool::create_private_disp( so_environment(), 16 ) )
      {
        so_subscribe_self()
          .event( &analyzer_manager::on_new_check_request )
          // А в этом случае метод-обработчик не имеет параметров,
          // поэтому тип сигнала-инцидента указывается явно.
          .event< try_create_next_analyzer >( &analyzer_manager::on_create_new_analyzer )
          .event< analyzer_finished >( &analyzer_manager::on_analyzer_finished );
      }
    
    private :
      const size_t max_parallel_analyzers_{ 16 };
      size_t active_analyzers_{ 0 };
    
      disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;
    
      list< check_request > pending_requests_;
    
      void on_new_check_request( const check_request & msg ) {
        // Работаем по очень простой схеме: сперва сохраняем очередной
        // запрос в список ожидания, затем отсылаем себе сигнал для
        // попытки запустить очередного обработчика.
        // И создавать агента-анализатора будем уже при обработке сигнала.
        pending_requests_.push_back( msg );
        // Отсылаем сигнал сами себе.
        send< try_create_next_analyzer >( *this );
      }
    
      void on_create_new_analyzer() {
        // Запустить новый анализатор можно только если еще не достигнут
        // лимит на их количество.
        if( active_analyzers_ >= max_parallel_analyzers_ )
          return;
    
        lauch_new_analyzer();
    
        // Если список не пуст и возможность стартовать анализаторов
        // сохраняется, то продолжим это делать.
        if( !pending_requests_.empty()
            && active_analyzers_ < max_parallel_analyzers_ )
          send< try_create_next_analyzer >( *this );
      }
    
      void on_analyzer_finished() {
        // Фиксируем факт, что анализаторов стало меньше.
        --active_analyzers_;
    
        // Если есть, что запускать на обработку, делаем это.
        if( !pending_requests_.empty() )
          lauch_new_analyzer();
      }
    
      void lauch_new_analyzer() {
        introduce_child_coop( *this,
          analyzers_disp_->binder( disp::thread_pool::bind_params_t() ),
          [this]( coop_t & coop ) {
            coop.make_agent< email_analyzer >(
              pending_requests_.front().email_file_,
              pending_requests_.front().reply_to_ );
    
            // Нам нужно автоматически получить уведомление, когда эта кооперация
            // перестанет работать. Для чего мы назначаем специальный нотификатор,
            // задачей которого будет отсылка сигнала analyzer_finished.
            coop.add_dereg_notificator(
              // Нотификатор получает ряд параметров, но нам они сейчас не нужны.
              [this]( environment_t &, const string &, const coop_dereg_reason_t & ) {
                send< analyzer_finished >( *this );
              } );
          } );
    
        // Фиксируем тот факт, что анализаторов стало больше.
        ++active_analyzers_;
    
        // Соответствующую заявку в списке ожидания больше хранить не нужно.
        pending_requests_.pop_front();
      }
    };
    

    В принципе, мы получили более-менее нормальное решение, которое можно было бы счесть удовлетворительным. Если бы не одно «но».

    Это «но» состоит в том, что хотя у нас и есть возможность запускать в параллельную работу несколько агентов-анализаторов, распараллеливание получится так себе. Если, скажем, одновременно стартуют пять агентов, то все пятеро сразу же начнут I/O операции и пока эти операции будут выполняться, никто не сможет делать ничего другого. Потом I/O операции закончатся и все пятеро агентов начнут разбор прочитанных с диска данных. Тем самым займут процессор. Этим можно было бы воспользоваться для того, чтобы начать I/O операции для следующих нескольких агентов-анализаторов. Но мы не можем этого сделать, пока первые пять агентов заняты своей работой.

    Решить эту проблему можно, если изъять из email_analyzer I/O операцию. Вместо того, чтобы загружать данные из файла самостоятельно, агент email_analyzer может делегировать эту задачу специальному IO-агенту. Т.е. агент email_analyzer стартует, отсылает сообщение IO-агенту и затем получает результат I/O операции в виде ответного сообщения. Тем самым предоставляя возможность другому email_analyzer-у выполнить свою часть работы (отослать сообщение IO-агенту или обработать ответное сообщение от IO-агента). Но разговор о том, как это будет выглядеть и насколько хорошим окажется такое решение мы продолжим в следующей статье.

    Пока же можно показать одну важную возможность, которую мы получили в своей текущей реализации агента-менеджера с его списком ожидания: мы можем легко контролировать время ожидания запросов в этом списке.

    Действительно, у операции проверки письма наверняка будут какие-то разумные пределы времени ожидания ответа. И если за это время оценить безопасность не удалось, то, скорее всего, и не нужно будет пытаться это делать. Исходя из этого мы можем легко модифицировать агента-менеджера так, чтобы он выбрасывал из списка ожидания те запросы, которые провели в ожидании слишком много времени (например, больше 10 секунд). Для этого задействуем периодическое сообщение, которое будет приходить к менеджеру два раза в секунду. Получив это сообщение менеджер пробежится по списку ожидания и выбросит те запросы, которые ждали больше 10 секунд. Подход, конечно, не очень точный, но зато очень простой и надежный:
    class analyzer_manager final : public agent_t {
      struct try_create_next_analyzer : public signal_t {};
      struct analyzer_finished : public signal_t {};
    
      // Потребуется еще один сигнал для таймера проверки времени жизни
      // заявки в списке ожидания.
      struct check_lifetime : public signal_t {};
    
      // Кроме того, нам потребуется другая структура для хранения заявки
      // в списке ожидания. Кроме самой заявки нужно будет хранить еще
      // и время поступления в список ожидания.
      using clock = chrono::steady_clock;
      struct pending_request {
        clock::time_point stored_at_;
        check_request request_;
      };
    
    public :
      analyzer_manager( context_t ctx )
        : agent_t( ctx )
        , analyzers_disp_(
            disp::thread_pool::create_private_disp( so_environment(), 16 ) )
      {
        so_subscribe_self()
          .event( &analyzer_manager::on_new_check_request )
          .event< try_create_next_analyzer >( &analyzer_manager::on_create_new_analyzer )
          .event< analyzer_finished >( &analyzer_manager::on_analyzer_finished )
          // Для обработки таймера нам нужен еще одно событие-обработчик.
          .event< check_lifetime >( &analyzer_manager::on_check_lifetime );
      }
    
      // Используем стартовый метод для того, чтобы запустить периодический таймер.
      virtual void so_evt_start() override {
        // Для периодических таймеров нужно сохранять возвращаемый timer_id,
        // иначе таймер будет автоматически отменен.
        check_lifetime_timer_ = send_periodic< check_lifetime >( *this, 500ms, 500ms );
      }
    
    private :
      const size_t max_parallel_analyzers_{ 16 };
      size_t active_analyzers_{ 0 };
    
      disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;
    
      // Ограничение на время пребывания заявки в списке ожидания.
      const chrono::seconds max_lifetime_{ 10 };
      // Идентификатор таймера для периодического сигнала check_lifetime.
      timer_id_t check_lifetime_timer_;
    
      list< pending_request > pending_requests_;
    
      void on_new_check_request( const check_request & msg ) {
        // Теперь при сохранении фиксируем время.
        pending_requests_.push_back( pending_request{ clock::now(), msg } );
        send< try_create_next_analyzer >( *this );
      }
    
      void on_create_new_analyzer() {
        if( active_analyzers_ >= max_parallel_analyzers_ )
          return;
    
        lauch_new_analyzer();
    
        if( !pending_requests_.empty()
            && active_analyzers_ < max_parallel_analyzers_ )
          send< try_create_next_analyzer >( *this );
      }
    
      void on_analyzer_finished() {
        --active_analyzers_;
    
        if( !pending_requests_.empty() )
          lauch_new_analyzer();
      }
    
      void on_check_lifetime() {
        // Продолжать просмотр списка можно пока в нем есть элементы, которые
        // подлежат изъятию.
        while( !pending_requests_.empty() &&
          pending_requests_.front().stored_at_ + max_lifetime_ < clock::now() )
        {
          // Отсылаем неудачный результат проверки email-а самостоятельно.
          send< check_result >(
            pending_requests_.front().request_.reply_to_,
            pending_requests_.front().request_.email_file_,
            check_status::check_timedout );
          pending_requests_.pop_front();
        } 
      }
      
      void lauch_new_analyzer() {
        introduce_child_coop( *this,
          analyzers_disp_->binder( disp::thread_pool::bind_params_t() ),
          [this]( coop_t & coop ) {
            coop.make_agent< email_analyzer >(
              pending_requests_.front().request_.email_file_,
              pending_requests_.front().request_.reply_to_ );
    
            coop.add_dereg_notificator(
              [this]( environment_t &, const string &, const coop_dereg_reason_t & ) {
                send< analyzer_finished >( *this );
              } );
          } );
    
        ++active_analyzers_;
    
        pending_requests_.pop_front();
      }
    };
    

    Пожалуй, на этом пока можно прерваться дабы сохранить разумный объем статьи. В последующих статьях мы продолжим рассматривать этот пример и опишем более сложные реализации агентов, продемонстрировав некоторые специфические особенности SObjectizer-а.

    Пока же можно отметить, что в показанных примерах мы уже вплотную столкнулись с одной из самых важных проблем, с которыми сталкивается разработчик, использующий Actorl Model: защита от перегрузок.

    Эта проблема возникает, например, когда в системе оказывается слишком много агентов, для того, чтобы их события можно было нормально диспетчировать. Так, если мы позволяем создавать агентов email_analyzer без ограничения их количества, то в один прекрасный момент мы может оказаться в ситуации, когда несколько тысяч таких агентов ждут своей очереди на обработку события и ждут очень долго (счет может идти на минуты и даже десятки минут в самых патологических случаях). В данной статье мы показали один из самых действенных способов решения этого проявления проблемы перегрузок: ограничение на количество агентов и создание новых агентов только по мере появления подходящих для этого возможностей (по мере уничтожения старых агентов).

    У проблемы перегрузки есть и другие проявления. Например, возникновение такого количества сообщений, которое приложение не успеет обработать за разумное время. Это так же очень неприятная проблема и SObjectizer предоставляет некоторые инструменты для борьбы с ней. Но более подробно этот вопрос мы затронем в одной из следующих статей.

    Кроме проблемы перегрузок есть и еще одна проблема, присущая построенным на акторах/агентах системам: сложность обозримости происходящего в приложении. Это когда в приложении есть 100500 агентов, каждый из которых, вроде бы, работает правильно, но вот понять, работает ли все приложение должным образом, оказывается непросто. Этот вопрос мы так же затронем, но в последующих статьях.

    Пока же мы надеемся на то, что приведенные в данной статье примеры и доводы оказались понятными. Ну а если что-то осталось непонятным, то с удовольствием ответим на вопросы в комментариях.

    Исходные коды к показанным в статье примерам можно найти в этом репозитории.
    Поделиться публикацией
    Похожие публикации
    Реклама помогает поддерживать и развивать наши сервисы

    Подробнее
    Реклама
    Комментарии 0

    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.