суббота, 20 декабря 2014 г.

[prog.c++] Механизм registered_delivery для сообщений в SObjectizer

Во вчерашней заметке про message-passing я сказал, что есть идея еще одного способа взаимодействия между агентами. Эта идея начинает приобретать вполне конкретные очертания.

Смысл в том, что сейчас отсылка сообщения -- это никаким образом не контролируемая операция. Т.е. агент a1, отославший сообщение m1 агенту a2 никак не может узнать, дошло ли сообщение m1 до a2 или нет. Может оно потерялось, потому, что агента a2 нет или он не успел еще подписаться на m1? Может оно потерялось, потому, что агент a2 находился в состоянии, в котором обработка m1 невозможна? Может агент a2 выбросил исключение, когда обрабатывал m1?

Чем-то отсылка сообщения похоже на отсылку обычного письма через обычную почту. В подавляющем большинстве случаев письмо дойдет до адресата. Но, дошло или не дошло просто так не определить.

Поскольку иногда такое поведение почты не устраивает отправителей и получателей писем, на почте есть услуга "заказное письмо". Стоит оно дороже, требует заполнение дополнительных бумаг, но зато у отправителя остается квитанция, с помощью которой он может заставить почту определить, что же случилось с отосланным заказным письмом.

Собственно, а почему бы не сделать в SObjectizer аналог таких заказных писем в виде специального режима registered_delivery для отсылаемых сообщений?

Вот код агента a_processor_t из примера Producer/Consumer. Слева оригинальный код, написанный с использованием синхронного взаимодействия. Справа -- прототип реализации, базирующейся на механизме registered_delivery:

class a_processor_t : public so_5::rt::agent_t,
   private random_generator_mixin_t
{
public :
   a_processor_t(
      // Environment to work in.
      so_5::rt::environment_t & env,
      // Processor's name.
      std::string name,
      // Receiver mbox.
      const so_5::rt::mbox_t & receiver )
      :  so_5::rt::agent_t( env )
      ,  m_name( std::move( name ) )
      ,  m_receiver( receiver )
   {}

   virtual void
   so_define_agent() override
   {
      // Just one handler in the default state.
      so_default_state().event< msg_next_turn >(
            &a_processor_t::evt_next_turn );
   }

   virtual void
   so_evt_start() override
   {
      // Start working cycle.
      so_5::send_to_agent< msg_next_turn >( *this );
   }

private :
   // Start of next processing turn signal.
   struct msg_next_turn : public so_5::rt::signal_t {};

   // Processor name.
   const std::string m_name;

   // Receiver.
   const so_5::rt::mbox_t m_receiver;

   void
   evt_next_turn()
   {
      // Take requests from receiver.
      auto requests = take_requests();

      if( requests.empty() )
      {
         // No requests. There is no sense to make next request
         // immediately. It is better to take some time to generators
         // and receiver.
         TRACE() << "PRO(" << m_name << ") no request received, sleeping"
               << std::endl;
         so_5::send_delayed_to_agent< msg_next_turn >(
               *this, std::chrono::milliseconds( 25 ) );
      }
      else
      {
         // There are some requests. They must be processed.
         process_requests( requests );
         // Start next turn immediately.
         so_5::send_to_agent< msg_next_turn >( *this );
      }
   }

   std::vector< application_request >
   take_requests()
   {
      // There is a plenty of room for any errors related to
      // synchronous invocation. Catch them all and treat them
      // as inabillity of receiver to provide request array.
      try
      {
         return m_receiver->
               get_one< std::vector< application_request > >()
               .wait_for( std::chrono::milliseconds( 20 ) )
               .sync_get< a_receiver_t::msg_take_requests >();
      }
      catchconst std::exception & x )
      {
         TRACE() << "PRO(" << m_name << ") failed to take requests: "
               << x.what() << std::endl;
      }

      return std::vector< application_request >();
   }

   void
   process_requests( const std::vector< application_request > & requests )
   {
      TRACE() << "PRO(" << m_name << ") start processing, requests="
            << requests.size() << std::endl;

      // Just imitation of requests processing.
      // Processing time is proportional to count of requests.
      const auto processing_time = std::chrono::microseconds(
            requests.size() * random( 1501500 ) );
      std::this_thread::sleep_for( processing_time );

      TRACE() << "PRO(" << m_name << ") processing took: "
            << processing_time.count() / 1000.0 << "ms" << std::endl;
   }
};
class a_processor_t : public so_5::rt::agent_t,
   private random_generator_mixin_t
{
public :
   a_processor_t(
      // Environment to work in.
      so_5::rt::environment_t & env,
      // Processor's name.
      std::string name,
      // Receiver mbox.
      const so_5::rt::mbox_t & receiver )
      :  so_5::rt::agent_t( env )
      ,  m_name( std::move( name ) )
      ,  m_receiver( receiver )
   {}

   virtual void
   so_define_agent() override
   {
      // Just one handler in the default state.
      so_default_state().event( &a_processor_t::evt_next_turn );
   }

   virtual void
   so_evt_start() override
   {
      // Start working cycle.
      ask_for_requests();
   }

private :
   // Processor name.
   const std::string m_name;

   // Receiver.
   const so_5::rt::mbox_t m_receiver;

   void
   evt_next_turn(
      const so_5::rt::delivery_receipt<
            a_receiver_t::msg_take_requests,
            std::vector< application_request > > & evt )
   {
      if( !evt.processed() || evt.result().empty() )
      {
         // No requests. There is no sense to make next request
         // immediately. It is better to take some time to generators
         // and receiver.
         TRACE() << "PRO(" << m_name << ") no request received, sleeping"
               << std::endl;
         ask_for_requests( std::chrono::milliseconds( 25 ) );
      }
      else
      {
         // There are some requests. They must be processed.
         process_requests( evt.result() );
         // Start next turn immediately.
         ask_for_requests();
      }
   }

   void
   ask_for_requests( std::chrono::milliseconds pause =
         std::chrono::milliseconds::zero() )
   {
      so_5::send_delayed< a_receiver_t::msg_take_requests >(
            *this,
            m_receiver,
            pause,
            so_5::registered_delivery<
                  std::vector< application_request > >( *this ) );
   }

   void
   process_requests( const std::vector< application_request > & requests )
   {
      TRACE() << "PRO(" << m_name << ") start processing, requests="
            << requests.size() << std::endl;

      // Just imitation of requests processing.
      // Processing time is proportional to count of requests.
      const auto processing_time = std::chrono::microseconds(
            requests.size() * random( 1501500 ) );
      std::this_thread::sleep_for( processing_time );

      TRACE() << "PRO(" << m_name << ") processing took: "
            << processing_time.count() / 1000.0 << "ms" << std::endl;
   }
};

Суть в том, что при отсылке сообщения добавляется специальный маркер, создающийся посредством шаблонной функции so_5::registered_delivery<R>, где R -- это тип ожидаемого возвращаемого значения. В данном примере в качестве R выступает vector<application_request>. Этот специальный маркер так же задает, кому именно должен быть прислан отчет о доставке сообщения. В данном случае отчет должен придти агенту-отправителю сообщения, но можно задать и любой другой адрес. AFAIK, эта штука напоминает конструкцию pipeTo из Akka.

Если SObjectizer взялся за доставку помеченного маркером registered_delivery сообщения, то затем получателю отчета о доставке будет прислано сообщение so_5::rt::delivery_receipt. В этом сообщении будет содержаться ссылка на исходное сообщение, результат обработки сообщения, информация об исключении, если таковое возникло в процессе доставки.

Полагаю, delivery_receipt будет иметь приблизительно такой вид:

templateclass MSG, class RESULT >
class delivery_receipt
{
public :
   // Доступ к исходному сообщению, для которого создан отчет.
   const MSG &
   msg() const;

   // Получение исходного сообщения в виде умного указателя.
   // Может использоваться, например, для перепосылки этого
   // же экземпляра еще раз.
   intrusive_ptr_t< MSG >
   msg_as_instance() const;

   // Было ли сообщение полностью обработано агентом.
   // Значение true возвращается только если сообщение было
   // принято каким-то из обработчиков событий агента-получателя
   // и этот обработчик не выпустил наружу никаких исключений.
   bool
   processed() const;

   // Доступ к результату обработки сообщения.
   // Если processed() возвращает false, то обращение к этому
   // методу будет приводить к возникновению исключения.
   const RESULT &
   result() const;

   // Доступ к исключению, из-за которого сообщение не было доставлено
   // до агента-получателя или же которое было выпущено самим
   // агентом-получателем в процессе обработки сообщения.
   std::exception_ptr
   exception() const;

private :
   ...
};

Думаю, что если механизм registered_delivery будет реализован в SObjectizer, то он может существенно изменить способы обработки некоторых сценариев.

Например, в настоящее время широко используется подход с повтором сообщений после тайм-аутов. Например, агент a1 шлет агенту a2 пачку сообщений m1, m2,...,mN. Какие-то из них агентом a2 обрабатываются и в ответ отсылаются r1,...,rK. Когда a1 получает ответы r(i), он удаляет у себя информацию о запросах m(i). Но если в течении какого-то времени ответ на запрос m(j) не пришел, агент a1 повторяет m(j).

С появлением registered_delivery агент a1 может перестать использовать таймеры. Он отсылает m1,...,mN, а в ответ получает отчеты dr1,...,drN. Получив отчет агент a1 сможет понять, был ли m(j) обработан и с каким результатом. Или же не был и почему.

В общем, это действительно может быть очень интересная новая страничка в истории развития SObjectizer. Если, конечно, все это не останется на уровне празных фантазий ;)

Комментариев нет: