LCOV - code coverage report
Current view: top level - package/statistics/statistics_aggregator - statistics_domain.cpp (source / functions) Hit Total Coverage
Test: wfc_core-coverage.info Lines: 1 137 0.7 %
Date: 2019-09-05 Functions: 2 27 7.4 %

          Line data    Source code
       1             : 
       2             : #include <wfc/iinterface.hpp>
       3             : #include "statistics_domain.hpp"
       4             : #include <wfc/statistics/statistics.hpp>
       5             : #include <fas/utility/ignore_args.hpp>
       6             : #include <ctime>
       7             : 
       8             : namespace wfc{ namespace core{
       9             :  
      10             : 
      11           0 : class statistics_domain::stat_impl
      12             :   : public ::wfc::statistics::statistics
      13             :   , public ::wfc::iinterface
      14             : {
      15             : public:
      16           0 :   explicit stat_impl(const wrtstat::wrtstat_options& opt )
      17           0 :     : statistics( opt)
      18           0 :   {}
      19             : };
      20             : 
      21             : 
      22           0 : statistics_domain::~statistics_domain()
      23             : {
      24           0 : }
      25             : 
      26           0 : void statistics_domain::reconfigure_basic()
      27             : {
      28           0 :   std::lock_guard<mutex_type> lk(_mutex);
      29           0 :   _stat->enable( !this->suspended()  );
      30           0 :   for (auto st : _stat_list)
      31           0 :     st->enable( !this->suspended()  );
      32           0 : }
      33             : 
      34           0 : void statistics_domain::reconfigure()
      35             : {
      36           0 :   auto opt =  this->options();
      37           0 :   _suspend_push = opt.suspend_push;
      38           0 :   std::lock_guard<mutex_type> lk(_mutex);
      39           0 :   _stat = std::make_shared<stat_impl>( opt );
      40           0 :   _stat->enable( !this->suspended()  );
      41           0 :   _stat_list.clear();
      42           0 :   for (auto w : _workflow_list)
      43           0 :     w->stop();
      44           0 :   _workflow_list.clear();
      45           0 :   _stat_list.reserve(opt.workers);
      46           0 :   _workflow_list.reserve(opt.workers);
      47           0 :   workflow_options wopt;
      48           0 :   wopt.threads = 1;
      49           0 :   wopt.startup_handler=[this](std::thread::id){ this->reg_thread();};
      50           0 :   wopt.finish_handler=[this](std::thread::id){ this->unreg_thread();};
      51           0 :   for (size_t i = 0 ; i < opt.workers; ++i)
      52             :   {
      53           0 :     _stat_list.push_back( std::make_shared<stat_push>( opt ) );
      54           0 :     _stat_list.back()->enable( !this->suspended() );
      55           0 :     _workflow_list.push_back( std::make_shared<workflow_type>(this->global()->io_service, wopt) );
      56           0 :     _workflow_list.back()->start();
      57             :   }
      58             :   
      59           0 :   this->reg_object( "statistics", this->name(), _stat, false);
      60           0 : }
      61             : 
      62           0 : void statistics_domain::initialize() 
      63             : {
      64           0 :   std::lock_guard<mutex_type> lk(_mutex);
      65           0 :   _targets.reserve(64);
      66           0 :   _targets.clear();
      67           0 :   for ( auto target: this->options().targets )
      68           0 :     _targets.push_back( this->get_target<istatistics>(target) );
      69           0 : }
      70             : 
      71           0 : void statistics_domain::restart() 
      72             : {
      73           0 :   auto opt = this->options();
      74           0 :   if ( auto wf = this->get_workflow() )
      75             :   {
      76           0 :     auto st = _stat;
      77           0 :     wf->release_timer(_timer_id);
      78           0 :     _timer_id = wf->create_timer( 
      79             :       std::chrono::milliseconds(opt.aggregate_timeout_ms), 
      80             :       std::bind(&statistics_domain::handler_<stat_ptr>, this, st, 0, 1) 
      81           0 :     );
      82           0 :   }
      83             :   
      84           0 :   for (size_t i = 0 ; i < opt.workers; ++i)
      85             :   {
      86           0 :     if (auto wf = _workflow_list[i] )
      87             :     {
      88           0 :       auto st = _stat_list[i];
      89           0 :       wf->create_timer( 
      90             :         std::chrono::milliseconds(opt.aggregate_timeout_ms), 
      91             :         std::bind(&statistics_domain::handler_<stat_push_ptr>, this, st, i, opt.workers) 
      92           0 :       );
      93           0 :     }
      94             :   }
      95             :   
      96           0 :   if ( auto st = this->get_statistics() )
      97             :   {
      98           0 :     _push_meter = st->create_time_meter("push.time");
      99           0 :     _count_meter = st->create_size_meter("push.values");
     100           0 :   }
     101           0 : }
     102             : 
     103           0 : void statistics_domain::start() 
     104             : {
     105           0 :   _start_point = std::chrono::system_clock::now();
     106           0 :   this->restart();
     107           0 : }
     108             : 
     109           0 : void statistics_domain::stop() 
     110             : {
     111           0 :   if ( auto wf = this->get_workflow() ) 
     112             :   {
     113           0 :     for ( int id : _timers)
     114           0 :       wf->release_timer(id);
     115           0 :     _timers.clear();
     116           0 :   }
     117             : 
     118           0 :   if ( auto g = this->global() )
     119             :   {
     120           0 :     g->registry.erase( "statistics", this->name());
     121           0 :   }
     122           0 : }
     123             : 
     124           0 : void statistics_domain::push( wfc::statistics::request::push::ptr req, wfc::statistics::response::push::handler cb) 
     125             : {
     126           0 :   if ( this->bad_request(req, cb) )
     127           0 :     return;
     128             :   
     129           0 :   time_point tm;
     130           0 :   size_point vm;
     131           0 :   if ( auto st = this->get_statistics() )
     132             :   {
     133           0 :     tm = _push_meter.create( static_cast<wrtstat::size_type>(1) );
     134           0 :     vm = _count_meter.create( static_cast<wrtstat::value_type>(req->data.size()) );
     135           0 :   }
     136             :   
     137           0 :   if ( req->ts == 0 )
     138           0 :     req->ts = time(nullptr) * 1000000;
     139             :   
     140           0 :   auto res = this->create_response(cb);
     141             :   {
     142           0 :     read_lock<mutex_type> lk(_mutex);
     143           0 :     size_t pos = std::hash<std::string>()(req->name) % _stat_list.size();
     144           0 :     if (auto wf = _workflow_list[pos] )
     145             :     {
     146           0 :       auto st = _stat_list[pos];
     147           0 :       auto preq = std::make_shared<wfc::statistics::request::push>( std::move(*req) );
     148           0 :       wf->post([st, preq](){ 
     149           0 :         st->add( preq->name, *preq); 
     150           0 :       }, nullptr);
     151           0 :     }
     152             :   }
     153             : 
     154           0 :   this->send_response( std::move(res), std::move(cb) );
     155           0 :   fas::ignore_args(tm, vm);
     156             : }
     157             : 
     158           0 : void statistics_domain::del( wfc::statistics::request::del::ptr req, wfc::statistics::response::del::handler cb) 
     159             : {
     160           0 :   if ( this->bad_request(req, cb) )
     161           0 :     return;
     162             : 
     163           0 :   auto res = this->create_response(cb);
     164             :   
     165             :   {
     166           0 :     read_lock<mutex_type> lk(_mutex);
     167           0 :     size_t pos = std::hash<std::string>()(req->name) % _stat_list.size();
     168           0 :     if (auto wf = _workflow_list[pos] )
     169             :     {
     170           0 :       auto st = _stat_list[pos];
     171           0 :       auto preq = std::make_shared<wfc::statistics::request::del>( std::move(*req) );
     172           0 :       wf->post([st, preq](){ st->del( preq->name); }, nullptr);
     173           0 :     }
     174             :   }
     175             : 
     176           0 :   if ( res != nullptr )
     177           0 :     res->status = true;
     178             :   /*
     179             :   if ( auto st = this->get_stat_(req->name) )
     180             :   {
     181             :     res->status = st->del(req->name);
     182             :   }
     183             :   */
     184           0 :   this->send_response( std::move(res), std::move(cb) );
     185             :   
     186           0 :   for ( auto wt : _targets ) if ( auto t = wt.lock() )
     187             :   {
     188           0 :     auto rreq = std::make_unique<wfc::statistics::request::del>( *req );
     189           0 :     t->del( std::move(rreq), nullptr );
     190           0 :   }
     191             :   /*if ( auto t = _target.lock() )
     192             :   {
     193             :     t->del( std::move(req), nullptr );
     194             :   }*/
     195             : }
     196             : 
     197             : 
     198             : template<typename StatPtr>
     199           0 : bool statistics_domain::handler_(StatPtr st, size_t offset, size_t step)
     200             : {
     201           0 :   auto opt = this->options();
     202           0 :   if ( !_started )
     203             :   {
     204           0 :     auto now = std::chrono::system_clock::now();
     205           0 :     auto diff = std::chrono::duration_cast<std::chrono::milliseconds>( now - _start_point ).count();
     206           0 :     _started = diff > opt.startup_ignore_ms;
     207           0 :     if ( !_started ) return true;
     208             :   }
     209             :    
     210           0 :   size_t count = st->aggregators_count();
     211           0 :   for ( size_t i = offset; i < count; i+=step)
     212             :   {
     213           0 :     std::string sname = st->get_name(i);
     214           0 :     while (auto ag = st->pop(i) )
     215             :     {
     216             :       typedef wrtstat::aggregated_data aggregated;
     217           0 :       auto req = std::make_unique<statistics::request::push>();
     218           0 :       req->name = sname;
     219           0 :       static_cast<aggregated&>(*req) = std::move(*ag);
     220             :         
     221           0 :       if ( !_targets.empty() )
     222             :       {
     223           0 :         for ( size_t j = 1; j < _targets.size(); ++j ) if ( auto t = _targets[j].lock() )
     224             :         {
     225           0 :           t->push(std::make_unique<wfc::statistics::request::push>(*req), nullptr);
     226             :         }
     227             :         
     228           0 :         if ( auto t = _targets[0].lock() )
     229             :         {
     230           0 :           t->push(std::move(req), nullptr);
     231             :         }
     232             :       }
     233             :     }
     234             :   }
     235           0 :   return true;
     236             : }
     237             : 
     238             : /*
     239             : statistics_domain::stat_ptr statistics_domain::get_stat_(const std::string& name)
     240             : {
     241             :   read_lock<mutex_type> lk(_mutex);
     242             :   if ( _stat_list.empty() )
     243             :     return _stat;
     244             :   size_t pos = std::hash<std::string>()(name) % _stat_list.size();
     245             :   return _stat_list[pos];
     246             : }
     247             : */
     248             : 
     249             : /*
     250             : statistics_domain::workflow_ptr statistics_domain::get_workflow_(const std::string& name)
     251             : {
     252             :   read_lock<mutex_type> lk(_mutex);
     253             :   if ( _wor_list.empty() )
     254             :     return _stat;
     255             :   size_t pos = std::hash<std::string>()(name) % _stat_list.size();
     256             :   return _stat_list[pos];
     257             : }
     258             : */
     259             : 
     260             : 
     261           3 : }}

Generated by: LCOV version 1.10