Line data Source code
1 : #include "server_udp.hpp"
2 : #include <wfc/logger.hpp>
3 : #include <iow/ip/udp/server/server.hpp>
4 :
5 :
6 : namespace wfc{ namespace io{
7 :
8 : typedef iinterface::io_id_t io_id_t;
9 : typedef iinterface::output_handler_t output_handler_t;
10 : typedef iinterface::data_ptr data_ptr;
11 :
12 0 : class server_udp::impl
13 : : public ::iow::ip::udp::server::server_base<>
14 : {
15 : public:
16 : typedef server::io_service_type io_service_type;
17 0 : explicit impl(io_service_type& io)
18 0 : : server(io)
19 : {
20 0 : }
21 : };
22 :
23 0 : server_udp::~server_udp()
24 : {
25 0 : }
26 :
27 0 : server_udp::server_udp()
28 : {
29 0 : }
30 :
31 0 : void server_udp::initialize()
32 : {
33 0 : if ( auto g = this->global() )
34 : {
35 0 : auto target = this->options().target_name;
36 0 : _target = std::make_shared<winterface>( this->get_target<iinterface>(target), this->suspended() );
37 0 : }
38 0 : }
39 :
40 0 : void server_udp::start()
41 : {
42 0 : this->run_();
43 0 : }
44 :
45 0 : void server_udp::reconfigure_basic()
46 : {
47 0 : _target->set_suspend(this->suspended());
48 0 : }
49 :
50 0 : void server_udp::restart()
51 : {
52 0 : auto opt = this->options();
53 0 : auto shutdown = std::move(_impl);
54 0 : if ( _port!=opt.port || _addr!=opt.addr )
55 : {
56 0 : this->get_workflow()->post([shutdown](){
57 0 : shutdown->stop();
58 0 : });
59 : }
60 : else
61 : {
62 0 : shutdown->stop();
63 : }
64 0 : this->run_();
65 0 : }
66 :
67 :
68 0 : void server_udp::run_()
69 : {
70 0 : auto opt = this->options();
71 0 : _port = opt.port;
72 0 : _addr = opt.addr;
73 0 : auto ptarget = _target;
74 :
75 0 : if ( auto g = this->global() )
76 0 : _impl = std::make_shared<impl>( g->io_service );
77 : else
78 0 : return;
79 : using namespace std::placeholders;
80 0 : opt.input_handler = std::bind(&winterface::perform_io, ptarget, _1, _2, _3);
81 0 : opt.nonblocking = false;
82 0 : opt.target = ptarget;
83 0 : opt.thread_startup = std::bind( &server_udp::reg_thread, this );
84 0 : opt.thread_shutdown = std::bind( &server_udp::unreg_thread, this );
85 0 : if ( opt.rn )
86 : {
87 0 : if ( opt.reader.sep.empty() ) opt.reader.sep = "\r\n";
88 0 : if ( opt.writer.sep.empty() ) opt.writer.sep = "\r\n";
89 : }
90 :
91 0 : this->stat_init_(&opt);
92 :
93 : try
94 : {
95 0 : _impl->start( opt );
96 : }
97 0 : catch(const std::exception& e)
98 : {
99 0 : DOMAIN_LOG_FATAL( "server_udp: " << this->options().port << " error: " << e.what() )
100 0 : }
101 : }
102 :
103 0 : void server_udp::stat_init_(options_type* opt)
104 : {
105 0 : if ( auto stat = this->get_statistics() )
106 : {
107 0 : std::weak_ptr<server_udp> wthis = this->shared_from_this();
108 0 : value_meter proto_time;
109 0 : value_meter proto_total;
110 0 : std::stringstream ss;
111 0 : proto_time = stat->create_value_meter( ss.str());
112 0 : std::stringstream ss1;
113 0 : proto_total = stat->create_value_meter( ss1.str());
114 :
115 : //auto tcount = std::make_shared< std::atomic<int> >();
116 0 : opt->thread_statistics=
117 0 : [wthis, proto_time, /*tcount,*/ proto_total]
118 0 : (std::thread::id, size_t count, workflow_options::statistics_duration span) mutable
119 : {
120 0 : if ( auto pthis = wthis.lock() )
121 : {
122 0 : if ( pthis->get_statistics()!=nullptr )
123 : {
124 0 : auto span_mcs = std::chrono::duration_cast<std::chrono::microseconds>(span).count();
125 0 : proto_time.create(span_mcs, count );
126 0 : proto_total.create(span_mcs, count );
127 : }
128 0 : }
129 0 : };
130 0 : }
131 0 : }
132 :
133 0 : void server_udp::stop()
134 : {
135 0 : if ( _impl != nullptr )
136 : {
137 0 : _impl->stop();
138 0 : _impl=nullptr;
139 : }
140 0 : }
141 :
142 3 : }}
|