SObjectizer: от простого к сложному. Часть III

    В очередной статье про SObjectizer продолжим следить за эволюцией простого поначалу агента, который все более и более усложняется по мере своего развития. Рассмотрим, как быть с отложенными сообщениями, в которых мы больше не заинтересованы. И воспользуемся некоторой функциональностью иерархических конечных автоматов.



    В предыдущей статье мы остановились на том, что у нас появился агент email_analyzer, который можно считать более-менее надежно решающим свою задачу. Однако, он сам, последовательно, выполняет три стадии проверки email-а: сперва проверяет заголовки, затем содержимое, затем аттачи.


    Скорее всего, каждая из этих операций не будет исключительно CPU-bound. Намного вероятнее, что вычленив какие-то значения из проверяемого фрагмента (например, из заголовков письма), потребуется сделать куда-то запрос для проверки допустимости этого значения. Например, запрос в БД дабы проверить, нет ли имени хоста-отправителя в черном списке. Пока будет выполняться данный запрос можно было бы выполнить еще какую-то операцию, например, разобрать содержимое текста письма на отдельные ключевые фразы, дабы их можно было проверить по какому-то словарю спам-маркеров. Или проверить, есть ли в аттачах архивы, и инициировать их проверку антивирусом. В общем, имеет смысл распараллелить операции анализа email-а.


    Давайте попробуем задействовать отдельных агентов на каждую операцию. Т.е. можно написать агентов вида:


    class email_headers_checker : public agent_t {
    public :
      struct result { check_status status_ }; /* Сообщение с результатом */
      email_headers_checker( context_t ctx, ... /* Какие-то параметры */ ) {...}
      virtual void so_evt_start() override {
        ... /* Иницирование операций по проверке заголовков */
      }
    ... /* Какие-то детали реализации */
    };
    class email_body_checker : public agent_t {...};
    class email_attachment_checker : public agent_t {...};
    

    Каждый такой агент будет выполнять специфические для своей операции действия, а затем отошлет результат email_analyzer в виде сообщения. Нашему email_analyzer потребуется создать экземпляры этих агентов у себя и дождаться от них сообщений с результатами анализа:


    void on_load_succeed( const load_email_succeed & msg ) {
      try {
        auto parsed_data = parse_email( msg.content_ );
        introduce_child_coop( *this,
          // Агенты-checker-ы будут работать на своем собственном
          // thread-pool-диспетчере, который был создан заранее
          // под специальным именем.
          disp::thread_pool::create_disp_binder(
              "checkers", disp::thread_pool::bind_params_t{} ),
          [&]( coop_t & coop ) {
            coop.make_agent< email_headers_checker >(
                so_direct_mbox(), parsed_data->headers() );
            coop.make_agent< email_body_checker >(
                so_direct_mbox(), parsed_data->body() );
            coop.make_agent< email_attach_checker >(
                so_direct_mbox(), parsed_data->attachments() );
          } );
      }
      catch( const exception & ) {...}
    }
    

    Тех, кто внимательно читал предыдущие статьи, фраза «дождаться от них сообщений» должна была бы насторожить. Ждать без ограничения времени не есть хорошо, это прямой путь получить зря болтающегося в системе и ничего не делающего агента. Поэтому при ожидании ответов от checker-ов нам имеет смысл поступить так же, как и при ожидании результата IO-операции: отослать самим себе какой-то отложенный сигнал, получив который мы поймем, что дальше ждать бессмысленно. Т.е. нам надо было бы написать что-то вроде:


    // Попытка представить агента email_analyzer с двумя отложенными сигналами.
    class email_analyzer : public agent_t {
      // Этот сигнал потребуется для того, чтобы отслеживать отсутствие
      // ответа от IO-агента в течении разумного времени.
      struct io_agent_response_timeout : public signal_t {};
      // Этот сигнал потребуется для того, чтобы отслеживать отсутствие
      // результатов проверки отдельных частей email-а.
      struct checkers_responses_timeout : public signal_t {};
    ...
      virtual void so_evt_start() override {
        ... /* Отсылка запроса IO-агенту */
        // И сразу же начинаем отсчет тайм-аута для ответа от IO-агента.
        send_delayed< io_agent_response_timeout >( *this, 1500ms );
      }
    ...
      void on_load_succeed( const load_succeed & msg ) {
        ... /* Создание коопераций с агентами checker-ами */
        // Сразу же начинаем отсчет тайм-аута для ответов от агентов-checker-ов.
        send_delayed< checkers_responses_timeout >( *this, 750ms );
      }
    ...
      void on_checkers_responses_timeout() {
        ... /* Отсылка отрицательного ответа. */
      }
    };
    

    Однако, пойдя по этому пути мы наступим на грабли: ожидая ответа от checker-ов мы запросто можем получить отложенный сигнал io_agent_response_timeout. Ведь его же никто не отменял. И когда это сигнал придет, мы сгенерируем отрицательный ответ из-за якобы имеющегося тайм-аута ввода-вывода, которого-то и нет. Давайте попробуем обойти эти грабли.


    Зачастую разработчики, не привыкшие к асинхронному обмену сообщениями, пытаются отменить отложенный сигнал. Это можно сделать, если сохранить идентификатор таймера при обращении к send_periodic:


    // Попытка представить агент email_analyzer с отменой отложенного
    // сигнала io_agent_response_timeout.
    class email_analyzer : public agent_t {
      struct io_agent_response_timeout : public signal_t {};
    ...
      virtual void so_evt_start() override {
        ... /* Отсылка запроса IO-агенту */
        // Для того, чтобы получить идентификатор таймера используем
        // send_periodic вместо send_delayed, но параметр period
        // выставляем в 0, что делает отсылаемый сигнал отложенным,
        // но не периодическим.
        io_response_timer_ = send_periodic< io_agent_response_timeout >(
          *this, 1500ms, 0ms );
      }
    ...
      void on_load_succeed( const load_succeed & msg ) {
        // Отменяем отложенный сигнал.
        io_response_timer_.reset();
        ... /* Создание коопераций с агентами checker-ами */
        // Сразу же начинаем отсчет тайм-аута для ответов от агентов-checker-ов.
        send_delayed< checkers_responses_timeout >( *this, 750ms );
      }
    ...
      // Идентификатор таймера для отложенного сигнала о тайм-ауте для IO-операции.
      timer_id_t io_response_timer_;
    };
    

    К сожалению, этот простой способ не всегда работает. Проблема в том, что отложенный сигнал может быть отослан агенту email_analyzer буквально за мгновение до того, как агент email_analyzer выполнит сброс таймера для этого отложенного сигнала. Тут уж ничего не поделать – чудеса многопоточности, они такие.


    Агент email_analyzer может зайти в on_load_succeed на контексте своей рабочей нити, может даже успеть войти в вызов reset() для таймера… Но тут его нить вытеснят, управление получит нить таймера SObjectizer-а, на которой произойдет отсылка отложенного сигнала. После чего управление опять получит рабочая нить агента email_analyzer() и метод reset() для таймера сделает отмену уже отосланного сигнала. Однако, сигнал уже находится в очереди сообщений агента, откуда его уже никто не выбросит – раз уж сообщение попало в очередь к агенту, то изъять его оттуда нельзя.


    Самое плохое в этой ситуации то, что подобная ошибка будет возникать эпизодически. Из-за чего понять, что именно происходит и в чем именно ошибка, будет сложно. Так что нужно помнить, что отмена отложенного сообщения – это вовсе не гарантия того, что оно не будет отослано.


    Итак, если просто отменять отложенное сообщение неправильно, то что же делать?


    Например, можно использовать состояния агента. Когда email_analyzer ждет ответа от IO-агента, он находится в одном состоянии. Когда ответ от IO-агента приходит, агент email_analyzer переходит в другое состояние, в котором он будет ждать ответов от checker-ов. Т.к. во втором состоянии email_analyzer на сигнал io_agent_response_timeout не подписан, то этот сигнал будет просто проигнорирован.


    С введением состояний в агент email_analyzer мы могли бы получить что-то вроде:


    // Попытка представить агент email_analyzer с использованием
    // нескольких состояний.
    class email_analyzer : public agent_t {
      struct io_agent_response_timeout : public signal_t {};
      struct checkers_responses_timeout : public signal_t {};
    
      // Состояние, в котором агент будет ждать результата IO-операции.
      state_t st_wait_io{ this };
      // Состояние, в котором агент будет ждать ответов от checker-ов.
      state_t st_wait_checkers{ this };
      ...
      virtual void so_define_agent() override {
        // Подписываем агента на разные события в разных состояниях.
        // Для того, чтобы это было наглядно, используем вторую способ
        // подписки агентов – через методы класса state_t.
        st_wait_io
          .event( &email_analyzer::on_load_succeed )
          .event( &email_analyzer::on_load_failed )
          .event< io_agent_response_timeout >( &email_analyzer::on_io_timeout );
        st_wait_checkers
          .event( &email_analyzer::on_header_check_result )
          .event( &email_analyzer::on_body_check_result )
          .event( &email_analyzer::on_attach_check_result )
          .event< checkers_responses_timeout >( &email_analyzer::on_checkers_timeout );
      }
      ...
    };
    

    Однако, в SObjectizer можно поступить еще проще: можно назначить временной лимит на пребывание агента в конкретном состоянии. Когда этот лимит истечет, агент будет принудительно переведен в другое состояние. Т.е. мы можем написать что-то вроде:


    // Попытка представить агента email_analyzer с использованием ограничения времени
    // на пребывание агента в конкретном состоянии.
    class email_analyzer : public agent_t {
      state_t st_wait_io{ this };
      state_t st_io_timeout{ this };
    
      state_t st_wait_checkers{ this };
      state_t st_checkers_timeout{ this };
    ...
      virtual void so_define_agent() override {
        st_wait_io
          .event( &email_analyzer::on_load_succeed )
          .event( &email_analyzer::on_load_failed )
          // Ограничиваем время ожидания.
          .time_limit( 1500ms, st_io_timeout );
        st_wait_checkers
          .event( &email_analyzer::on_header_check_result )
          .event( &email_analyzer::on_body_check_result )
          .event( &email_analyzer::on_attach_check_result )
          .time_limit( 750ms, st_checkers_timeout );
      }
    };
    

    Но просто ограничить время пребывания в некотором состоянии недостаточно. Нужно еще предпринять какие-то действия, когда это время истечет. Как это сделать?


    Использовать такую вещь, как обработчик входа в состояние. Когда агент входит в конкретное состояние, SObjectizer вызывает функцию-обработчик входа в это состояние, если пользователь такую функцию назначил. Это означает, что на вход в st_io_timeout мы можем повесить обработчик, который отсылает check_result с отрицательным результатом и завершает работу агента:


    st_io_timeout.on_enter( [this]{
      send< check_result >( reply_to_, email_file_, check_status::check_failure );
      so_deregister_agent_coop_normally();
    } );
    

    Точно такой же обработчик мы повесим и на вход в st_checkers_timeout. А т.к. действия внутри этих обработчиков будут одинаковыми, то мы можем вынести их в отдельный метод агента email_analyzer и указать этот метод в качестве обработчика входа и для состояния st_io_timeout, и для состояния st_checkers_timeout:


    class email_analyzer : public agent_t {
      state_t st_wait_io{ this };
      state_t st_io_timeout{ this };
    
      state_t st_wait_checkers{ this };
      state_t st_checkers_timeout{ this };
    ...
      virtual void so_define_agent() override {
        ...
        st_io_timeout
          .on_enter( &email_analyzer::on_enter_timeout_state );
        ...
        st_checkers_timeout
          .on_enter( &email_analyzer::on_enter_timeout_state );
      };
    ...
      void on_enter_timeout_state() {
        send< check_result >( reply_to_, email_file_, check_status::check_failure );
        so_deregister_agent_coop_normally();
      }
    };
    

    Но и это еще не все. Раз уж мы затронули тему состояний агентов и их возможностей, то можно развить ее дальше и провести рефакторинг кода email_analyzer.


    Нетрудно заметить, что в коде очень часто дублируется парочка действий: отсылка сообщения check_result и дерегистрация кооперации агента. Такое дублирование не есть хорошо, cледует от него избавиться.


    По сути, работа агента email_analyzer сводится к тому, чтобы в итоге агент оказался в одном из двух состояний: либо все завершилось нормально и следует отослать положительный результат, после чего завершить свою работу, либо же все завершилось ошибкой, нужно отослать отрицательный результат и, опять таки, завершить работу агента. Так давайте это и выразим прямо в коде с помощью двух состояний агента: st_success и st_failure.


    // Попытка представить агента email_analyzer со специальными финальными
    // состояниями st_success и st_failure.
    class email_analyzer : public agent_t {
      state_t st_wait_io{ this };
      state_t st_wait_checkers{ this };
    
      state_t st_failure{ this };
      state_t st_success{ this };
    ...
      virtual void so_define_agent() override {
        st_wait_io
          .event( &email_analyzer::on_load_succeed )
          .event( &email_analyzer::on_load_failed )
          // Ограничиваем время ожидания.
          .time_limit( 1500ms, st_failure );
        st_wait_checkers
          .event( &email_analyzer::on_header_check_result )
          .event( &email_analyzer::on_body_check_result )
          .event( &email_analyzer::on_attach_check_result )
          .time_limit( 750ms, st_failure );
        st_failure
          .on_enter( [this]{
            send< check_result >( reply_to_, email_file_, status_ );
            so_deregister_agent_coop_normally();
          } );
        st_success
          .on_enter( [this]{
            send< check_result >( reply_to_, email_file_, check_status::safe );
            so_deregister_agent_coop_normally();
          } );
      };
    ...
      // Новый атрибут нужен для сохранения актуального отрицательного результата.
      check_status status_{ check_status::check_failure };
    };
    

    Это позволит нам в коде агента просто менять состояние для завершения работы агента тем или иным образом:


    void on_load_failed( const load_email_failed & ) {
      st_failure.activate();
    }
    
    void on_checker_result( check_status status ) {
      // На первом же неудачном результате прерываем свою работу.
      if( check_status::safe != status ) {
        status_ = status;
        st_failure.activate();
      }
      else {
        ++checks_passed_;
        if( 3 == checks_passed_ )
          // Все результаты получены. Можно завершать проверку с
          // положительным результатом.
          st_success.activate();
      }
    }
    

    Но можно пойти и еще дальше. Для состояний st_failure и st_success есть одно общее действие, которое нужно выполнить при входе в любое их этих состояний – обращение к so_deregister_agent_coop_normally(). И это не случайно, ведь оба этих состояния отвечают за завершение работы агента. А раз так, то мы можем воспользоваться вложенными состояниями. Т.е. мы введем состояние st_finishing, для которого st_failure и st_success будут подсостояниями. При входе в st_finishing будет вызываться so_deregister_agent_coop_normally(). А при входе в st_failure и st_success – будет только отсылаться соответствующее сообщение.


    Т.к. состояния st_failure и st_success вложены в st_finishing, то при входе в любое из них сначала будет вызваться обработчик входа в st_finishing, а уже затем – обработчик входа в st_failure или st_success. Получится, что мы при входе в st_finishing мы дерегистрируем агента, а следом, при входе в st_failure или st_success, отсылаем сообщение check_result.


    Если кто-то из читателей чувствует себя не комфортно при упоминании вложенных состояний, обработчиков входа в состояния, ограничений на время пребывания в состоянии, то имеет смысл ознакомится с одной из основополагающих статей на тему иерархических конечных автоматов: David Harel, Statecharts: A visual formalism for complex systems. Science of Computer Programming. Состояния агентов в SObjectizer реализуют изрядную часть описанных там возможностей.


    В итоге всех этих преобразований агент email_analyzer примет показанный ниже вид.


    // Седьмая версия агента email_analyzer, с распараллеливанием работы по проверке
    // содержимого email-а и использованием вложенных состояний.
    
    class email_analyzer : public agent_t {
      state_t st_wait_io{ this };
      state_t st_wait_checkers{ this };
    
      state_t st_finishing{ this };
      state_t st_failure{  initial_substate_of{ st_finishing } };
      state_t st_success{ substate_of{ st_finishing } };
    
    public :
      email_analyzer( context_t ctx,
        string email_file,
        mbox_t reply_to )
        : agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
      {}
    
      virtual void so_define_agent() override {
        st_wait_io
          .event( &email_analyzer::on_load_succeed )
          .event( &email_analyzer::on_load_failed )
          // Назначаем тайм-аут для ожидания ответа.
          .time_limit( 1500ms, st_failure );
    
        st_wait_checkers
          .event( [this]( const email_headers_checker::result & msg ) {
              on_checker_result( msg.status_ );
            } )
          .event( [this]( const email_body_checker::result & msg ) {
              on_checker_result( msg.status_ );
            } )
          .event( [this]( const email_attach_checker::result & msg ) {
              on_checker_result( msg.status_ );
            } )
          // Еще один тайм-аут для ответов.
          .time_limit( 750ms, st_failure );
    
        // Для состояний, которые отвечают за завершение работы,
        // нужно определить только обработчики входа.
        st_finishing.on_enter( [this]{ so_deregister_agent_coop_normally(); } );
        st_failure.on_enter( [this]{
            send< check_result >( reply_to_, email_file_, status_ );
          } );
        st_success.on_enter( [this]{
            send< check_result >( reply_to_, email_file_, check_status::safe );
          } );
      }
    
      virtual void so_evt_start() override {
        // Начинаем работать в состоянии по умолчанию, поэтому
        // нужно принудительно перейти в нужное состояние.
        st_wait_io.activate();
    
        // При старте сразу же отправляем запрос IO-агенту для загрузки
        // содержимого email файла.
        send< load_email_request >(
            so_environment().create_mbox( "io_agent" ),
            email_file_,
            so_direct_mbox() );
      }
    
    private :
      const string email_file_;
      const mbox_t reply_to_;
    
      // Храним последний отрицательный результат для того, чтобы отослать
      // его при входе в состояние st_failure.
      check_status status_{ check_status::check_failure };
    
      int checks_passed_{};
    
      void on_load_succeed( const load_email_succeed & msg ) {
        // Меняем состояние т.к. переходим к следующей операции.
        st_wait_checkers.activate();
    
        try {
          auto parsed_data = parse_email( msg.content_ );
          introduce_child_coop( *this,
            // Агенты-checker-ы будут работать на своем собственном
            // thread-pool-диспетчере, который был создан заранее
            // под специальным именем.
            disp::thread_pool::create_disp_binder(
                "checkers", disp::thread_pool::bind_params_t{} ),
            [&]( coop_t & coop ) {
              coop.make_agent< email_headers_checker >(
                  so_direct_mbox(), parsed_data->headers() );
              coop.make_agent< email_body_checker >(
                  so_direct_mbox(), parsed_data->body() );
              coop.make_agent< email_attach_checker >(
                  so_direct_mbox(), parsed_data->attachments() );
            } );
        }
        catch( const exception & ) {
          st_failure.activate();
        }
      }
    
      void on_load_failed( const load_email_failed & ) {
        st_failure.activate();
      }
    
      void on_checker_result( check_status status ) {
        // На первом же неудачном результате прерываем свою работу.
        if( check_status::safe != status ) {
          status_ = status;
          st_failure.activate();
        }
        else {
          ++checks_passed_;
          if( 3 == checks_passed_ )
            // Все результаты получены. Можно завершать проверку с
            // положительным результатом.
            st_success.activate();
        }
      }
    };
    

    Ну а теперь имеет смысл посмотреть на код получившегося агента email_analyzer и задать себе простой, но важный вопрос: а оно того стоило?


    Очевидно, что с ответом на этот вопрос все не так однозначно. Но поговорить об этом мы попробуем уже в следующей статье. В которой затронем тему уроков, которые мы извлекли после более чем десяти лет использования SObjectizer в разработке программных систем.


    Исходные коды к показанным в статье примерам можно найти в этом репозитории.

    Поделиться публикацией
    Похожие публикации
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама
    Комментарии 23
    • 0
      Хм, довольно неожиданно, что из on_enter одного состояния нельзя перейти в другое состояние.
      Было бы не плохо этот момент хотя бы немного осветить в этой статье.
      • 0
        Было бы не плохо этот момент хотя бы немного осветить в этой статье.
        Да, наверное, вы правы. Какие-то вещи представляются очевидными мне самому, но они не обязаны быть таковыми для всех остальных.

        Если разрешить переходы в другое состояние в on_enter/on_exit, то легко дойти до ситуаций, когда возникнет зацикливание. И тогда следов проблемы не найдешь.

        Еще один не самый очевидный момент: действия в on_enter/on_exit должны быть noexcept. Поскольку если при смене состояния (особенно если эта смена происходит в сложном иерархическом автомате) возникает исключение, то откатить все к исходной точке и обеспечить strong exception guarantee, скорее всего не получится.
        • 0
          Как мне кажется, за зацикливанием должен следить все таки пользователь библиотеки. А то ведь при желании зацикливание можно и другими средствами организовать.

          А при таких раскладах приходится слать дополнительное, совершенно не нужное, сообщение для смены состояния.

          Кстати, на счет noexept, so_5::send ведь не noexept?
          • 0
            Как мне кажется, за зацикливанием должен следить все таки пользователь библиотеки.
            В условиях иерархических конечных автоматов и, особенно, наследования, следить может быть слишком сложно. Т.е. вы написали класс агента A с несколькими состояниями, затем ваш коллега отнаследовался от A и ввел еще несколько состояний. Ваш коллега вообще может не знать, что в каком-то своем on_enter/on_exit вы делаете еще одну смену состояния. Так же как и вы не можете знать, что будет происходить в on_enter/on_exit у наследников вашего класса.
            А при таких раскладах приходится слать дополнительное, совершенно не нужное, сообщение для смены состояния.
            А расскажите, пожалуйста, про ваш случай подробнее. Может мы и правда слишком жестко к ограничениям относимся.
            Кстати, на счет noexept, so_5::send ведь не noexept?
            Не noexcept, send может бросать исключения.
            • 0
              А расскажите, пожалуйста, про ваш случай подробнее.

              У меня есть некий агент, который может выполнять внешние команды. Выполнение команды связано с IO операциями и занимает время. Параллельно выполнять команды нельзя.

              Агент принимает команды и складывает их в очередь. В числе прочих, агент имеет такие статусы как st_wait_command и st_wait_perform. Пока выполняется команда агент находится в состоянии st_wait_perform. После выполнения команды нужно начать выполнять следующую команду из очереди, если она не пуста. Или же вернутся в состояние st_wait_command.

              И вот код по началу выполнения следующей команды было бы удобно разместить в on_enter состояния st_wait_comman. Тогда оставалось бы просто перейти в состояние st_wait_command после выполнения команды.

              Сейчас я уже нашел более-менее подходящее решение — вместо перехода в статус st_wait_command вызывать отдельный метод, который решит нужно ли менять состояние или же начать выполнять новую команду. Это работает, но эстетически смущает. Ведь этот метод занимается как раз тем, чем должен бы заниматься on_enter.

              Не noexcept, send может бросать исключения.

              Выходит, если нельзя использовать внутри on_enter/on_exit без try/catch (как вы делаете у себя в примере).
              • 0
                Выходит, его нельзя использовать *
                • 0
                  Я пока прокомментирую вот это:
                  Выходит, если нельзя использовать внутри on_enter/on_exit без try/catch (как вы делаете у себя в примере).

                  Методы on_enter/on_exit лучше всего рассматривать как некоторые аналоги конструкторов и деструкторов, поскольку у них:
                  • во-первых, очень специфические задачи (on_enter похож на конструктор, т.к. позволяет задать какие-то начальные значения, а on_exit похож на деструктор, т.к. позволяет освободить ресурсы);
                  • во-вторых, они выполняются в очень специфическом контексте, в котором возможности самого SObjectizer-а по преодолению возникающих проблем крайне ограничены. Очень похоже на ситуацию с выбросом исключения из деструктора объекта — как правило тут уж ничего кроме std::terminate не сделать.

                  Поэтому если есть необходимость запихнуть в on_enter/on_exit какие-то сложные действия с высокой вероятностью возникновении ошибки, то нужно обрамлять эти действия блоком try/catch и самостоятельно ошибку устранять.

                  Но тут возникает другой момент: допустим, в on_enter вам нужно сделать действия A, B и C, и на действии C у вас возникает ошибка. Что делать в этом случае? Вы не может остаться в новом состоянии, т.к. из подготовительных действий A, B и C вы сделали только A и B. Но и просто откатить A и B так же недостаточно, т.к. вы не может просто так вернуться в свое исходное состояние.

                  В общем, куда не кинь, везде клин :(

                  Поэтому мы исходим из следующих соображений:
                  • когда при переходе из состояния в состояние нужно делать какие-то сложные цепочки действий с высокой вероятностью ошибок, то это следует делать не в on_enter/on_exit, а непосредственно в коде агента перед вызовом so_change_state;
                  • на долю on_enter/on_exit остаются самые тривиальные действия, вроде назначения начальных значений. Если уж эти действия приводят к ошибкам, то лучше уж вызвать std::terminate и рестартовать приложение, чем пытаться выбраться из такой ситуации.
                  • 0
                    Да, это понятно, что возникают проблемы с исключениями при смене состояний.
                    Просто логично было бы предположить, что SO будет действовать согласно so_exception_reaction, а не просто вызовет std::terminate. Тот же самый рестарт агента может быть логичным поведением программы в подобных случаях.
                    • 0
                      Просто логично было бы предположить, что SO будет действовать согласно so_exception_reaction, а не просто вызовет std::terminate.

                      Не так все просто, к сожалению. Пользователь может выставить реакцию ignore_exception. Но т.к. смена состояния не была нормально завершена, то агент оказывается в непонятном (и, скорее всего, некорректном) состоянии.

                      Пользователь может получить полный контроль за действиями при смене состояния, если он делает их сам вне on_enter/on_exit. Но вот внутри on_enter/on_exit возможности сильно ограничиваются.
                      • 0
                        Ну да, ситуация не простая. Варианты, конечно, все еще есть. Но они, наверное, сделают ситуацию только еще более запутанной.
                        • 0
                          Угу. Поэтому в первой реализации поддержки иерархических конечных автоматов и обработчиков on_enter/on_exit мы пошли по пути жестких ограничений. Тогда и реализация оказывается более простой, и поведение более предсказуемым и понятным.

                          По мере накопления опыта и рассмотрения сценариев от разных пользователей можно будет подумать о том, как эти ограничения смягчить.
                      • 0
                        Хотя и если реакция зависит от состояния агента, то, вопрос, конечно становится несколько более сложным. Возможно, лучше использовать реакцию кооперации.
                    • 0
                      Сейчас я уже нашел более-менее подходящее решение — вместо перехода в статус st_wait_command вызывать отдельный метод, который решит нужно ли менять состояние или же начать выполнять новую команду. Это работает, но эстетически смущает.

                      Если я правильно понял ситуацию, то это самый простой и надежный способ. Я бы и сам так делал.
                      • 0
                        По мере разработки таки наступила ситуация, где без дополнительного сообщения пока не получается выкрутиться.

                        Стало нужным добавить таймаут для команд. И тут бы идеально подошел time_limit для состояния st_wait_perform. Но так как после таймаута нужно начать выполнять следующую команду из очереди (если она есть), а соответственно и менять состояние обратно на st_wait_perform, то в методах on_enter/on_exit не выйдет обработать событие таймаута. Потому придется слать отложенное сообщение, судя по всему.
                        • +1
                          И тут бы идеально подошел time_limit для состояния st_wait_perform.
                          Ну вот тут я не уверен. time_limit хорош для безусловного перехода в другое состояние, когда не приходится при выходе анализировать, что успели сделать, что не успели.

                          У вас же, при использовании time_limit, потребуется проверять некоторый признак, пришел ли уже результат IO-операции или нет. Это дополнительный атрибут в агенте, дополнительная логика и т.д.

                          Если у вас агент входит в st_wait_perform и может в этом состоянии обрабатывать несколько IO-операций последовательно, то обычное отложенное сообщение выглядит более удобным решением, чем time_limit. При этом, однако, нужно не забыть вот о чем: если вы взвели отложенное сообщение M(1) для операции OP(1), потом операция OP(1) у вас успешно закончилась и вы успели начать операцию OP(2) (отправив M(2) для контроля тайм-аута OP(2)), то вы запросто можете получить M(1) и принять его за M(2). Например, у вас может быть что-то вроде:
                          class io_performer : public so_5::agent_t {
                            struct io_timeout : public so_5::signal_t {};
                            ...
                            void on_next_operation(mhood_t<start_next_io>) {
                              // Начинаем отсчет времени для очередной операции.
                              so_5::send_delayed<io_timeout>(*this, ...);
                              // Начинаем саму IO-операцию.
                              perform_io(...);
                            }
                            void on_io_result(mhood_t<io_result> cmd) {
                              ... // Должным образом обрабатываем результат.
                              if(has_more_io_ops())
                                so_5::send<start_next_io>(*this, ...);
                            }
                            void on_timeout(mhood_t<io_timeout>) {
                               ... // Обрабатываем тайм-аут текущей операции.
                            }
                          };
                          Вот в этом случае у вас отложенные сигналы от предыдущих операций будут восприниматься как тайм-ауты для текущей операции. Самый надежный способ избежать этого на данный момент — это включать в отложенное сообщение какой-то ID текущей операции. Например:
                          class io_performer : public so_5::agent_t {
                            struct io_timeout : public so_5::message_t {
                              op_id id_;
                              io_timeout(op_id id) : id_(std::move(id)) {}
                            };
                            ...
                            void on_next_operation(mhood_t<start_next_io>) {
                              // Начинаем отсчет времени для очередной операции.
                              current_id_ = create_current_op_id();
                              so_5::send_delayed<io_timeout>(*this, ..., current_op_id_);
                              // Начинаем саму IO-операцию.
                              perform_io(...);
                            }
                            void on_io_result(mhood_t<io_result> cmd) {
                              ... // Должным образом обрабатываем результат.
                              current_op_id_ = null_id; // Сбрасываем ID, т.к. текущая операция завершилась.
                              if(has_more_io_ops())
                                so_5::send<start_next_io>(*this, ...);
                            }
                            void on_timeout(mhood_t<io_timeout> cmd) {
                              if(current_op_id_ == cmd->id_) {
                                 ... // Обрабатываем тайм-аут текущей операции.
                              }
                            }
                            ...
                            op_id current_op_id_;
                          };
                          • 0
                            Да, ровно так сейчас и вышло. У операций уже до этого были ID, так что с этим проблем не возникло.

                            Видимо, я хочу от встроенного механизма конечных автоматов слишком многого. :) Ну или моя задача не особо на них (на конечные автоматы, в смысле) ложится и я пытаюсь притянуть ее за уши.
                            • 0
                              Ну или моя задача не особо на них ложится и я пытаюсь притянуть ее за уши.
                              Ну или же ее можно решить на КА, но за счет дополнительных сообщений и, возможно, еще одного состояния.

                              Например:
                              st_neutral:
                              ..on_enter: отослать себе check_queue
                              ..msg_check_queue -> если очередь пуста, то идем в st_wait_command, если не пуста, то идем в st_wait_perform;

                              st_wait_command:
                              ..msg_command -> поставить заявку в очередь, перейти в st_wait_perform;

                              st_wait_perform:
                              ..on_enter: инициировать первую операцию из очереди;
                              ..msg_io_result: обработать результат, перейти в st_neutral;
                              ..msg_command: поставить заявку в очередь;
                              ..time_limit: перейти в st_io_timedout;

                              st_io_timeout:
                              ..on_enter: обработать тайм-аут для текущей операции, отослать себе msg_check_queue;
                              ..msg_check_queue: делегировать обработку msg_check_queue состоянию st_neutral;

                              Правда, не уверен, что эта логика оказывается проще и эффективнее.

                              Делегировать обработку сообщения из текущего состояния в другое состояние S можно посредством метода state_t::transfer_to_state (пример здесь).
                            • 0
                              Так может сделать встроенную возможность (на уровне SO) задавать таймерам идентификаторы (при формировании таймера)? И соответственно, возможность запустить, перезапустить или остановить таймер с указанным ID (в качестве ID выступает просто число, а разработчик сам решает как он их назначает и различает).
                              Тогда для данного случая, можно было бы не заботится о том, что такой таймер уже есть, т.к. при начале обработки очередного сообщения достаточно было бы «перезапустить» таймер с определённым ID. Если такого в очереди ещё не было, он появляется. Если уже есть, начинает отсчёт заново. Если перешли в состояние st_wait_perform, отключаем таймер.
                              А если он таки сработал (вызван обработчик), значит действительно «зависли в обработке».
                              С одной стороны, это снижает некоторую универсальность механизмов работы с сообщениями, т.к. выделяет отдельное понятие таймеры со своим API. С другой стороны, я так понял всё-равно для работы с таймерами, уже есть свой отдельный API.
                              • 0
                                К сожалению, я не понял идею такого API :(

                                Отменить таймер можно и сейчас. Например:
                                so_5::timer_id_t timer = so_5::send_periodic<msg>(*this,
                                  std::chrono::milliseconds(250), // Задержка.
                                  std::chrono::milliseconds::zero() // Нет повторения, однократная доставка.
                                );
                                ...
                                timer.release(); // Отменили таймер.
                                
                                Но тут вот в чем проблема, если таймер должен сработать в момент T, то на самом деле отложенное сообщение может стать в очередь получателя в диапазоне [T-d1, T+d2], где d1 и d2 — это очень маленькие величины, но, к сожалению не нулевые.

                                Предположим, что отложенное сообщение встает в очередь в момент времени (T-20us), а агент в момент времени (T-15us) отменяет таймер. Реальной отмены не будет, т.к. сообщение уже в очереди получателя.

                                И тут есть всего два надежных способа:

                                1. Самый простой. Передавать в отложенном сообщении некий прикладной ID. Как правило, в каждой задаче этот ID разный. Где-то строковый идентификатор, где-то указатель.

                                2. Создать отдельный mbox, подписаться на сообщение из него. Отослать отложенное сообщение в этот mbox. Затем, при отмене, нужно и отменить отложенное сообщение, и снять подписку с этого mbox-а. В таком случае даже если сообщение уже в очереди получателя, то оно будет проигнорировано, т.к. подписки на него уже нет.

                                Второй способ достаточно накладный. Хотя вот реализация time_limit для state_t сейчас работает именно по такому принципу.
                                • 0
                                  >> К сожалению, я не понял идею такого API :(
                                  Ну, в целом Вы ответили, что всё уже реализовано )

                                  Идея была добавить поддержку ID в самом API SO, чтобы каждому разработчику не приходилось это делать самостоятельно. Например как ещё один параметр (не обязательный)
                                  so_5::timer_id_t timer = so_5::send_periodic<msg,TYPEID>(*this,
                                    std::chrono::milliseconds(250), 
                                    std::chrono::milliseconds::zero(),
                                    myID  // <-- здесь передаём свой ID, который потом придёт к нам в обработчике
                                  );
                                  ...
                                  

                                  А проблема неточности таймеров, это да. Надёжный способ, требует сложной реализации. Я просто считал, что [T-d1, T+d2] входит в документированную погрешность, которую нужно учитывать. Разве что, думал, что по таймерам есть гарантия «сработает НЕ РАНЬШЕ заданного времени»(т.е. доставка будет только в момент >=T). При этом, конечно остаётся проблема отмены таймера, когда сообщение уже в очереди, но не обработано. Но с другой стороны, если гарантировано помещение в очередь для >=T, то значит отмена таймера (в момент >=T) фактически происходит когда таймер уже сработал и честно ждёт обработки в очереди.

                                  • 0
                                    Я просто считал, что [T-d1, T+d2] входит в документированную погрешность, которую нужно учитывать. Разве что, думал, что по таймерам есть гарантия «сработает НЕ РАНЬШЕ заданного времени»(т.е. доставка будет только в момент >=T).
                                    Это, кстати, хороший момент. Тут зависит от таймерного механизма. Если нет округления времени срабатывания (например, в механизмах timer_heap и timer_list), то таймер срабатывает только в момент >=T. А вот как для timer_wheel, где T должно попадать в «окно»… Наверное, тоже >=T, но навскидку не вспомню. Кроме того, пользователь может и свой таймерный механизм подсунуть, что у него будет — хз…

                                    Но тут другое важно. Допустим, что агент A решает отменить заявку в (T-d1), он пытается вызвать timer_.release() и тут его нить вытесняется операционкой. Проходит немного времени и наступает момент T, нить таймера выставляет заявку. Тут просыпается нить агента A и выполнение timer_.release() продолжается. Но заявка уже в очереди. Вероятность этого невысока, но, как ни странно, на больших нагрузках она регулярно трансформируется в реальность.
                                    • 0
                                      Да. Согласен. Значит всё-таки решение задачи ложных или повторных срабатываний таймера остаётся разработчику. Что в целом тоже нормально…
                                      Надёжный код, должен подразумевать обработку и таких ситуаций.

                                      P.S. Даже std::condition_variable могут ложно просыпаться, чего уж… :)
                                      • +1
                                        Значит всё-таки решение задачи ложных или повторных срабатываний таймера остаётся разработчику.
                                        Я бы не сказал, что это «ложные и повторные». Это скорее очередная гримаса многопоточности, особенно во времена реальных многоядерных машин. Ведь два потока действительно могут работать параллельно и независимо, от чего взаимные сочетания «кто кого обогнал, кто от кого отстал» могут принимать самые причудливые формы.

              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.