пятница, 14 июля 2017 г.

[prog.c++14] so_5_extra-1.0.1 и so-5.5.19.3

Сегодня мы выкатили очередную версию so_5_extra -- 1.0.1, а накануне обновили SObjectizer до версии 5.5.19.3. Доступно все, как обычно, на SourceForge: здесь so_5_extra, здесь SObjectizer.

В so_5_extra-1.0.1 добавлена всего одна фича: это collecting_mbox.

Смысл collecting_mbox в том, что иногда агенту X нужно получить строго N сообщений типа M от других агентов. Ну вот, например, стартует родительский агент-менеджер и создает пять нужных ему дочерних коопераций. Каждая дочерняя кооперация тратит какое-то время на свои начальные действия (например, загружается конфигурация, устанавливается подключение к БД, инициализируется внешнее устройство и т.д.), после чего отсылает родителю сигнал о том, что дочерняя кооперация готова. Соответственно, родительскому агенту нужно получить все пять сигналов о готовности дочерних коопераций, лишь после этого родительский агент сможет продолжать работать.

Без использования collecting_mbox эта задача решается за счет заведения счетчика внутри родительского агента. Т.е. чего-то вроде:

// Сигнал о том, что очередная дочерняя кооперация успешно стартовала.
struct child_started_t final : public so_5::signal_t {};

class parent_agent_t final : public so_5::agent_t
{
   // Счетчик дочерних коопераций, который подтвердили свой
   // успешный старт.
   unsigned int m_child_started = 0;

   // Реакция на поступление очередного сигнала о том, что дочерняя
   // кооперация стартовала.
   void on_child_started(mhood_t<child_started_t>)
   {
      ++m_child_started;
      if5 == m_child_started )
         ... // Можно начинать основную работу.
   }
   ...
};

А вот при использовании collecting_mbox вручную считать отдельные сигналы не нужно. Смысл collecting_mbox в том, что мы создаем новый mbox для определенного типа сообщений/сигналов и говорим, сколько сообщений/сигналов туда должно прийти. Отдаем этот новый mbox своим дочерним агентам. Когда дочерний агент завершает свои начальные действия, он отсылает сигнал в этот новый mbox. Новый mbox подсчитывает, сколько всего сигналов он получил. Когда получает все, то mbox сам отсылает специальное сообщение messages_collected_t на mbox родительского агента.

Простой пример

Вот как описанная выше ситуация может выглядеть в коде (это реальный пример mboxes/collecting_mbox/simple из состава so_5_extra):

#include <so_5_extra/mboxes/collecting_mbox.hpp>

#include <so_5/all.hpp>

// Сигнал, который отсылают дочерние агенты при своей готовности.
struct child_started_t final : public so_5::signal_t {};

// Простейший дочений агент, единственной задачей которого является
// отсылка сигнала child_started_t после начала работы.
class child_t final : public so_5::agent_t
{
public:
   child_t(context_t ctx, so_5::mbox_t ready_mbox)
      :  so_5::agent_t(std::move(ctx))
      ,  m_ready_mbox(std::move(ready_mbox))
   {}

   virtual void so_evt_start() override
   {
      // Стартовали, можно отослать сигнал родителю.
      so_5::send<child_started_t>(m_ready_mbox);
   }

private:
   // На этот mbox нужно будет отсылат сигнал о готовности.
   const so_5::mbox_t m_ready_mbox;
};

// Тип агента-родителя.
class sample_performer_t final : public so_5::agent_t
{
   // Тип, который нужен для накопления сигналов child_started_t.
   using child_started_mbox_t =
      so_5::extra::mboxes::collecting_mbox::mbox_template_t<child_started_t>;

public:
   // В конструктор передается количество дочерних агентов,
   // которые нужно создать.
   // Важно, что это значение для агента становится известно только
   // в run-time. Это влияет на определение типа child_started_mbox_t.
   sample_performer_t(context_t ctx, std::size_t child_count)
      :  so_5::agent_t(std::move(ctx))
      ,  m_child_count(child_count)
   {
      so_subscribe_self()
         // Сразу же подписываемся на уведомление о том, что
         // все дочерние агенты прислали child_started_t.
         .event(&sample_performer_t::on_all_child_started);
   }

   virtual void so_evt_start() override
   {
      // Создаем экземпляр collecting_mbox-а.
      // Для этого используется статический метод make из
      // child_started_mbox_t.
      auto ready_mbox = child_started_mbox_t::make(
            // SObjectizer Environment, в котором ведется работа.
            // Необходим для того, чтобы новый mbox получил уникальный ID.
            so_environment(),
            // На этот mbox затем collecting mbox пришет свое уведомление
            // о том, что нужное количество сигналов получено.
            so_direct_mbox(),
            // При создании задаем количество ожидаемых сигналов.
            // Затем это количество мы уже не сможем поменять.
            m_child_count);

      // Всех дочерних агентов запускаем на отдельном thread_pool-диспетчере.
      auto tp_disp = so_5::disp::thread_pool::create_private_disp(
            so_environment(),
            3 );
      for(std::size_t i = 0; i != m_child_count; ++i)
      {
         introduce_child_coop(*this,
               tp_disp->binder(so_5::disp::thread_pool::bind_params_t{}),
               [&ready_mbox](so_5::coop_t & coop) {
                  // При создании дочернего агента указываем новый mbox
                  // в качестве получателя сигнала child_started_t.
                  coop.make_agent<child_t>(ready_mbox);
               });
      }

      std::cout << "All children agents are created" << std::endl;
   }

private:
   // Количество дочерних агентов, которые предстоит создать.
   const std::size_t m_child_count;

   // Реакция на получение уведомления о том, что все дочерние агенты
   // стартовали.
   void on_all_child_started(
      // Это уведомление поступает к нам в виде сообщения messages_collected_t
      // из типа child_started_mbox_t.
      mhood_t<child_started_mbox_t::messages_collected_t>)
   {
      std::cout << "All children agents are started" << std::endl;
      so_deregister_agent_coop_normally();
   }
};

Подробнее о collecting_msg

Принцип работы

Экземпляр collecting_mbox получает сообщения определенного типа и накапливает их в своем внутреннем буфере. Когда collecting_mbox накопит N сообщений (где N задается программистом), он отсылает специальное сообщение messages_collected_t в указанный программистом целевой почтовый ящик. В этом новом экземпляре messages_collected_t содержится N накопленных ранее сообщений. После чего collecting_mbox накапливает следующие N сообщений, вновь отсылает новый экземпляр messages_collected_t. Затем снова накапливает N сообщений и т.д.

Возможно накопление только одного типа сообщений. Попытка отослать в collecting_mbox сообщение любого другого типа будет приводить к выбросу исключения.

Collecting_mbox можно использовать только для отсылки сообщений. Нельзя выполнять синхронные запросы с использованием collecting_mbox (т.е. нельзя передавать collecting_mbox в вызовы request_value и request_future -- это будет приводить к выбросу исключений).

Создание collecting_mbox

Для того, чтобы получить collecting_mbox, нужно сделать две вещи.

Во-первых, нужно настроить под свои нужды шаблон mbox_template_t из пространства имен so_5::extra::mboxes::collecting_mbox. Этот шаблон получает три параметра, лишь один из которых обязателен:

  • COLLECTING_MSG определяет тип сообщений/сигналов, которые нужно накапливать. В примере выше это был тип сигнала child_started_t;
  • TRAITS определяет тип дополнительных свойств mbox-а, зависящих от количества накапливаемых сообщений. По умолчанию в качестве TRAITS используется тип runtime_size_traits_t, который предназначен для случая, когда количество накапливаемых сообщений известно только в run-time. На эту тему далее чуть подробнее;
  • LOCK_TYPE определяет какой тип будет у мьютекса для защиты данных в mbox-е от многопоточного доступа. По умолчанию это std::mutex. Для однопоточных вариантов SObjectizer-а можно использовать so_5::null_mutex_t. Можно подсунуть и какой-нибудь собственный тип, который можно использовать совместно с std::lock_guard.

Обычно точный тип collecting_mbox-а задается посредством нужного typedef-а. Что-то вроде:

using my_msg_mbox_type = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
      // Задаем только тип накапливаемых сообщений.
      // Остальные параметры остаются неизменными.
      my_msg>;

using another_mbox_type = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
      // Тип сообщения.
      another_msg_type,
      // Говорим о том, что количество накапливаемых сообщений будет
      // известно только в run-time.
      so_5::extra::mboxes::collecting_mbox::runtime_size_traits_t,
      // Задаем null_mutex вместо std::mutex.
      // Т.е. mbox-ы этого типа будут пригодны к использованию только
      // в рамках однопоточного окружения.
      so_5::null_mutex_t>;

Подобные typedef-ы очень сильно упрощают жизнь при создании mbox-ов и при подписке на сообщение messages_collected_t.

Во-вторых, экземпляр collecting_mbox-а нужно создать. Делается это посредством статического метода make шаблонного класса mbox_template_t. Список параметров этого метода зависит от значения параметра TRAITS для mbox_template_t. Если в качестве TRAITS используется runtime_size_traits_t, то у mbox_template_t::make будет следующий формат:

so_5::mbox_t make(
      // В каком SObjectizer Environment идет работа.
      so_5::environment_t & env,
      // Куда прислать уведомление о накопленных сообщениях.
      const so_5::mbox_t & target_mbox,
      // Сколько именно сообщений нужно накапливать.
      std::size_t messages_to_collect);

Если в качестве TRAITS используется constexpr_size_traits_t, то формат mbox_template_t::make меняется, параметр messages_to_collect становится уже не нужным:

so_5::mbox_t make(
      // В каком SObjectizer Environment идет работа.
      so_5::environment_t & env,
      // Куда прислать уведомление о накопленных сообщениях.
      const so_5::mbox_t & target_mbox);

Если в качестве TRAITS используется еще какой-то тип, то формат mbox_template_t::make будет определяться этим типом. Пока в so_5_extra входит всего два готовых типа, которые могут использоваться в качестве параметра TRAITS: runtime_size_traits_t и constexpr_size_traits_t.

Внутри шаблона mbox_template_t определяется имя messages_collected_t. Это имя определяет тип сообщения, которое будет отсылаться в target_mbox когда collecting_mbox накопит заданное количество сообщений/шаблонов. Реальный тип этого сообщения зависит от нескольких факторов:

  • если COLLECTING_MSG задает тип сигнала, то messages_collected_t будет определять сообщение следующего вида:
    class messages_collected_t final : public so_5::message_t
    {
       ...
    public:
       std::size_t size();
    };
    Т.е., если collecting_mbox накапливает сигналы, то в messages_collected_t не будет никакой другой информации, кроме общего количества накопленных сигналов;
  • если COLLECTING_MSG задает тип сообщения, то messages_collected_t будет определять сообщение следующего вида:
    class messages_collected_t final : public so_5::message_t
    {
       some_container_t m_collected_messages;
       ...
    public:
       std::size_t size() const;

       templatetypename F >
       decltype(auto) with_nth( std::size_t index, F && f ) const;

       templatetypename F >
       void for_each( F && f ) const;

       templatetypename F >
       void for_each_with_index( F && f ) const;
    };
    Т.е. в messages_collected_t будут находиться все накопленные сообщения и к ним можно будет обратиться посредством методов with_nth, for_each и for_each_with_index. Но конкретный тип messages_collected_t и в этом случае будет зависеть от типа TRAITS.

Подробнее про параметр TRAITS

Самый хитрый параметр шаблона mbox_template_t -- это TRAITS. В реализации collecting_mbox-а были специально разделены случаи, когда количество накапливаемых сообщений известно в compile-time, и когда оно известно только в run-time.

Если количество накапливаемых сообщений известно в compile-time, то в реализации соответствующего messages_collected_t для хранения накопленных сообщений используется std::array, размер которого задается в compile-time.

Если же количество накапливаемых сообщения в compile-time не известно и становится доступно только в run-time, то в соответствующем messages_collected_t для хранения накопленных сообщений используется std::vector. Соответственно, с необходимостью динамически выделять память под его содержимое во время исполнения.

По умолчанию в качестве значения TRAITS используется runtime_size_traits_t. Это означает, что при создании mbox-а нужно будет указывать количество накапливаемых сообщений, а для их хранения в сообщении messages_collected_t будет использоваться std::vector.

Если же количество накапливаемых значений известно в compile-time, то можно использовать в качестве значения TRAITS тип constexpr_size_traits_t. Например:

using my_mbox_type = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
      // Тип сообщения.
      my_msg_type,
      // Говорим о том, что количество накапливаемых сообщений нам
      // известно уже в compile-time.
      // Собираем всего по 5 сообщений.
      so_5::extra::mboxes::collecting_mbox::constexpr_size_traits_t<5>>;

Доступ к накопленным сообщениям

Если collecting_mbox используется для накопления сообщений, а не сигналов, то все накопленные экземпляры сообщений доступны для программиста внутри messages_collected_t. Получить доступ к накопленным сообщениям можно посредством методов with_nth, for_each и for_each_with_index.

Метод with_nth дает доступ к i-ому сообщению по индексу. На вход with_nth принимает индекс (от 0 до size()-1) и лямда-функцию/функтор. Эта лямбда-функция вызывается внутри with_nth и ей в качестве единственного параметра отдается mhood_t<COLLECTING_MSG>. Возвращенное лямбдой значение возвращается как результат работы with_nth. Например:

void my_agent::on_collected_msg(mhood_t<my_mbox_type::messages_collected_t> cmd)
{
   auto v = cmd->with_nth([](mhood_t<my_msg> m) { return m->some_value(); });
   ...
}

Метод for_each предназначен для обхода всех накопленных сообщений. На вход for_each принимает лямбда-функцию/функтор такого же формата, как и with_nth: единственный аргумент типа mhood_t<COLLECTING_MSG>. Эта лямбда-функция будет вызвана для каждого сообщения внутри messages_collected_t. Все, что возвращается лямбда-функцией, игнорируется, т.к. сама for_each ничего не возвращает. Пример использования for_each:

void my_agent::on_collected_msg(mhood_t<my_mbox_type::messages_collected_t> cmd)
{
   std::vector<std::string> names;
   names.reserve(cmd->size());
   cmd->for_each([&names](mhood_t<my_msg> m) { names.push_back(m->some_value()); });
   ...
}

Метод for_each_with_index так же предназначен для обхода всех накопленных сообщений, но при этом вместе с очередным сообщением в лямбда-функцию/функтор передается так же и порядковый номер сообщения. Т.о. у лямбда-функции два аргумента: первый -- это индекс типа std::size_t, второй -- это mhood_t<COLLECTING_MSG>. Возвращаемое значение лямбда-функции игнорируется, for_each_with_index ничего не возвращает. Пример использования:

void my_agent::on_collected_msg(mhood_t<my_mbox_type::messages_collected_t> cmd)
{
   std::vector<std::string> names(cmd->size());
   cmd->for_each_with_index(
      [&names](std::size_t index, mhood_t<my_msg> m) {
         names[index] = m->some_value();
      });
   ...
}

collecting_mbox и mutable_msg

Для того, чтобы с помощью collecting_mbox собирать мутабельные сообщения, необходимо выполнение трех условий:

Во-первых, в качестве параметра COLLECTING_MSG следует указывать тип, заключенный внутрь маркера so_5::mutable_msg:

using my_mutable_mbox_type = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
      // Тип сообщения. Явно указывается, что это мутабельное сообщение.
      so_5::mutable_msg<my_msg_type>>;

Во-вторых, в качестве target_mbox-а следует указывать только multi-producer/single-consumer mbox. Например, direct-mbox конкретного агента-получателя. Или же mchain. Попытка задать multi-producer/multi-consumer mbox в качестве целевого, будет приводить к выбросу исключения при создании mbox-а.

В-третьих, messages_collected_t будет отсылаться в target_mbox как so_5::mutable_msg<messages_collected_t>. Именно в таком виде и следует подписываться на messages_collected_t:

void my_agent::so_define_agent() override
{
   so_subscribe_self().event(&my_agent::on_collected_msg);
   ...
}
void my_agent::on_collected_msg(
   mhood_t<so_5::mutable_msg<my_mutable_mbox_type::messages_collected_t>> cmd)
{
   ...
}

// Или же:
void my_agent::on_collected_msg(
   mutable_mhood_t<my_mutable_mbox_type::messages_collected_t> cmd)
{
   ...
}

Так же нужно учитывать, что в лямбда-функции, которые передаются в методы with_nth, for_each, for_each_with_index, накопленные сообщения будут приходить в виде mhood_t<mutable_msg<MSG>>. Т.е.:

void my_agent::on_collected_msg(
   mutable_mhood_t<my_mutable_mbox_type::messages_collected_t> cmd)
{
   cmd->with_nth([](mutable_mhood_t<my_msg_type> m) {
      ...
   });
}

Более сложный пример

Рассмотрим более сложный пример использования collecting_mbox. Предположим, что нам нужно хранить информацию о книгах, но в разделенном виде: каждая часть информации хранится в своей шарде. Т.е. один агент-шарда хранит только автора книги, второй агент хранит заголовок книги, третий агент хранит краткое описание. Для того, чтобы сохранить описание книги нам нужно отослать сообщение с описанием книги всем агентам-шардам и дождаться от каждого из них подтверждение о том, что книга сохранена. Аналогично, для того, чтобы собрать полное описание книги, нужно сделать запросы к каждому из агентов-шард запрос, после чего дождаться, пока каждый из агентов-шард ответит.

Вот как это может выглядеть в коде.

Сперва определим все сообщения и сопутствующие инструменты. Начнем с сопутствующих инструментов

// Эти значения будут указывать, какая часть описания книги нужна.
enum class field_id_t
{
   author,
   title,
   summary
};

// Описание одной книги.
struct book_description_t
{
   std::string m_author;
   std::string m_title;
   std::string m_summary;
};

// Набор специализаций функции get, которая из описания книги
// извлекает нужную часть.
template<field_id_t Field>
std::string get(const book_description_t &);

template<>
std::string get<field_id_t::author>(const book_description_t & b)
{
   return b.m_author;
}

template<>
std::string get<field_id_t::title>(const book_description_t & b)
{
   return b.m_title;
}

template<>
std::string get<field_id_t::summary>(const book_description_t & b)
{
   return b.m_summary;
}

А теперь сами сообщения, которые будут использоваться в обмене между агентами-шардами и остальным миром:

// Команда для сохранения описания очередной книги.
struct store_book_t : public so_5::message_t
{
   // Уникальный ID для книги. По этому ID книга затем может быть запрошена.
   const int m_key;
   // Описание новой книги.
   const book_description_t m_book;
   // Почтовый ящик для подтверждения о том, что книга создана.
   const so_5::mbox_t m_ack_to;

   store_book_t(int key, book_description_t book, so_5::mbox_t ack_to)
      :  m_key(key), m_book(std::move(book)), m_ack_to(std::move(ack_to))
   {}
};

// Ответное сообщение с подтверждением того, что книга создана.
struct store_book_ack_t : public so_5::message_t
{
   // Уникальный ID созданной книги.
   const int m_key;

   store_book_ack_t(int key)
      :  m_key(key)
   {}
};

// Запрос описания книги.
struct request_data_t : public so_5::message_t
{
   // Уникальный ID запрашиваемой книги.
   const int m_key;
   // Почтовый ящик, на который данные о книге нужно прислать.
   const so_5::mbox_t m_reply_to;

   request_data_t(int key, so_5::mbox_t reply_to)
      :  m_key(key), m_reply_to(std::move(reply_to))
   {}
};

// Сообщение от агента-шарды с той информацией о книге, которая у шарды есть.
struct data_t : public so_5::message_t
{
   // Уникальный ID запрошенной книги.
   const int m_key;
   // Какая именно часть описания книги лежит в m_field.
   const field_id_t m_field;
   // Та часть описания, которая была у шарды.
   const std::string m_data;

   data_t(int key, field_id_t field, std::string data)
      :  m_key(key), m_field(field), m_data(std::move(data))
   {}
};

Теперь можно определить тип агента-шарды. Это будет шаблон, который параметризуется конкретным field_id:

template<
   // Параметр шаблона задает что именно будет храниться агентом:
   // автор, заголовок или краткое описание.
   field_id_t Field>
class shard_t final : public so_5::agent_t
{
public:
   shard_t(context_t ctx, so_5::mbox_t command_mbox)
      :  so_5::agent_t(std::move(ctx))
   {
      // Команды агенту будут поступать из специального почтового ящика.
      // Всего агенту потребуется реагировать на две команды, поэтому
      // подписываются два обработчика событий.
      so_subscribe(command_mbox)
         .event(&shard_t::on_store_book_t)
         .event(&shard_t::on_request_data);
   }

private:
   // Вся информация хранится в ассоциативном контейнере с доступом
   // по уникальному ID книги.
   std::map<int, std::string> m_data;

   void on_store_book_t(mhood_t<store_book_t> cmd)
   {
      // При получении команды на сохранение новой книги сохраняем у себя
      // часть описания книги и отсылаем подтверждение.
      m_data[cmd->m_key] = get<Field>(cmd->m_book);
      so_5::send<store_book_ack_t>(cmd->m_ack_to, cmd->m_key);
   }

   void on_request_data(mhood_t<request_data_t> cmd)
   {
      // При запросе отсылаем ту часть, которая хранится у агента.
      so_5::send<data_t>(cmd->m_reply_to, cmd->m_key, Field, m_data[cmd->m_key]);
   }
};

Ну и теперь можно показать агента, который общается с агентами-шардами используя функциональность collecting_mbox-ов.

Агент sample_performer_t при своем старте отсылает несколько команд store_book. Когда на очередную команду store_book он получает подтверждение (в виде сообщения store_book_ack) он запрашивает описание сохраненной книги. После того, как все описания будут получены, агент sample_performer_t завершает работу примера.

Прежде чем показать код sample_performer-а, нужно заострить внимание на паре моментов:

  • поскольку количество агентов-шард у нас известно заранее и в процессе работы примера оно измениться не может в принципе, то используются collecting_mbox-ы со свойствами constexpr_size_traits_t;
  • ответы на команды store_book и request_book запросто могут перемешаться. Например, агент-шарда может получить два сообщения store_book подряд и сразу же ответить на них store_book_ack. И только после этого эти две первые store_book дойдут до двух остальных агентов-шард. Соответственно, нам нужно не просто накапливать ответные store_book_ack и data, но и группировать их по уникальному ID книги. Если мы этого не сделаем, то запросто можем получить messages_collected_t с тремя экземплярами сообщения data_t внутри, однако каждый экземпляр будет относиться к разным книгам! Для того, чтобы избавиться от такой проблемы агент sample_performer создает новый collecting_mbox для каждой своей команды. Получается, что ответы на конкретную команду будут идти в отдельный mbox и, поэтому, эти ответы не будут перемешиваться. Создание нового collecting_mbox-а -- это дешевая операция, да и уничтожается collecting_mbox автоматически после того, как перестает использоваться.

Ну а теперь сам код агента sample_performer:

class sample_performer_t final : public so_5::agent_t
{
   // Количество книг, которые используются в примере.
   static constexpr std::size_t total_books = 3;

public:
   // Управляющий mbox, через который команды отсылаются агентам-шардам
   // создается снаружи и передается через конструктор.
   sample_performer_t(context_t ctx, so_5::mbox_t command_mbox)
      :  so_5::agent_t(std::move(ctx))
      ,  m_command_mbox(std::move(command_mbox))
   {
      // Нам нужно ловить всего два типа сообщений, которые будут
      // прилетать на персональный почтовый ящик агента.
      so_subscribe_self()
         .event(&sample_performer_t::on_store_ack)
         .event(&sample_performer_t::on_data);
   }

   virtual void so_evt_start() override
   {
      // В начале своей работы регистрируем описания нескольких книг.
      std::array<book_description_t, total_books> books{{
         { "Miguel De Cervantes""Don Quixote",
            "The story of the gentle knight and his servant Sancho Panza has "
            "entranced readers for centuries. " },
         { "Jonathan Swift""Gulliver's Travels",
            "A wonderful satire that still works for all ages, despite the "
            "savagery of Swift's vision." },
         { "Stendhal""The Charterhouse of Parma",
            "Penetrating and compelling chronicle of life in an Italian "
            "court in post-Napoleonic France." }
      }};

      int key = 0;
      forconst auto & b : books )
      {
         so_5::send<store_book_t>(m_command_mbox,
               key,
               b,
               // Под каждую команду создаем новый collecting_mbox для
               // накопления ответов. Накопленные ответы будут отсылаться
               // на персональный mbox агента.
               store_ack_mbox_t::make(so_environment(), so_direct_mbox()));

         ++key;
      }
   }

private:
   // Определяем тип collecting_mbox-а для подтверждений сохранения новой книги.
   using store_ack_mbox_t = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
         store_book_ack_t,
         // Количество накапливаемых ответов известно в compile-time,
         // поэтому используем соответствующий trait.
         so_5::extra::mboxes::collecting_mbox::constexpr_size_traits_t<3>>;

   // Определяем тип collecting_mbox-а для ответов на запрос информации о книге.
   using data_mbox_t = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
         data_t,
         so_5::extra::mboxes::collecting_mbox::constexpr_size_traits_t<3>>;

   // Почтовый ящик, через который отдаются команды агентам-шардам.
   const so_5::mbox_t m_command_mbox;

   std::size_t m_books_received = 0;

   void on_store_ack(mhood_t<store_ack_mbox_t::messages_collected_t> cmd)
   {
      // У всех накопленных сообщений внутри одинаковый ID.
      // Берем его значение из первого сообщения.
      const auto key = cmd->with_nth(0, [](auto m) { return m->m_key; });
      std::cout << "Book with key=" << key << " is stored" << std::endl;

      // Раз книга успешно сохранена, то можно запросить ее описание.
      so_5::send<request_data_t>(m_command_mbox,
            key,
            // Для каждого запроса создаем новый collecting_mbox.
            data_mbox_t::make(so_environment(), so_direct_mbox()));
   }

   void on_data(mhood_t<data_mbox_t::messages_collected_t> cmd)
   {
      // Опять же, у всех сообщений внутри одинаковый ID.
      // Чтобы получить его достаточно обратиться к первому сообщению.
      const auto key = cmd->with_nth(0, [](auto m) { return m->m_key; });

      // Каждый агент-шарда прислал свою часть описания книги.
      // Для того, чтобы собрать полное описание нам потребуется пройтись
      // по всем накопленным сообщениям. Для чего воспользуемся for_each-ем.
      book_description_t book;
      cmd->for_each([&book](auto m) {
         // Тут придется понять, что за часть к нам пришла.
         if(field_id_t::author == m->m_field)
            book.m_author = m->m_data;
         else if(field_id_t::title == m->m_field)
            book.m_title = m->m_data;
         else if(field_id_t::summary == m->m_field)
            book.m_summary = m->m_data;
      });

      std::cout << "Book with key=" << key << " is {"
            << book.m_author << ", '" << book.m_title << "', "
            << book.m_summary << "}" << std::endl;

      // Если получили все книги, то пример можно завершить.
      ++m_books_received;
      if(total_books == m_books_received)
         so_deregister_agent_coop_normally();
   }
};

Заключение

У нас у самих пока еще нет большого опыта работы с collecting_mbox, поэтому первая реализация может казаться примитивной и ограниченной. Отчасти так и есть, поскольку хотелось сделать лишь самый базовый функционал, который казался очевидным. А дальше уже посмотреть на использование collecting_mbox-а и расширять его на основании набитых в реальных проектах шишек.

Так что если кто-то видит какие-то проблемы в первой версии collecting_mbox или у кого-то есть идеи, как это можно расширить и углубить, то мы будем рады услышать ваше мнение дабы учесть его в дальнейшем.

Отправить комментарий