Line data Source code
1 :
2 : #include "client_tcp_adapter.hpp"
3 : #include <wfc/logger.hpp>
4 : #include <wfc/wfc_exit.hpp>
5 : #include <iow/ip/tcp/client/client.hpp>
6 : #include <iow/io/io_id.hpp>
7 : #include <wfc/mutex.hpp>
8 :
9 : namespace wfc{ namespace io{
10 :
11 0 : class client_tcp_adapter::handler_wrapper: public iinterface
12 : {
13 : public:
14 0 : explicit handler_wrapper(output_handler_t handler): _handler(handler) {}
15 0 : virtual void perform_io( iinterface::data_ptr d, io_id_t /*id*/, output_handler_t /*handler*/) override
16 : {
17 0 : _handler( std::move(d) );
18 0 : }
19 : private:
20 : output_handler_t _handler;
21 : };
22 :
23 0 : class client_tcp_adapter::impl
24 : : public ::iow::ip::tcp::client::multi_thread<>
25 : {
26 : typedef ::iow::ip::tcp::client::multi_thread<> super;
27 : public:
28 0 : explicit impl( io_service_type& io)
29 0 : : super(io){}
30 : };
31 :
32 0 : client_tcp_adapter::client_tcp_adapter( io_service_type& io)
33 : : _io (io)
34 0 : , _id ( ::iow::io::create_id<io_id_t>() )
35 : {
36 0 : _holder_id = 0;
37 0 : _client = std::make_shared<client_type>(io);
38 0 : }
39 :
40 0 : client_tcp_adapter::~client_tcp_adapter()
41 : {
42 0 : if ( auto h = _holder.lock() )
43 : {
44 0 : h->unreg_io(_id);
45 0 : }
46 0 : }
47 :
48 0 : void client_tcp_adapter::stop()
49 : {
50 0 : _client->stop();
51 0 : }
52 :
53 0 : void client_tcp_adapter::start( options_type opt)
54 : {
55 0 : auto pthis = this->shared_from_this();
56 :
57 0 : if ( opt.connection.input_handler == nullptr )
58 : {
59 0 : opt.connection.input_handler = [pthis](data_ptr d, io_id_t, output_handler_t handler)
60 : {
61 0 : if ( auto holder = pthis->get_holder() )
62 : {
63 0 : holder->perform_io(std::move(d), pthis->_id, std::move(handler) );
64 0 : }
65 0 : };
66 : }
67 :
68 0 : auto connect_handler = opt.args.connect_handler;
69 0 : opt.args.connect_handler = [pthis, connect_handler]()
70 : {
71 0 : if ( auto holder = pthis->get_holder() )
72 : {
73 0 : holder->reg_io( pthis->_id, pthis );
74 0 : }
75 0 : if ( connect_handler!=nullptr )
76 0 : connect_handler();
77 0 : };
78 :
79 0 : auto shutdown_handler = opt.connection.shutdown_handler;
80 0 : opt.connection.shutdown_handler = [pthis, shutdown_handler](io_id_t id)
81 : {
82 0 : if ( auto holder = pthis->get_holder() )
83 : {
84 0 : auto this_id = pthis->_id;
85 0 : pthis->_io.post([holder, this_id]()
86 : {
87 0 : holder->unreg_io(this_id);
88 0 : });
89 :
90 0 : }
91 0 : if ( shutdown_handler != nullptr )
92 0 : shutdown_handler( id );
93 0 : };
94 :
95 : try
96 : {
97 0 : _client->start(opt);
98 : }
99 0 : catch(const std::exception& e)
100 : {
101 0 : if ( opt.abort_if_error )
102 : {
103 0 : DOMAIN_LOG_FATAL( "client-tcp: " << opt.port << " error: " << e.what() )
104 : }
105 : else
106 : {
107 0 : DOMAIN_LOG_ERROR( "client-tcp: " << opt.port << " error: " << e.what() )
108 : }
109 0 : }
110 0 : }
111 :
112 0 : std::shared_ptr<iinterface> client_tcp_adapter::get_holder() const
113 : {
114 0 : read_lock<mutex_type> lk(_mutex);
115 0 : return _holder.lock();
116 : }
117 :
118 : // iinterface
119 :
120 0 : void client_tcp_adapter::reg_io(io_id_t io_id, std::weak_ptr<iinterface> itf)
121 : {
122 0 : std::lock_guard<mutex_type> lk(_mutex);
123 0 : if ( _holder_id > 0 && _holder_id!=io_id )
124 : {
125 0 : DOMAIN_LOG_FATAL("client-tcp configuration error! Several sources are unacceptable _holder_id=" << _holder_id <<", io_id="<< io_id)
126 : }
127 0 : _holder_id = io_id;
128 0 : _holder = itf;
129 0 : _wrapper = nullptr;
130 0 : }
131 :
132 0 : void client_tcp_adapter::unreg_io(io_id_t io_id)
133 : {
134 0 : std::lock_guard<mutex_type> lk(_mutex);
135 :
136 0 : if ( _holder_id!=io_id )
137 0 : return;
138 :
139 0 : _holder_id = 0;
140 0 : _wrapper = nullptr;
141 : }
142 :
143 :
144 0 : void client_tcp_adapter::perform_io( iinterface::data_ptr d, io_id_t io_id, output_handler_t handler)
145 : {
146 0 : auto pitf = this->get_holder();
147 0 : if ( pitf == nullptr || io_id==0)
148 : {
149 0 : std::lock_guard<mutex_type> lk(_mutex);
150 0 : if ( _holder.lock() == nullptr || io_id==0)
151 : {
152 0 : if ( handler!=nullptr || io_id==0 )
153 : {
154 0 : _wrapper = std::make_shared<handler_wrapper>(handler);
155 0 : _holder = _wrapper;
156 0 : _holder_id = io_id;
157 : }
158 0 : }
159 : }
160 :
161 0 : if ( auto rd = _client->send( std::move(d) ) )
162 : {
163 0 : if (handler!=nullptr)
164 : {
165 0 : handler( nullptr );
166 : }
167 0 : }
168 0 : }
169 :
170 3 : }}
|