Electroneum
Loading...
Searching...
No Matches
levin_client_async.h
Go to the documentation of this file.
1// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are met:
6// * Redistributions of source code must retain the above copyright
7// notice, this list of conditions and the following disclaimer.
8// * Redistributions in binary form must reproduce the above copyright
9// notice, this list of conditions and the following disclaimer in the
10// documentation and/or other materials provided with the distribution.
11// * Neither the name of the Andrey N. Sabelnikov nor the
12// names of its contributors may be used to endorse or promote products
13// derived from this software without specific prior written permission.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
19// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25//
26
27
28#pragma once
29
30#include ""
31#include "net_helper.h"
32#include "levin_base.h"
33
34#undef ELECTRONEUM_DEFAULT_LOG_CATEGORY
35#define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net"
36
37
38namespace epee
39{
40namespace levin
41{
42
43 /************************************************************************
44 * levin_client_async - probably it is not really fast implementation,
45 * each handler thread could make up to 30 ms latency.
46 * But, handling events in reader thread will cause dead locks in
47 * case of recursive call (call invoke() to the same connection
48 * on reader thread on remote invoke() handler)
49 ***********************************************************************/
50
51
52 class levin_client_async
53 {
54 levin_commands_handler* m_pcommands_handler;
55 void (*commands_handler_destroy)(levin_commands_handler*);
56 volatile uint32_t m_is_stop;
57 volatile uint32_t m_threads_count;
58 ::critical_section m_send_lock;
59
60 std::string m_local_invoke_buff;
61 ::critical_section m_local_invoke_buff_lock;
62 volatile int m_invoke_res;
63
64 volatile uint32_t m_invoke_data_ready;
65 volatile uint32_t m_invoke_is_active;
66
67 boost::mutex m_invoke_event;
68 boost::condition_variable m_invoke_cond;
69 size_t m_timeout;
70
71 ::critical_section m_recieved_packets_lock;
72 struct packet_entry
73 {
74 bucket_head m_hd;
75 std::string m_body;
76 uint32_t m_connection_index;
77 };
78 std::list<packet_entry> m_recieved_packets;
79 /*
80 m_current_connection_index needed when some connection was broken and reconnected - in this
81 case we could have some received packets in que, which shoud not be handled
82 */
83 volatile uint32_t m_current_connection_index;
84 ::critical_section m_invoke_lock;
85 ::critical_section m_reciev_packet_lock;
86 ::critical_section m_connection_lock;
87 net_utils::blocked_mode_client m_transport;
88 public:
89 levin_client_async():m_pcommands_handler(NULL), commands_handler_destroy(NULL), m_is_stop(0), m_threads_count(0), m_invoke_data_ready(0), m_invoke_is_active(0)
90 {}
91 levin_client_async(const levin_client_async& /*v*/):m_pcommands_handler(NULL), commands_handler_destroy(NULL), m_is_stop(0), m_threads_count(0), m_invoke_data_ready(0), m_invoke_is_active(0)
92 {}
93 ~levin_client_async()
94 {
95 boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 1);
96 disconnect();
97
98
99 while(boost::interprocess::ipcdetail::atomic_read32(&m_threads_count))
100 ::Sleep(100);
101
102 set_handler(NULL);
103 }
104
105 void set_handler(levin_commands_handler* phandler, void (*destroy)(levin_commands_handler*) = NULL)
106 {
107 if (commands_handler_destroy && m_pcommands_handler)
108 (*commands_handler_destroy)(m_pcommands_handler);
109 m_pcommands_handler = phandler;
110 m_pcommands_handler_destroy = destroy;
111 }
112
113 bool connect(uint32_t ip, uint32_t port, uint32_t timeout)
114 {
115 loop_call_guard();
116 critical_region cr(m_connection_lock);
117
118 m_timeout = timeout;
119 bool res = false;
120 CRITICAL_REGION_BEGIN(m_reciev_packet_lock);
121 CRITICAL_REGION_BEGIN(m_send_lock);
122 res = levin_client_impl::connect(ip, port, timeout);
123 boost::interprocess::ipcdetail::atomic_inc32(&m_current_connection_index);
124 CRITICAL_REGION_END();
125 CRITICAL_REGION_END();
126 if(res && !boost::interprocess::ipcdetail::atomic_read32(&m_threads_count) )
127 {
128 //boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 0);//m_is_stop = false;
129 boost::thread( boost::bind(&levin_duplex_client::reciever_thread, this) );
130 boost::thread( boost::bind(&levin_duplex_client::handler_thread, this) );
131 boost::thread( boost::bind(&levin_duplex_client::handler_thread, this) );
132 }
133
134 return res;
135 }
136 bool is_connected()
137 {
138 loop_call_guard();
139 critical_region cr(m_cs);
140 return levin_client_impl::is_connected();
141 }
142
143 inline
144 bool check_connection()
145 {
146 loop_call_guard();
147 critical_region cr(m_cs);
148
149 if(!is_connected())
150 {
151 if( !reconnect() )
152 {
153 LOG_ERROR("Reconnect Failed. Failed to invoke() because not connected!");
154 return false;
155 }
156 }
157 return true;
158 }
159
160 //------------------------------------------------------------------------------
161 inline
162 bool recv_n(SOCKET s, char* pbuff, size_t cb)
163 {
164 while(cb)
165 {
166 int res = ::recv(m_socket, pbuff, (int)cb, 0);
167
168 if(SOCKET_ERROR == res)
169 {
170 if(!m_connected)
171 return false;
172
173 int err = ::WSAGetLastError();
174 LOG_ERROR("Failed to recv(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
175 disconnect();
176 //reconnect();
177 return false;
178 }else if(res == 0)
179 {
180 disconnect();
181 //reconnect();
182 return false;
183 }
184 LOG_PRINT_L4("[" << m_socket <<"] RECV " << res);
185 cb -= res;
186 pbuff += res;
187 }
188
189 return true;
190 }
191
192 //------------------------------------------------------------------------------
193 inline
194 bool recv_n(SOCKET s, std::string& buff)
195 {
196 size_t cb_remain = buff.size();
197 char* m_current_ptr = (char*)buff.data();
198 return recv_n(s, m_current_ptr, cb_remain);
199 }
200
202 {
203 //boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 1);//m_is_stop = true;
205 critical_region cr(m_cs);
207
208 CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
209 m_local_invoke_buff.clear();
212 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 1); //m_invoke_data_ready = true;
213 m_invoke_cond.notify_all();
214 return true;
215 }
216
218 {
219
220 }
221
223 {
224 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_is_active, 0);
225 }
226
227 int invoke(const GUID& target, int command, const std::string& in_buff, std::string& buff_out)
228 {
229
230 critical_region cr_invoke(m_invoke_lock);
231
232 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_is_active, 1);
233 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 0);
234 misc_utils::destr_ptr hdlr = misc_utils::add_exit_scope_handler(boost::bind(&levin_duplex_client::on_leave_invoke, this));
235
237
238 if(!check_connection())
240
241
242 bucket_head head = {0};
243 head.m_signature = LEVIN_SIGNATURE;
244 head.m_cb = in_buff.size();
245 head.m_have_to_return_data = true;
246 head.m_id = target;
247#ifdef TRACE_LEVIN_PACKETS_BY_GUIDS
248 ::UuidCreate(&head.m_id);
249#endif
250 head.m_command = command;
251 head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
252 head.m_flags = LEVIN_PACKET_REQUEST;
253 LOG_PRINT("[" << m_socket <<"] Sending invoke data", LOG_LEVEL_4);
254
255 CRITICAL_REGION_BEGIN(m_send_lock);
256 LOG_PRINT_L4("[" << m_socket <<"] SEND " << sizeof(head));
257 int res = ::send(m_socket, (const char*)&head, sizeof(head), 0);
258 if(SOCKET_ERROR == res)
259 {
260 int err = ::WSAGetLastError();
261 LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
262 disconnect();
264 }
265 LOG_PRINT_L4("[" << m_socket <<"] SEND " << (int)in_buff.size());
266 res = ::send(m_socket, in_buff.data(), (int)in_buff.size(), 0);
267 if(SOCKET_ERROR == res)
268 {
269 int err = ::WSAGetLastError();
270 LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
271 disconnect();
273 }
275 LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
276
277 //hard coded timeout in 10 minutes for maximum invoke period. if it happens, it could mean only some real troubles.
278 boost::system_time timeout = boost::get_system_time()+ boost::posix_time::milliseconds(100);
279 size_t timeout_count = 0;
280 boost::unique_lock<boost::mutex> lock(m_invoke_event);
281
282 while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_data_ready))
283 {
284 if(!m_invoke_cond.timed_wait(lock, timeout))
285 {
286 if(timeout_count < 10)
287 {
288 //workaround to avoid freezing at timed_wait called after notify_all.
289 timeout = boost::get_system_time()+ boost::posix_time::milliseconds(100);
290 ++timeout_count;
291 continue;
292 }else if(timeout_count == 10)
293 {
294 //workaround to avoid freezing at timed_wait called after notify_all.
295 timeout = boost::get_system_time()+ boost::posix_time::minutes(10);
296 ++timeout_count;
297 continue;
298 }else
299 {
300 LOG_PRINT("[" << m_socket <<"] Timeout on waiting invoke result. ", LOG_LEVEL_0);
301 //disconnect();
303 }
304 }
305 }
306
307
308 CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
309 buff_out.swap(m_local_invoke_buff);
310 m_local_invoke_buff.clear();
312 return m_invoke_res;
313 }
314
315 int notify(const GUID& target, int command, const std::string& in_buff)
316 {
317 if(!check_connection())
319
320 bucket_head head = {0};
321 head.m_signature = LEVIN_SIGNATURE;
322 head.m_cb = in_buff.size();
323 head.m_have_to_return_data = false;
324 head.m_id = target;
325#ifdef TRACE_LEVIN_PACKETS_BY_GUIDS
326 ::UuidCreate(&head.m_id);
327#endif
328 head.m_command = command;
329 head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
330 head.m_flags = LEVIN_PACKET_REQUEST;
331 CRITICAL_REGION_BEGIN(m_send_lock);
332 LOG_PRINT_L4("[" << m_socket <<"] SEND " << sizeof(head));
333 int res = ::send(m_socket, (const char*)&head, sizeof(head), 0);
334 if(SOCKET_ERROR == res)
335 {
336 int err = ::WSAGetLastError();
337 LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
338 disconnect();
340 }
341 LOG_PRINT_L4("[" << m_socket <<"] SEND " << (int)in_buff.size());
342 res = ::send(m_socket, in_buff.data(), (int)in_buff.size(), 0);
343 if(SOCKET_ERROR == res)
344 {
345 int err = ::WSAGetLastError();
346 LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
347 disconnect();
349 }
351 LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
352
353 return 1;
354 }
355
356
357 private:
358 bool have_some_data(SOCKET sock, int interval = 1)
359 {
360 fd_set fds;
361 FD_ZERO(&fds);
362 FD_SET(sock, &fds);
363
364 fd_set fdse;
365 FD_ZERO(&fdse);
366 FD_SET(sock, &fdse);
367
368
369 timeval tv;
370 tv.tv_sec = interval;
371 tv.tv_usec = 0;
372
373 int sel_res = select(0, &fds, 0, &fdse, &tv);
374 if(0 == sel_res)
375 return false;
376 else if(sel_res == SOCKET_ERROR)
377 {
378 if(m_is_stop)
379 return false;
380 int err_code = ::WSAGetLastError();
381 LOG_ERROR("Filed to call select, err code = " << err_code);
382 disconnect();
383 }else
384 {
385 if(fds.fd_array[0])
386 {//some read operations was performed
387 return true;
388 }else if(fdse.fd_array[0])
389 {//some error was at the socket
390 return true;
391 }
392 }
393 return false;
394 }
395
396
397 bool reciev_and_process_incoming_data()
398 {
399 bucket_head head = {0};
400 uint32_t conn_index = 0;
401 bool is_request = false;
402 std::string local_buff;
403 CRITICAL_REGION_BEGIN(m_reciev_packet_lock);//to protect from socket reconnect between head and body
404
405 if(!recv_n(m_socket, (char*)&head, sizeof(head)))
406 {
407 if(m_is_stop)
408 return false;
409 LOG_ERROR("Failed to recv_n");
410 return false;
411 }
412
413 conn_index = boost::interprocess::ipcdetail::atomic_read32(&m_current_connection_index);
414
415 if(head.m_signature!=LEVIN_SIGNATURE)
416 {
417 LOG_ERROR("Signature mismatch in response");
418 return false;
419 }
420
421 is_request = (head.m_protocol_version == LEVIN_PROTOCOL_VER_1 && head.m_flags&LEVIN_PACKET_REQUEST);
422
423
424 local_buff.resize((size_t)head.m_cb);
425 if(!recv_n(m_socket, local_buff))
426 {
427 if(m_is_stop)
428 return false;
429 LOG_ERROR("Filed to reciev");
430 return false;
431 }
433
434 LOG_PRINT_L4("LEVIN_PACKET_RECEIVED. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
435
436 if(is_request)
437 {
438 CRITICAL_REGION_BEGIN(m_recieved_packets_lock);
439 m_recieved_packets.resize(m_recieved_packets.size() + 1);
440 m_recieved_packets.back().m_hd = head;
441 m_recieved_packets.back().m_body.swap(local_buff);
442 m_recieved_packets.back().m_connection_index = conn_index;
444 /*
445
446 */
447 }else
448 {//this is some response
449
450 CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
451 m_local_invoke_buff.swap(local_buff);
452 m_invoke_res = head.m_return_code;
454 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 1); //m_invoke_data_ready = true;
455 m_invoke_cond.notify_all();
456
457 }
458 return true;
459 }
460
461 bool reciever_thread()
462 {
463 LOG_PRINT_L3("[" << m_socket <<"] Socket reciever thread started.[m_threads_count=" << m_threads_count << "]");
464 log_space::log_singletone::set_thread_log_prefix("RECIEVER_WORKER");
465 boost::interprocess::ipcdetail::atomic_inc32(&m_threads_count);
466
467 while(!m_is_stop)
468 {
469 if(!m_connected)
470 {
471 Sleep(100);
472 continue;
473 }
474
475 if(have_some_data(m_socket, 1))
476 {
477 if(!reciev_and_process_incoming_data())
478 {
479 if(m_is_stop)
480 {
481 break;//boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
482 //return true;
483 }
484 LOG_ERROR("Failed to reciev_and_process_incoming_data. shutting down");
485 //boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
486 //disconnect_no_wait();
487 //break;
488 }
489 }
490 }
491
492 boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
493 LOG_PRINT_L3("[" << m_socket <<"] Socket reciever thread stopped.[m_threads_count=" << m_threads_count << "]");
494 return true;
495 }
496
497 bool process_recieved_packet(bucket_head& head, const std::string& local_buff, uint32_t conn_index)
498 {
499
500 net_utils::connection_context_base conn_context;
501 conn_context.m_remote_address = m_address;
502 if(head.m_have_to_return_data)
503 {
504 std::string return_buff;
505 if(m_pcommands_handler)
506 head.m_return_code = m_pcommands_handler->invoke(head.m_id, head.m_command, local_buff, return_buff, conn_context);
507 else
509
510
511
512 head.m_cb = return_buff.size();
513 head.m_have_to_return_data = false;
514 head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
515 head.m_flags = LEVIN_PACKET_RESPONSE;
516
517 std::string send_buff((const char*)&head, sizeof(head));
518 send_buff += return_buff;
519 CRITICAL_REGION_BEGIN(m_send_lock);
520 if(conn_index != boost::interprocess::ipcdetail::atomic_read32(&m_current_connection_index))
521 {//there was reconnect, send response back is not allowed
522 return true;
523 }
524 int res = ::send(m_socket, (const char*)send_buff.data(), send_buff.size(), 0);
525 if(res == SOCKET_ERROR)
526 {
527 int err_code = ::WSAGetLastError();
528 LOG_ERROR("Failed to send, err = " << err_code);
529 return false;
530 }
532 LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
533
534 }
535 else
536 {
537 if(m_pcommands_handler)
538 m_pcommands_handler->notify(head.m_id, head.m_command, local_buff, conn_context);
539 }
540
541 return true;
542 }
543
544 bool handler_thread()
545 {
546 LOG_PRINT_L3("[" << m_socket <<"] Socket handler thread started.[m_threads_count=" << m_threads_count << "]");
547 log_space::log_singletone::set_thread_log_prefix("HANDLER_WORKER");
548 boost::interprocess::ipcdetail::atomic_inc32(&m_threads_count);
549
550 while(!m_is_stop)
551 {
552 bool have_some_work = false;
553 std::string local_buff;
554 bucket_head bh = {0};
555 uint32_t conn_index = 0;
556
557 CRITICAL_REGION_BEGIN(m_recieved_packets_lock);
558 if(m_recieved_packets.size())
559 {
560 bh = m_recieved_packets.begin()->m_hd;
561 conn_index = m_recieved_packets.begin()->m_connection_index;
562 local_buff.swap(m_recieved_packets.begin()->m_body);
563 have_some_work = true;
564 m_recieved_packets.pop_front();
565 }
567
568 if(have_some_work)
569 {
570 process_recieved_packet(bh, local_buff, conn_index);
571 }else
572 {
573 //Idle when no work
574 Sleep(30);
575 }
576 }
577
578 boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
579 LOG_PRINT_L3("[" << m_socket <<"] Socket handler thread stopped.[m_threads_count=" << m_threads_count << "]");
580 return true;
581 }
582 };
583
584}
585}
else if(0==res)
bool recv_n(SOCKET s, char *pbuff, size_t cb)
bool recv_n(SOCKET s, std::string &buff)
int invoke(const GUID &target, int command, const std::string &in_buff, std::string &buff_out)
int notify(const GUID &target, int command, const std::string &in_buff)
const char * res
#define LEVIN_PACKET_RESPONSE
Definition levin_base.h:74
#define LEVIN_PROTOCOL_VER_1
Definition levin_base.h:78
#define LEVIN_PACKET_REQUEST
Definition levin_base.h:73
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition levin_base.h:96
#define LEVIN_ERROR_CONNECTION_HANDLER_NOT_DEFINED
Definition levin_base.h:99
#define LEVIN_SIGNATURE
Definition levin_base.h:34
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
Definition levin_base.h:97
#define SOCKET
#define LOG_PRINT_L4(x)
#define LOG_PRINT_L3(x)
#define LOG_ERROR(x)
Definition misc_log_ex.h:98
struct rule_list head
#define false
unsigned int uint32_t
Definition stdint.h:126
#define CRITICAL_REGION_END()
Definition syncobj.h:233
#define CRITICAL_REGION_BEGIN(x)
Definition syncobj.h:229