понедельник, 8 июня 2015 г.

[prog.c++11] Промежуточный итог экспериментов с конвейерами и реактивным программированием поверх SO-5

Эксперименты с реактивными программированием и выстраиванием обработчиков сообщений в конвейеры начались из-за обсуждения релиза SO-5.5.5 на LOR-е. То, что получилось на данный момент -- это чистой воды черновой proof-of-concept, который развивается как один из примеров в дистрибутиве SO-5. Каких-либо планов по включению pipelines в состав штатных средств SObjectizer пока нет. Более того...

Сейчас вовсе не очевидно, нужно ли раскручивать эту тему дальше. Прямо сейчас идет выбор между тем, чтобы:

  1. Просто написать комментарии в коде примера, чтобы затем можно было разобраться что к чему, зафиксировать версию SO-5.5.6 и приступить к работе над SO-5.6.0;
  2. Попробовать в рамках этого примера расширить функциональность pipelines настолько, насколько это возможно. Затем таки написать комментарии, зафиксировать версию SO-5.5.6 и приступить к работе над SO-5.6.0 :)

Проблема первого варианта в том, что "нет ничего более постоянного, чем временное". Т.е. если упустить возможность сделать хорошо сейчас, потом возможности вернуться и переделать "по-уму", уже не будет.

Проблема второго варианта в том, что если расширять уже имеющуюся функциональность pipelines, скажем, такими вещами, как switcher и joiner, то совершенно не понятно, во что это выльется по трудозатратам. Т.е. может осенить сразу и красивое рабочее решение найдется буквально сегодня-завтра. А можно убить неделю-две и ничего путного не придумать.

Поэтому я пока просто покажу то, что есть уже сейчас. Ну а потом посмотрю, есть ли к этому какой-нибудь интерес. Если интереса не будет, то можно смело идти по первому варианту (т.е. консервировать работы в этом направлении).

Итак, в качестве примера с конвейером взят пример с обработкой данных от датчика температуры из вчерашнего поста:

RAW_val => validation => conversion ==> archiving
                                    `=> distribution
                                    `=> range_checking => alarm_detection ==> alarm_initiation
                                                                          `=> alarm_distribution

Т.е. "сырые" данные, представляющие из себя два 8-ми битовых значения из регистров датчика, поступают на стадию валидации, дабы устранить эффект "дребезга" аппаратуры. Если данные признаны валидными, то они идут на стадию конвертации, т.е. преобразования двух 8-ми битовых полей в одно поле типа float с температурой в градусах Цельсия.

Сконвертированные данные поступают сразу на три параллельных, независимых друг от друга ветки. Первые две просто архивируют (например, в БД) и распространяют данные во внешний мир (например, посредством DDS). А вот третья ветка является еще одним конвейером из нескольких стадий.

Первая стадия третьей ветки -- это проверка на выход температуры за переделы разрешенного диапазона. Затем, если такой выход состоялся, следует вторая стадия -- проверка того, что это не единичный случайный выход, а реальный признак опасности. И если такая опасность обнаружена, то далее опять два параллельных, независимых действия: архивирование сигнала тревоги (например, в БД) и его распространение во внешний мир (например, посредством DDS).

В коде организация и использование такого конвейера записывается буквально вот так:

virtual void so_evt_start() override
{
   // Создание конвейера.
   // Возвращается mbox самой первой стадии созданного конвейера.
   // Отсылая сообщения на этот mbox инициируется цепочка событий
   // по обработке отосланного сообщения.
   auto pipeline = make_pipeline( *this,
         src | stage(validation) | stage(conversion) | broadcast(
            src | stage(archivation),
            src | stage(distribution),
            src | stage(range_checking) | stage(alarm_detector{}) | broadcast(
               src | stage(alarm_initiator),
               src | stage( []( const alarm_detected & v ) {
                     alarm_distribution( cerr, v );
                  } )
               )
            ),
         autoname );

   send_delayed_to_agent< shutdown >( *this, chrono::seconds(1) );

   // Использование конвейера.
   // Посредством отложенных сообщений имитируется поступление
   // данных от датчика температуры.
   foruint8_t i = 0; i < static_castuint8_t >(250); i += 10 )
      send_delayed< raw_value >(
            so_environment(),
            pipeline,
            chrono::milliseconds( i ),
            raw_measure{ 00, i } );
}

Здесь каждая цепочка начинается со специального макера src. В принципе, посредством перегрузки operator| можно было бы от этого вообще отказаться. Но пока src оставлен по двум основным причинам:

  • так намного проще отслеживать, где именно начинается каждая из цепочек;
  • со временем в src можно передавать какие-то параметры для конвейера (например, поведение конвейера в случае перегрузки, дефолтный диспетчер для стадий обработки и т.д.)

Стадии конвейера создаются либо вспомогательной функцией stage(), либо функцией broadcast(). Инструкция broadcast говорит о том, что результат обработки данной стадии будет размножен по всем заданным внутри broadcast-а параллельным веткам.

В качестве аргументов для stage() могут выступать обычные свободные функции (большинство стадий в этом примере являются таковыми), stateful-объекты с перегруженным operator() (здесь таковым является стадия alarm_detector), а так же лямбды (на самом деле это ни что иное, как те же самые stateful-объекты, но генерируемые автоматически компилятором). В этом примере есть все из вышеперечисленного. Впрочем, наверное проще привести полный текст всех обработчиков стадий конвейера:

msg_ptr_t< valid_raw_value >
validation( const raw_value & v )
{
   if0x7 >= v.m_data.m_high_bits )
      return make_message< valid_raw_value >( v.m_data );
   else
      return make_empty< valid_raw_value >();
}

msg_ptr_t< sensor_value >
conversion( const valid_raw_value & v )
{
   return make_message< sensor_value >(
      calculated_measure{ v.m_data.m_meter_id,
         0.5f * ((static_castuint16_t >( v.m_data.m_high_bits ) << 8) +
            v.m_data.m_low_bits) } );
}

void
archivation( const sensor_value & v )
{
   clog << "archiving (" << v.m_data.m_meter_id << ","
      << v.m_data.m_measure << ")" << endl;
}

void
distribution( const sensor_value & v )
{
   clog << "distributing (" << v.m_data.m_meter_id << ","
      << v.m_data.m_measure << ")" << endl;
}

msg_ptr_t< suspicion_value >
range_checking( const sensor_value & v )
{
   if( v.m_data.m_measure >= 45.0f )
      return make_message< suspicion_value >( v.m_data );
   else
      return make_empty< suspicion_value >();
}

class alarm_detector
{
   using clock = chrono::steady_clock;

public :
   msg_ptr_t< alarm_detected >
   operator()( const suspicion_value & v )
   {
      if( m_has_value )
         if( m_previous + chrono::milliseconds(25) > clock::now() )
         {
            m_has_value = false;
            return make_message< alarm_detected >( v.m_data.m_meter_id );
         }

      m_previous = clock::now();
      m_has_value = true;
      return make_empty< alarm_detected >();
   }

private :
   clock::time_point m_previous;
   bool m_has_value = false;
};

void
alarm_initiator( const alarm_detected & v )
{
   clog << "=== alarm (" << v.m_meter_id << ") ===" << endl;
}

void
alarm_distribution( ostream & to, const alarm_detected & v )
{
   to << "alarm_distribution (" << v.m_meter_id << ")" << endl;
}

Собственно, каждая стадия, за исключением alarm_detector, -- это простейшая функция. Alarm_detector чуть сложнее, т.к. он должен сохранять свое состояние между вызовами. Да и alarm_detector мог бы быть чуть проще, но не хотелось ради optional подключать Boost к проекту (а в MSVC++ компиляторах experimental/optional может и не быть).

Понятное дело, что если бы нужно было организовывать конвейеры в реальных приложениях, то кода в каждой стадии обработки было бы больше. Но, очевидно, что если бы каждую из вышеперечисленных стадий пришлось описывать в виде отдельного класса агента, с опеределением so_define_agent(), с провязыванием агентов в цепочки, то кода было бы еще больше. Так что в этом плане эксперимент оказался не только интересным, но и удачным.

Еще большим плюсом, нежели сокращение объема кода, является типобезопасность. Типы входного аргумента и результирующего значения каждой стадии конвейера выводятся шаблонной магией в compile-time. И в compile-time же происходит контроль за совместимостью двух соседних стадий по типам. Т.е. стадию conversion не получится соединить со стадией alarm_detector, поскольку выход conversion не совместим со входом для alarm_detector. И вот как раз ради такого строгого контроля со стороны компилятора весь этот эксперимент и затевался.

Текущая версия примера make_pipeline может быть найдена в репозитории. Сразу хочу предупредить, что код там черновой, непричесанный, без комментариев. Да и возни с C++ными шаблонами там порядком. А т.к. с шаблонной магией я на "Вы", то и отличного качества шаблонных наворотов ожидать не стоит.

Вот такая ситуация на сегодня. Как она изменится в последующие дни -- не знаю. С одной стороны, хочется реализовать такой тип обработчиков, как switcher. Но и застревать над ним на неделю-две нет желания. Так что буду думать.

Комментариев нет: