LCOV - code coverage report
Current view: top level - package/server/server_tcp - server_tcp.cpp (source / functions) Hit Total Coverage
Test: wfc_io-coverage.info Lines: 1 89 1.1 %
Date: 2019-09-05 Functions: 2 30 6.7 %

          Line data    Source code
       1             : #include <iostream>
       2             : #include <exception>
       3             : #include "server_tcp.hpp"
       4             : #include "tcp_acceptor.hpp"
       5             : #include <wfc/logger.hpp>
       6             : #include <iow/ip/tcp/server/server.hpp>
       7             : 
       8             : 
       9             : namespace wfc{ namespace io{
      10             :   
      11             : typedef iinterface::io_id_t io_id_t;
      12             : typedef iinterface::output_handler_t output_handler_t;
      13             : typedef iinterface::data_ptr data_ptr;
      14             : 
      15             : 
      16           0 : class server_tcp::impl
      17             :   : public ::iow::ip::tcp::server::server<tcp_acceptor>
      18             : {
      19             : public:
      20             :   typedef server::io_service_type io_service_type;
      21           0 :   explicit impl(io_service_type& io)
      22           0 :     : server(io)
      23           0 :   {}
      24             : };
      25             : 
      26           0 : server_tcp::~server_tcp()
      27             : {
      28           0 : }
      29             : 
      30           0 : server_tcp::server_tcp()
      31             : {
      32           0 : }
      33             : 
      34           0 : void server_tcp::initialize()
      35             : {
      36           0 :   if ( auto g = this->global() )
      37             :   {
      38           0 :     auto target = this->options().target;
      39           0 :     _target = std::make_shared<winterface>( this->get_target<iinterface>(target), this->suspended() );
      40           0 :   }
      41           0 : }
      42             : 
      43           0 : void server_tcp::start()
      44             : {
      45           0 :   this->run_();
      46           0 : }
      47             : 
      48           0 : void server_tcp::reconfigure_basic() 
      49             : {
      50           0 :   _target->set_suspend(this->suspended());
      51           0 : }
      52             : 
      53           0 : void server_tcp::restart()
      54             : {
      55           0 :   auto opt = this->options();
      56           0 :   auto shutdown = std::move(_impl);
      57           0 :   if ( _port!=opt.port || _addr!=opt.addr )
      58             :   {
      59           0 :     this->get_workflow()->post([shutdown](){
      60           0 :       shutdown->stop();
      61           0 :     });
      62             :   }
      63             :   else
      64             :   {
      65           0 :     shutdown->stop();
      66             :   }
      67           0 :   this->run_();
      68           0 : }
      69             : 
      70             : 
      71           0 : void server_tcp::run_()
      72             : {
      73           0 :   auto opt = this->options();
      74           0 :   _port = opt.port;
      75           0 :   _addr = opt.addr;
      76           0 :   bool keep_alive = opt.keep_alive;
      77           0 :   auto ptarget = _target;
      78             : 
      79           0 :   if ( auto g = this->global() )
      80           0 :     _impl = std::make_shared<impl>( g->io_service );
      81             :   else
      82           0 :     return;
      83             :     
      84           0 :   opt.connection.input_handler = 
      85           0 :     [ptarget, keep_alive](data_ptr d, io_id_t id, output_handler_t cb )
      86             :   {
      87           0 :     if ( !keep_alive )
      88             :     {
      89           0 :       cb = [cb](data_ptr d1) {
      90           0 :         cb(std::move(d1));
      91           0 :         cb(nullptr);
      92           0 :       };
      93             :     }
      94           0 :     ptarget->perform_io(std::move(d), id, cb);
      95           0 :   };
      96             :   
      97             :   // По дефолту nonblocking=true, но мы вешаем accept на поток, поэтому нужен блокируемый 
      98           0 :   opt.nonblocking = false;
      99           0 :   opt.connection.target = ptarget;
     100           0 :   opt.thread_startup = std::bind( &server_tcp::reg_thread, this );
     101           0 :   opt.thread_shutdown = std::bind( &server_tcp::unreg_thread, this );
     102             : 
     103           0 :   if ( opt.rn )
     104             :   {
     105           0 :     if ( opt.connection.reader.sep.empty() ) opt.connection.reader.sep = "\r\n";
     106           0 :     if ( opt.connection.writer.sep.empty() ) opt.connection.writer.sep = "\r\n";
     107             :   }
     108             : 
     109           0 :   this->stat_init_(&opt);
     110             : 
     111             :   try
     112             :   {
     113           0 :     _impl->start( opt );
     114             :   }
     115           0 :   catch(const std::exception& e)
     116             :   {
     117           0 :     DOMAIN_LOG_FATAL( "server_tcp: " << this->options().port << " error: " << e.what() )
     118           0 :   }
     119             : }
     120             : 
     121           0 : void server_tcp::stat_init_(options_type* opt)
     122             : {
     123           0 :   if ( auto stat = this->get_statistics() )
     124             :   {
     125           0 :     std::weak_ptr<server_tcp> wthis = this->shared_from_this();
     126           0 :     value_meter proto_time;
     127           0 :     value_meter proto_total;
     128           0 :     auto tcount = std::make_shared< std::atomic<int> >();
     129             :       
     130           0 :     size_t id = size_t(tcount->fetch_add(1));
     131           0 :     std::stringstream ss;
     132           0 :     ss << this->name() << ".thread" << id;
     133           0 :     proto_time = stat->create_value_meter( ss.str());
     134           0 :     std::stringstream ss1;
     135           0 :     ss1 << this->name() << ".threads";
     136           0 :     proto_total = stat->create_value_meter( ss1.str());
     137             :       
     138           0 :     opt->thread_statistics= [wthis, proto_time,  tcount, proto_total](std::thread::id, size_t count, workflow_options::statistics_duration span) mutable
     139             :     {
     140           0 :       if ( auto pthis = wthis.lock() )
     141             :       {
     142           0 :         if ( pthis->get_statistics()!=nullptr )
     143             :         {
     144           0 :           auto span_mcs = std::chrono::duration_cast<std::chrono::microseconds>(span).count();
     145           0 :           proto_time.create(span_mcs, count );
     146           0 :           proto_total.create(span_mcs, count );
     147             :         }
     148           0 :       }
     149           0 :     };
     150           0 :   }
     151           0 : }
     152             : 
     153           0 : void server_tcp::stop() 
     154             : {
     155           0 :   if ( _impl != nullptr )
     156             :   {
     157           0 :     _impl->stop();
     158           0 :     _impl=nullptr;
     159             :   }
     160           0 : }
     161             : 
     162           3 : }}

Generated by: LCOV version 1.10