Line data Source code
1 : #include <iostream>
2 : #include <exception>
3 : #include "server_tcp.hpp"
4 : #include "tcp_acceptor.hpp"
5 : #include <wfc/logger.hpp>
6 : #include <iow/ip/tcp/server/server.hpp>
7 :
8 :
9 : namespace wfc{ namespace io{
10 :
11 : typedef iinterface::io_id_t io_id_t;
12 : typedef iinterface::output_handler_t output_handler_t;
13 : typedef iinterface::data_ptr data_ptr;
14 :
15 :
16 0 : class server_tcp::impl
17 : : public ::iow::ip::tcp::server::server<tcp_acceptor>
18 : {
19 : public:
20 : typedef server::io_service_type io_service_type;
21 0 : explicit impl(io_service_type& io)
22 0 : : server(io)
23 0 : {}
24 : };
25 :
26 0 : server_tcp::~server_tcp()
27 : {
28 0 : }
29 :
30 0 : server_tcp::server_tcp()
31 : {
32 0 : }
33 :
34 0 : void server_tcp::initialize()
35 : {
36 0 : if ( auto g = this->global() )
37 : {
38 0 : auto target = this->options().target;
39 0 : _target = std::make_shared<winterface>( this->get_target<iinterface>(target), this->suspended() );
40 0 : }
41 0 : }
42 :
43 0 : void server_tcp::start()
44 : {
45 0 : this->run_();
46 0 : }
47 :
48 0 : void server_tcp::reconfigure_basic()
49 : {
50 0 : _target->set_suspend(this->suspended());
51 0 : }
52 :
53 0 : void server_tcp::restart()
54 : {
55 0 : auto opt = this->options();
56 0 : auto shutdown = std::move(_impl);
57 0 : if ( _port!=opt.port || _addr!=opt.addr )
58 : {
59 0 : this->get_workflow()->post([shutdown](){
60 0 : shutdown->stop();
61 0 : });
62 : }
63 : else
64 : {
65 0 : shutdown->stop();
66 : }
67 0 : this->run_();
68 0 : }
69 :
70 :
71 0 : void server_tcp::run_()
72 : {
73 0 : auto opt = this->options();
74 0 : _port = opt.port;
75 0 : _addr = opt.addr;
76 0 : bool keep_alive = opt.keep_alive;
77 0 : auto ptarget = _target;
78 :
79 0 : if ( auto g = this->global() )
80 0 : _impl = std::make_shared<impl>( g->io_service );
81 : else
82 0 : return;
83 :
84 0 : opt.connection.input_handler =
85 0 : [ptarget, keep_alive](data_ptr d, io_id_t id, output_handler_t cb )
86 : {
87 0 : if ( !keep_alive )
88 : {
89 0 : cb = [cb](data_ptr d1) {
90 0 : cb(std::move(d1));
91 0 : cb(nullptr);
92 0 : };
93 : }
94 0 : ptarget->perform_io(std::move(d), id, cb);
95 0 : };
96 :
97 : // По дефолту nonblocking=true, но мы вешаем accept на поток, поэтому нужен блокируемый
98 0 : opt.nonblocking = false;
99 0 : opt.connection.target = ptarget;
100 0 : opt.thread_startup = std::bind( &server_tcp::reg_thread, this );
101 0 : opt.thread_shutdown = std::bind( &server_tcp::unreg_thread, this );
102 :
103 0 : if ( opt.rn )
104 : {
105 0 : if ( opt.connection.reader.sep.empty() ) opt.connection.reader.sep = "\r\n";
106 0 : if ( opt.connection.writer.sep.empty() ) opt.connection.writer.sep = "\r\n";
107 : }
108 :
109 0 : this->stat_init_(&opt);
110 :
111 : try
112 : {
113 0 : _impl->start( opt );
114 : }
115 0 : catch(const std::exception& e)
116 : {
117 0 : DOMAIN_LOG_FATAL( "server_tcp: " << this->options().port << " error: " << e.what() )
118 0 : }
119 : }
120 :
121 0 : void server_tcp::stat_init_(options_type* opt)
122 : {
123 0 : if ( auto stat = this->get_statistics() )
124 : {
125 0 : std::weak_ptr<server_tcp> wthis = this->shared_from_this();
126 0 : value_meter proto_time;
127 0 : value_meter proto_total;
128 0 : auto tcount = std::make_shared< std::atomic<int> >();
129 :
130 0 : size_t id = size_t(tcount->fetch_add(1));
131 0 : std::stringstream ss;
132 0 : ss << this->name() << ".thread" << id;
133 0 : proto_time = stat->create_value_meter( ss.str());
134 0 : std::stringstream ss1;
135 0 : ss1 << this->name() << ".threads";
136 0 : proto_total = stat->create_value_meter( ss1.str());
137 :
138 0 : opt->thread_statistics= [wthis, proto_time, tcount, proto_total](std::thread::id, size_t count, workflow_options::statistics_duration span) mutable
139 : {
140 0 : if ( auto pthis = wthis.lock() )
141 : {
142 0 : if ( pthis->get_statistics()!=nullptr )
143 : {
144 0 : auto span_mcs = std::chrono::duration_cast<std::chrono::microseconds>(span).count();
145 0 : proto_time.create(span_mcs, count );
146 0 : proto_total.create(span_mcs, count );
147 : }
148 0 : }
149 0 : };
150 0 : }
151 0 : }
152 :
153 0 : void server_tcp::stop()
154 : {
155 0 : if ( _impl != nullptr )
156 : {
157 0 : _impl->stop();
158 0 : _impl=nullptr;
159 : }
160 0 : }
161 :
162 3 : }}
|