Line data Source code
1 : //
2 : // Author: Vladimir Migashko <migashko@gmail.com>, (C) 2016
3 : //
4 : // Copyright: See COPYING file that comes with this distribution
5 : //
6 :
7 : #include "tank.hpp"
8 : #include <wfc/logger.hpp>
9 : #include <wfc/memory.hpp>
10 : #include <condition_variable>
11 :
12 : #include <iostream>
13 : #include <atomic>
14 : #include <memory>
15 : #include <chrono>
16 : #include <iomanip>
17 :
18 : #define TANK_LOG_MESSAGE(message) WFC_LOG_MESSAGE("tank", message)
19 : // #define TANK_LOG_DEBUG(message) WFC_LOG_DEBUG("tank", message)
20 :
21 : namespace demo{ namespace pingpong{
22 :
23 0 : void tank::reconfigure()
24 : {
25 0 : _discharge = this->options().discharge;
26 0 : _power = this->options().power;
27 0 : }
28 :
29 0 : void tank::initialize()
30 : {
31 0 : _target = this->get_target<ipinger>( this->options().target );
32 0 : }
33 :
34 0 : void tank::stop()
35 : {
36 0 : _thread.join();
37 0 : }
38 :
39 0 : void tank::start()
40 : {
41 0 : this->global()->after_start.insert([this]() -> bool
42 : {
43 0 : this->get_workflow()->post(
44 : std::chrono::seconds(3),
45 0 : [this]()
46 : {
47 0 : this->_thread = std::thread( std::bind( &tank::fire, this) );
48 0 : },
49 0 : nullptr);
50 0 : return false;
51 0 : });
52 0 : }
53 :
54 0 : void tank::fire()
55 : {
56 0 : this->reg_thread();
57 0 : time_t show_time = time(nullptr);
58 0 : long tatal_rate = 0;
59 0 : long discharge_count = 0;
60 0 : std::mutex m;
61 0 : std::condition_variable cond_var;
62 0 : while( !this->system_is_stopped() )
63 : {
64 0 : ++discharge_count;
65 0 : std::atomic<long> messages_count;
66 0 : messages_count = 0;
67 0 : std::atomic<long> dcount;
68 0 : dcount = _discharge.load();
69 0 : auto start_discharge = clock_t::now();
70 0 : if ( auto t = _target.lock() )
71 : {
72 0 : for ( long i = 0 ; i < _discharge && !this->system_is_stopped(); ++i )
73 : {
74 : using namespace std::placeholders;
75 0 : auto req = std::make_unique<ball>();
76 0 : req->power = _power;
77 0 : auto tp = clock_t::now();
78 0 : if ( dcount == 0) break; // зависаетт иногда приостановке
79 0 : t->play( std::move(req), this->callback([this, &cond_var, &show_time, tp, &dcount, &messages_count](ball::ptr res)
80 : {
81 0 : if ( this->system_is_stopped() || dcount == 0 )
82 0 : return;
83 :
84 0 : if ( res==nullptr )
85 : {
86 0 : DOMAIN_LOG_FATAL("Bad Gateway");
87 0 : return;
88 : }
89 :
90 0 : --dcount;
91 0 : if ( dcount == 0 )
92 0 : cond_var.notify_one();
93 :
94 0 : auto now = clock_t::now();
95 0 : long ms = std::chrono::duration_cast<std::chrono::microseconds>( now - tp).count();
96 0 : long count = -1;
97 0 : if ( res != nullptr )
98 : {
99 0 : count = static_cast<long>(res->count * 2);
100 0 : messages_count += count;
101 : }
102 :
103 0 : long rate = 0;
104 0 : if ( ms != 0)
105 0 : rate = count * std::chrono::microseconds::period::den/ ms;
106 0 : if ( show_time!=time(nullptr) )
107 : {
108 0 : if ( count != 0 )
109 : {
110 0 : TANK_LOG_MESSAGE("One request. Time " << ms << " microseconds for " << count << " messages. Rate " << rate << " persec")
111 : }
112 : else
113 : {
114 0 : TANK_LOG_MESSAGE("One request. Time " << ms << " microseconds Bad Gateway.")
115 0 : show_time=time(nullptr);
116 : }
117 : }
118 : })
119 0 : );
120 0 : }
121 0 : }
122 :
123 0 : while ( dcount!=0 )
124 : {
125 0 : if ( this->system_is_stopped() )
126 0 : break;
127 0 : std::this_thread::sleep_for( std::chrono::microseconds(1000) );
128 : }
129 :
130 0 : auto finish_discharge = clock_t::now();
131 0 : long discharge_ms = std::chrono::duration_cast<std::chrono::microseconds>( finish_discharge - start_discharge).count();
132 0 : long discharge_rate = 0;
133 0 : if ( discharge_ms != 0)
134 0 : discharge_rate = _discharge * std::chrono::microseconds::period::den/ discharge_ms;
135 0 : tatal_rate += discharge_rate;
136 0 : long middle_rate = tatal_rate / discharge_count;
137 0 : TANK_LOG_MESSAGE("Discharge time " << discharge_ms << " microseconds for " << _discharge
138 : << " messages. Rate " << discharge_rate << " persec ( middle: " << middle_rate << ")" )
139 0 : TANK_LOG_MESSAGE("Messages count " << messages_count << " messages rps: " << discharge_rate*messages_count);
140 0 : if ( discharge_ms < std::chrono::microseconds::period::den )
141 : {
142 0 : std::this_thread::sleep_for( std::chrono::microseconds( std::chrono::microseconds::period::den - discharge_ms ) );
143 : }
144 0 : } //while
145 0 : }
146 :
147 3 : }}
|