Line data Source code
1 : #include "client_tcp_map.hpp"
2 : #include <wfc/logger.hpp>
3 :
4 : namespace wfc{ namespace io{
5 :
6 : class client_tcp_map::handler_wrapper: public iinterface
7 : {
8 : public:
9 : explicit handler_wrapper(output_handler_t handler): _handler(handler) {}
10 : virtual void perform_io( iinterface::data_ptr d, io_id_t /*id*/, output_handler_t /*handler*/) override
11 : {
12 : _handler( std::move(d) );
13 : }
14 : private:
15 : output_handler_t _handler;
16 : };
17 :
18 0 : client_tcp_map::client_tcp_map( io_service_type& io)
19 0 : : _io(io)
20 : {
21 0 : }
22 :
23 0 : void client_tcp_map::reconfigure(const options_type& opt)
24 : {
25 : /*
26 : client_list_t client_list;
27 :
28 : {
29 : read_lock<mutex_type> lk(_mutex);
30 : for ( auto& item : _clients )
31 : client_list.push_back(item.second);
32 : for ( auto& cli : _primary_pool )
33 : client_list.push_back(cli);
34 : for ( auto& cli : _secondary_pool)
35 : client_list.push_back(cli);
36 : for ( auto& cli : _startup_pool )
37 : client_list.push_back(cli);
38 : }
39 :
40 : for ( auto& cli : client_list )
41 : cli->stop();
42 : client_list.clear();
43 : */
44 0 : this->stop_all_clients();
45 0 : client_list_t client_list;
46 : {
47 0 : std::lock_guard<mutex_type> lk(_mutex);
48 0 : _opt = opt;
49 0 : auto shutdown_handler = opt.connection.shutdown_handler;
50 0 : _opt.connection.shutdown_handler = [shutdown_handler](io_id_t id)
51 : {
52 0 : if ( shutdown_handler!=nullptr )
53 0 : shutdown_handler(id);
54 0 : };
55 :
56 0 : for ( auto& item : _clients )
57 : {
58 0 : item.second = std::make_shared<client_type>(_io);
59 0 : client_list.push_back(item.second);
60 0 : item.second->start(opt);
61 : }
62 :
63 0 : _secondary_pool.clear();
64 :
65 0 : _primary_pool.clear();
66 0 : while ( _primary_pool.size() < opt.primary_pool )
67 : {
68 0 : auto cli = std::make_shared<client_type>(_io);
69 0 : _primary_pool.push_back(cli);
70 0 : client_list.push_back(cli);
71 0 : }
72 :
73 0 : _startup_pool.clear();
74 0 : if ( _startup_flag )
75 : {
76 0 : while ( _startup_pool.size() < opt.startup_pool )
77 : {
78 0 : auto cli = std::make_shared<client_type>(_io);
79 0 : _startup_pool.push_back(cli);
80 0 : client_list.push_back(cli);
81 0 : }
82 :
83 0 : _startup_flag = false;
84 0 : }
85 : }
86 :
87 0 : for ( auto& cli : client_list )
88 0 : cli->start(_opt);
89 :
90 0 : _stop_flag = false;
91 0 : }
92 :
93 0 : void client_tcp_map::stop()
94 : {
95 0 : _stop_flag = true;
96 0 : this->stop_all_clients();
97 :
98 0 : std::lock_guard<mutex_type> lk(_mutex);
99 0 : _clients.clear();
100 0 : _primary_pool.clear();
101 0 : _secondary_pool.clear();
102 0 : _startup_pool.clear();
103 : /*
104 : client_list_t client_list;
105 : {
106 : read_lock<mutex_type> lk(_mutex);
107 : for ( auto& item : _clients )
108 : client_list.push_back(item.second);
109 : for ( auto& cli : _primary_pool )
110 : client_list.push_back(cli);
111 : for ( auto& cli : _secondary_pool)
112 : client_list.push_back(cli);
113 : for ( auto& cli : _startup_pool )
114 : client_list.push_back(cli);
115 : _clients.clear();
116 : _primary_pool.clear();
117 : _secondary_pool.clear();
118 : _startup_pool.clear();
119 : }
120 :
121 :
122 : for ( auto& cli : client_list )
123 : cli->stop();
124 : */
125 0 : }
126 :
127 0 : client_tcp_map::client_ptr client_tcp_map::find( io_id_t id ) const
128 : {
129 0 : read_lock<mutex_type> lk(_mutex);
130 0 : return this->find_(id);
131 : }
132 :
133 0 : client_tcp_map::client_ptr client_tcp_map::upsert(io_id_t id)
134 : {
135 : {
136 0 : read_lock<mutex_type> lk(_mutex);
137 0 : if ( client_ptr cli = this->find_(id) )
138 0 : return cli;
139 : }
140 :
141 0 : auto cli = this->create_();
142 0 : std::lock_guard<mutex_type> lk(_mutex);
143 0 : _clients.insert( std::make_pair(id, cli) );
144 0 : return cli;
145 : }
146 :
147 : // iinterface
148 :
149 0 : void client_tcp_map::reg_io( io_id_t id, std::weak_ptr<iinterface> holder)
150 : {
151 0 : if ( _stop_flag ) return;
152 0 : if ( client_ptr cli = this->upsert(id) )
153 : {
154 0 : cli->reg_io(id, holder);
155 0 : }
156 : }
157 :
158 0 : void client_tcp_map::unreg_io( io_id_t id)
159 : {
160 0 : if ( _stop_flag ) return;
161 0 : if ( client_ptr cli = this->find(id) )
162 : {
163 0 : cli->unreg_io(id);
164 : {
165 0 : std::lock_guard<mutex_type> lk(_mutex);
166 0 : _clients.erase(id);
167 0 : if ( _secondary_pool.size() < _opt.secondary_pool )
168 0 : _secondary_pool.push_back( std::move(cli) );
169 : }
170 :
171 : // Если не перенесен в пул
172 0 : if ( cli!=nullptr )
173 0 : cli->stop();
174 0 : }
175 : }
176 :
177 0 : client_tcp_map::client_ptr client_tcp_map::create()
178 : {
179 0 : std::lock_guard<mutex_type> lk(_mutex);
180 0 : return this->create_();
181 : }
182 0 : void client_tcp_map::perform_io( data_ptr d, io_id_t id, output_handler_t handler)
183 : {
184 0 : if ( _stop_flag ) return;
185 :
186 : // Для клиента reg_io обязателен
187 0 : if ( auto cli1 = this->find(id ) )
188 : {
189 0 : cli1->perform_io( std::move(d), id, std::move(handler) );
190 : }
191 0 : else if ( client_ptr cli2 = this->create() )
192 : {
193 : // Опционально
194 0 : cli2->perform_io( std::move(d), 0, [this, cli2, handler](data_ptr d2) mutable
195 : {
196 0 : this->free(cli2);
197 : //cli2 = nullptr;
198 0 : if (handler!=nullptr)
199 0 : handler(std::move(d2));
200 0 : });
201 : }
202 0 : else if ( handler != nullptr )
203 : {
204 : // Если напрямую подключили server-tcp в режиме direct_mode или server-udp
205 0 : SYSTEM_LOG_FATAL("client_tcp_map::perform_io: object (io_id=" << id << ") not registered. Probably a configuration error.")
206 0 : handler(nullptr);
207 0 : }
208 : }
209 :
210 : // private
211 :
212 0 : client_tcp_map::client_ptr client_tcp_map::find_( io_id_t id ) const
213 : {
214 0 : auto itr = _clients.find(id);
215 0 : if ( itr!=_clients.end() )
216 0 : return itr->second;
217 0 : return nullptr;
218 : }
219 :
220 0 : void client_tcp_map::stop_all_clients()
221 : {
222 0 : client_list_t client_list;
223 :
224 : {
225 0 : read_lock<mutex_type> lk(_mutex);
226 : std::transform(std::begin(_clients), std::end(_clients), std::back_inserter(client_list),
227 0 : [](const client_map_t::value_type& p){return p.second;});
228 0 : std::copy(std::begin(_primary_pool), std::end(_primary_pool), std::back_inserter(client_list));
229 0 : std::copy(std::begin(_secondary_pool), std::end(_secondary_pool), std::back_inserter(client_list));
230 0 : std::copy(std::begin(_startup_pool), std::end(_startup_pool), std::back_inserter(client_list));
231 : }
232 :
233 0 : for ( auto& cli : client_list )
234 0 : cli->stop();
235 0 : client_list.clear();
236 0 : }
237 :
238 0 : client_tcp_map::client_ptr client_tcp_map::create_()
239 : {
240 0 : client_ptr cli;
241 :
242 0 : if ( !_startup_pool.empty() )
243 : {
244 0 : cli = _startup_pool.front();
245 0 : _startup_pool.pop_front();
246 : }
247 0 : else if ( !_secondary_pool.empty() )
248 : {
249 0 : cli = _secondary_pool.front();
250 0 : _secondary_pool.pop_front();
251 : }
252 0 : else if ( !_primary_pool.empty() )
253 : {
254 0 : cli = _primary_pool.front();
255 0 : _primary_pool.pop_front();
256 0 : client_ptr new_cli = std::make_shared<client_type>(_io);
257 0 : new_cli->start(this->_opt);
258 0 : _primary_pool.push_back(new_cli);
259 : }
260 : else
261 : {
262 0 : cli = std::make_shared<client_type>(_io);
263 0 : cli->start(this->_opt);
264 : }
265 0 : return cli;
266 : }
267 :
268 0 : void client_tcp_map::free(client_ptr cli)
269 : {
270 0 : std::lock_guard<mutex_type> lk(_mutex);
271 0 : if ( _secondary_pool.size() < _opt.secondary_pool )
272 0 : _secondary_pool.push_back( std::move(cli) );
273 : // Если не перенесен в пул
274 0 : if ( cli!=nullptr )
275 0 : cli->stop();
276 0 : }
277 :
278 3 : }}
|