Line data Source code
1 :
2 : #include <wfc/iinterface.hpp>
3 : #include "statistics_domain.hpp"
4 : #include <wfc/statistics/statistics.hpp>
5 : #include <fas/utility/ignore_args.hpp>
6 : #include <ctime>
7 :
8 : namespace wfc{ namespace core{
9 :
10 :
11 0 : class statistics_domain::stat_impl
12 : : public ::wfc::statistics::statistics
13 : , public ::wfc::iinterface
14 : {
15 : public:
16 0 : explicit stat_impl(const wrtstat::wrtstat_options& opt )
17 0 : : statistics( opt)
18 0 : {}
19 : };
20 :
21 :
22 0 : statistics_domain::~statistics_domain()
23 : {
24 0 : }
25 :
26 0 : void statistics_domain::reconfigure_basic()
27 : {
28 0 : std::lock_guard<mutex_type> lk(_mutex);
29 0 : _stat->enable( !this->suspended() );
30 0 : for (auto st : _stat_list)
31 0 : st->enable( !this->suspended() );
32 0 : }
33 :
34 0 : void statistics_domain::reconfigure()
35 : {
36 0 : auto opt = this->options();
37 0 : _suspend_push = opt.suspend_push;
38 0 : std::lock_guard<mutex_type> lk(_mutex);
39 0 : _stat = std::make_shared<stat_impl>( opt );
40 0 : _stat->enable( !this->suspended() );
41 0 : _stat_list.clear();
42 0 : for (auto w : _workflow_list)
43 0 : w->stop();
44 0 : _workflow_list.clear();
45 0 : _stat_list.reserve(opt.workers);
46 0 : _workflow_list.reserve(opt.workers);
47 0 : workflow_options wopt;
48 0 : wopt.threads = 1;
49 0 : wopt.startup_handler=[this](std::thread::id){ this->reg_thread();};
50 0 : wopt.finish_handler=[this](std::thread::id){ this->unreg_thread();};
51 0 : for (size_t i = 0 ; i < opt.workers; ++i)
52 : {
53 0 : _stat_list.push_back( std::make_shared<stat_push>( opt ) );
54 0 : _stat_list.back()->enable( !this->suspended() );
55 0 : _workflow_list.push_back( std::make_shared<workflow_type>(this->global()->io_service, wopt) );
56 0 : _workflow_list.back()->start();
57 : }
58 :
59 0 : this->reg_object( "statistics", this->name(), _stat, false);
60 0 : }
61 :
62 0 : void statistics_domain::initialize()
63 : {
64 0 : std::lock_guard<mutex_type> lk(_mutex);
65 0 : _targets.reserve(64);
66 0 : _targets.clear();
67 0 : for ( auto target: this->options().targets )
68 0 : _targets.push_back( this->get_target<istatistics>(target) );
69 0 : }
70 :
71 0 : void statistics_domain::restart()
72 : {
73 0 : auto opt = this->options();
74 0 : if ( auto wf = this->get_workflow() )
75 : {
76 0 : auto st = _stat;
77 0 : wf->release_timer(_timer_id);
78 0 : _timer_id = wf->create_timer(
79 : std::chrono::milliseconds(opt.aggregate_timeout_ms),
80 : std::bind(&statistics_domain::handler_<stat_ptr>, this, st, 0, 1)
81 0 : );
82 0 : }
83 :
84 0 : for (size_t i = 0 ; i < opt.workers; ++i)
85 : {
86 0 : if (auto wf = _workflow_list[i] )
87 : {
88 0 : auto st = _stat_list[i];
89 0 : wf->create_timer(
90 : std::chrono::milliseconds(opt.aggregate_timeout_ms),
91 : std::bind(&statistics_domain::handler_<stat_push_ptr>, this, st, i, opt.workers)
92 0 : );
93 0 : }
94 : }
95 :
96 0 : if ( auto st = this->get_statistics() )
97 : {
98 0 : _push_meter = st->create_time_meter("push.time");
99 0 : _count_meter = st->create_size_meter("push.values");
100 0 : }
101 0 : }
102 :
103 0 : void statistics_domain::start()
104 : {
105 0 : _start_point = std::chrono::system_clock::now();
106 0 : this->restart();
107 0 : }
108 :
109 0 : void statistics_domain::stop()
110 : {
111 0 : if ( auto wf = this->get_workflow() )
112 : {
113 0 : for ( int id : _timers)
114 0 : wf->release_timer(id);
115 0 : _timers.clear();
116 0 : }
117 :
118 0 : if ( auto g = this->global() )
119 : {
120 0 : g->registry.erase( "statistics", this->name());
121 0 : }
122 0 : }
123 :
124 0 : void statistics_domain::push( wfc::statistics::request::push::ptr req, wfc::statistics::response::push::handler cb)
125 : {
126 0 : if ( this->bad_request(req, cb) )
127 0 : return;
128 :
129 0 : time_point tm;
130 0 : size_point vm;
131 0 : if ( auto st = this->get_statistics() )
132 : {
133 0 : tm = _push_meter.create( static_cast<wrtstat::size_type>(1) );
134 0 : vm = _count_meter.create( static_cast<wrtstat::value_type>(req->data.size()) );
135 0 : }
136 :
137 0 : if ( req->ts == 0 )
138 0 : req->ts = time(nullptr) * 1000000;
139 :
140 0 : auto res = this->create_response(cb);
141 : {
142 0 : read_lock<mutex_type> lk(_mutex);
143 0 : size_t pos = std::hash<std::string>()(req->name) % _stat_list.size();
144 0 : if (auto wf = _workflow_list[pos] )
145 : {
146 0 : auto st = _stat_list[pos];
147 0 : auto preq = std::make_shared<wfc::statistics::request::push>( std::move(*req) );
148 0 : wf->post([st, preq](){
149 0 : st->add( preq->name, *preq);
150 0 : }, nullptr);
151 0 : }
152 : }
153 :
154 0 : this->send_response( std::move(res), std::move(cb) );
155 0 : fas::ignore_args(tm, vm);
156 : }
157 :
158 0 : void statistics_domain::del( wfc::statistics::request::del::ptr req, wfc::statistics::response::del::handler cb)
159 : {
160 0 : if ( this->bad_request(req, cb) )
161 0 : return;
162 :
163 0 : auto res = this->create_response(cb);
164 :
165 : {
166 0 : read_lock<mutex_type> lk(_mutex);
167 0 : size_t pos = std::hash<std::string>()(req->name) % _stat_list.size();
168 0 : if (auto wf = _workflow_list[pos] )
169 : {
170 0 : auto st = _stat_list[pos];
171 0 : auto preq = std::make_shared<wfc::statistics::request::del>( std::move(*req) );
172 0 : wf->post([st, preq](){ st->del( preq->name); }, nullptr);
173 0 : }
174 : }
175 :
176 0 : if ( res != nullptr )
177 0 : res->status = true;
178 : /*
179 : if ( auto st = this->get_stat_(req->name) )
180 : {
181 : res->status = st->del(req->name);
182 : }
183 : */
184 0 : this->send_response( std::move(res), std::move(cb) );
185 :
186 0 : for ( auto wt : _targets ) if ( auto t = wt.lock() )
187 : {
188 0 : auto rreq = std::make_unique<wfc::statistics::request::del>( *req );
189 0 : t->del( std::move(rreq), nullptr );
190 0 : }
191 : /*if ( auto t = _target.lock() )
192 : {
193 : t->del( std::move(req), nullptr );
194 : }*/
195 : }
196 :
197 :
198 : template<typename StatPtr>
199 0 : bool statistics_domain::handler_(StatPtr st, size_t offset, size_t step)
200 : {
201 0 : auto opt = this->options();
202 0 : if ( !_started )
203 : {
204 0 : auto now = std::chrono::system_clock::now();
205 0 : auto diff = std::chrono::duration_cast<std::chrono::milliseconds>( now - _start_point ).count();
206 0 : _started = diff > opt.startup_ignore_ms;
207 0 : if ( !_started ) return true;
208 : }
209 :
210 0 : size_t count = st->aggregators_count();
211 0 : for ( size_t i = offset; i < count; i+=step)
212 : {
213 0 : std::string sname = st->get_name(i);
214 0 : while (auto ag = st->pop(i) )
215 : {
216 : typedef wrtstat::aggregated_data aggregated;
217 0 : auto req = std::make_unique<statistics::request::push>();
218 0 : req->name = sname;
219 0 : static_cast<aggregated&>(*req) = std::move(*ag);
220 :
221 0 : if ( !_targets.empty() )
222 : {
223 0 : for ( size_t j = 1; j < _targets.size(); ++j ) if ( auto t = _targets[j].lock() )
224 : {
225 0 : t->push(std::make_unique<wfc::statistics::request::push>(*req), nullptr);
226 : }
227 :
228 0 : if ( auto t = _targets[0].lock() )
229 : {
230 0 : t->push(std::move(req), nullptr);
231 : }
232 : }
233 : }
234 : }
235 0 : return true;
236 : }
237 :
238 : /*
239 : statistics_domain::stat_ptr statistics_domain::get_stat_(const std::string& name)
240 : {
241 : read_lock<mutex_type> lk(_mutex);
242 : if ( _stat_list.empty() )
243 : return _stat;
244 : size_t pos = std::hash<std::string>()(name) % _stat_list.size();
245 : return _stat_list[pos];
246 : }
247 : */
248 :
249 : /*
250 : statistics_domain::workflow_ptr statistics_domain::get_workflow_(const std::string& name)
251 : {
252 : read_lock<mutex_type> lk(_mutex);
253 : if ( _wor_list.empty() )
254 : return _stat;
255 : size_t pos = std::hash<std::string>()(name) % _stat_list.size();
256 : return _stat_list[pos];
257 : }
258 : */
259 :
260 :
261 3 : }}
|