Line data Source code
1 : //
2 : // Author: Vladimir Migashko <migashko@gmail.com>, (C) 2013-2015
3 : //
4 : // Copyright: See COPYING file that comes with this distribution
5 : //
6 :
7 : #include "demo_domain.hpp"
8 : #include <wfc/logger.hpp>
9 : #include <wfc/memory.hpp>
10 : #include <wrtstat/wrtstat.hpp>
11 : #include <iostream>
12 : #include <functional>
13 :
14 : namespace demo{
15 :
16 1 : void storage_domain::initialize()
17 : {
18 1 : std::string hash_name = this->options().hash_target;
19 1 : _hash = this->get_target<ihash>(hash_name);
20 1 : }
21 :
22 1 : void storage_domain::set(request::set::ptr req, response::set::handler cb )
23 : {
24 1 : if ( this->bad_request(req, cb) )
25 1 : return;
26 :
27 1 : auto res = this->create_response(cb);
28 :
29 1 : bool status = _storage.set(req->key, req->value);
30 1 : if ( res!=nullptr )
31 1 : res->status = status;
32 :
33 1 : this->send_response( std::move(res), std::move(cb) );
34 : }
35 :
36 1 : void storage_domain::get(request::get::ptr req, response::get::handler cb )
37 : {
38 1 : if ( this->notify_ban(req, cb) )
39 1 : return;
40 :
41 1 : auto res = std::make_unique<response::get>();
42 1 : res->status = _storage.get(req->key, &res->value);
43 1 : cb( std::move(res) );
44 : }
45 :
46 0 : void storage_domain::multiget(request::multiget::ptr req, response::multiget::handler cb )
47 : {
48 0 : if ( this->notify_ban(req, cb) )
49 0 : return;
50 :
51 0 : auto res = std::make_unique<response::multiget>();
52 0 : std::shared_ptr<std::string> pval;
53 0 : for (const std::string& key : req->keys )
54 : {
55 0 : if ( pval == nullptr )
56 0 : pval = std::make_shared<std::string>();
57 0 : if ( _storage.get(key, &*pval) )
58 0 : res->values[key] = std::move(pval);
59 : else
60 0 : res->values[key] = nullptr;
61 : }
62 0 : cb( std::move(res) );
63 : }
64 :
65 2 : void storage_domain::get_hashed( request::get_hashed::ptr req, response::get_hashed::handler cb )
66 : {
67 2 : if ( this->notify_ban(req, cb) )
68 1 : return;
69 :
70 2 : if ( _hash == nullptr )
71 1 : return cb(nullptr);
72 :
73 1 : std::string value;
74 1 : if ( _storage.get( req->key, &value) )
75 : {
76 : typedef ::demo::hash::request::get_hash hash_request;
77 : typedef ::demo::hash::response::get_hash hash_response;
78 1 : auto req_hash = std::make_unique< hash_request >();
79 1 : req_hash->value = value;
80 8 : _hash->get_hash( std::move(req_hash), this->callback([cb]( hash_response::ptr res_hash)
81 : {
82 1 : if ( res_hash == nullptr )
83 : {
84 0 : cb( nullptr );
85 1 : return;
86 : }
87 :
88 1 : auto res = std::make_unique<response::get_hashed>();
89 1 : res->status = true;
90 1 : res->value = res_hash->value;
91 1 : cb( std::move(res) );
92 3 : }));
93 : }
94 : else
95 : {
96 0 : auto res = std::make_unique<response::get_hashed>();
97 0 : res->status = false;
98 0 : cb( std::move(res) );
99 1 : }
100 : }
101 :
102 0 : void storage_domain::multiget_hashed( request::multiget_hashed::ptr req, response::multiget_hashed::handler cb)
103 : {
104 0 : if ( this->notify_ban(req, cb) )
105 0 : return;
106 :
107 0 : if ( _hash==nullptr )
108 0 : return cb(nullptr);
109 :
110 0 : auto presp = std::make_shared<response::multiget_hashed::ptr>();
111 0 : *presp = std::make_unique<response::multiget_hashed>();
112 : typedef ::demo::hash::request::get_hash hash_request;
113 : typedef std::unique_ptr<hash_request> hash_request_ptr;
114 0 : std::map<std::string, hash_request_ptr> hash_request_list;
115 0 : std::string value;
116 0 : for ( auto key : req->keys )
117 : {
118 0 : if ( _storage.get(key, &value) )
119 : {
120 0 : hash_request_list[key] = std::make_unique<hash_request>();
121 0 : hash_request_list[key]->value = value;
122 : }
123 : else
124 0 : (*presp)->values[key] = nullptr;
125 0 : }
126 :
127 0 : if ( hash_request_list.empty() )
128 0 : cb( std::move(*presp) );
129 :
130 0 : auto psize = std::make_shared<size_t>( hash_request_list.size() );
131 0 : auto pmutex = std::make_shared<std::recursive_mutex>();
132 :
133 0 : for (auto& req_hash: hash_request_list)
134 : {
135 0 : std::string key = req_hash.first;
136 : typedef ::demo::hash::response::get_hash hash_response;
137 0 : _hash->get_hash( std::move(req_hash.second), this->callback([key, pmutex, psize, presp, cb](hash_response::ptr res_hash) mutable
138 : {
139 0 : std::unique_lock<std::recursive_mutex> lk(*pmutex);
140 :
141 0 : auto& ref_size = *psize;
142 :
143 0 : if ( ref_size == 0 )
144 0 : return;
145 :
146 0 : --(ref_size);
147 :
148 0 : if ( res_hash!=nullptr )
149 : {
150 0 : (*presp)->values[key] = std::make_shared<size_t>( res_hash->value );
151 0 : if ( ref_size == 0 )
152 : {
153 0 : cb( std::move(*presp) );
154 : }
155 : }
156 : else
157 : {
158 0 : ref_size = 0;
159 0 : cb( nullptr );
160 : }
161 0 : }));
162 0 : }
163 : }
164 :
165 0 : void storage_domain::multiget_hashed2( request::multiget_hashed2::ptr req, response::multiget_hashed2::handler cb)
166 : {
167 0 : if ( this->notify_ban(req, cb) )
168 0 : return;
169 :
170 0 : if ( _hash==nullptr )
171 0 : return cb(nullptr);
172 :
173 0 : auto presp = std::make_shared<response::multiget_hashed2::ptr>();
174 0 : *presp = std::make_unique<response::multiget_hashed2>();
175 : typedef ::demo::hash::request::get_hash hash_request;
176 : typedef std::unique_ptr<hash_request> hash_request_ptr;
177 0 : std::vector< std::pair< std::string, hash_request_ptr > > hash_request_list;
178 0 : hash_request_list.reserve( req->keys.size() );
179 0 : std::string value;
180 0 : for ( auto key : req->keys )
181 : {
182 0 : if ( _storage.get(key, &value) )
183 : {
184 0 : hash_request_list.push_back( std::make_pair(key, std::make_unique<hash_request>() ) );
185 0 : hash_request_list.back().second->value = value;
186 : /*hash_request_list[key] = std::make_unique<hash_request>();
187 : hash_request_list[key]->value = value;*/
188 : }
189 : else
190 0 : (*presp)->values.push_back(std::make_pair(key, nullptr) );
191 0 : }
192 :
193 0 : if ( hash_request_list.empty() )
194 0 : cb( std::move(*presp) );
195 :
196 0 : auto psize = std::make_shared<size_t>( hash_request_list.size() );
197 0 : auto pmutex = std::make_shared<std::recursive_mutex>();
198 :
199 0 : for (auto& req_hash: hash_request_list)
200 : {
201 0 : std::string key = req_hash.first;
202 : typedef ::demo::hash::response::get_hash hash_response;
203 0 : _hash->get_hash( std::move(req_hash.second), this->callback([key, pmutex, psize, presp, cb](hash_response::ptr res_hash) mutable
204 : {
205 0 : std::unique_lock<std::recursive_mutex> lk(*pmutex);
206 0 : auto& ref_size = *psize;
207 :
208 0 : if ( ref_size == 0 )
209 0 : return;
210 :
211 0 : --ref_size;
212 :
213 0 : if ( res_hash!=nullptr )
214 : {
215 0 : (*presp)->values.push_back(std::make_pair(key, std::make_shared<size_t>( res_hash->value ) ) );
216 0 : if ( ref_size == 0 )
217 : {
218 0 : cb( std::move(*presp) );
219 : }
220 : }
221 : else
222 : {
223 0 : ref_size = 0;
224 0 : cb( nullptr );
225 : }
226 0 : }));
227 0 : }
228 : }
229 :
230 3 : }
|