Electroneum
Loading...
Searching...
No Matches
levin_protocol_handler_async.h
Go to the documentation of this file.
1// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are met:
6// * Redistributions of source code must retain the above copyright
7// notice, this list of conditions and the following disclaimer.
8// * Redistributions in binary form must reproduce the above copyright
9// notice, this list of conditions and the following disclaimer in the
10// documentation and/or other materials provided with the distribution.
11// * Neither the name of the Andrey N. Sabelnikov nor the
12// names of its contributors may be used to endorse or promote products
13// derived from this software without specific prior written permission.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
19// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25//
26
27#pragma once
28#include <boost/asio/deadline_timer.hpp>
29#include <boost/uuid/uuid_generators.hpp>
30#include <boost/unordered_map.hpp>
31#include <boost/interprocess/detail/atomic.hpp>
32#include <boost/smart_ptr/make_shared.hpp>
33
34#include <atomic>
35
36#include "levin_base.h"
37#include "buffer.h"
38#include "misc_language.h"
39#include "syncobj.h"
40#include "misc_os_dependent.h"
41#include "int-util.h"
42
43#include <random>
44#include <chrono>
45
46#undef ELECTRONEUM_DEFAULT_LOG_CATEGORY
47#define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net"
48
49#ifndef MIN_BYTES_WANTED
50#define MIN_BYTES_WANTED 512
51#endif
52
53namespace epee
54{
55namespace levin
56{
57
58/************************************************************************/
59/* */
60/************************************************************************/
61template<class t_connection_context>
62class async_protocol_handler;
63
64template<class t_connection_context>
66{
67 typedef boost::unordered_map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* > connections_map;
68 critical_section m_connects_lock;
69 connections_map m_connects;
70
71 void add_connection(async_protocol_handler<t_connection_context>* pc);
72 void del_connection(async_protocol_handler<t_connection_context>* pc);
73
74 async_protocol_handler<t_connection_context>* find_connection(boost::uuids::uuid connection_id) const;
75 int find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph);
76
77 friend class async_protocol_handler<t_connection_context>;
78
80 void (*m_pcommands_handler_destroy)(levin_commands_handler<t_connection_context>*);
81
82 void delete_connections (size_t count, bool incoming);
83
84public:
85 typedef t_connection_context connection_context;
88
89 int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id);
90 template<class callback_t>
91 int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
92
93 int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id);
94 bool close(boost::uuids::uuid connection_id);
95 bool update_connection_context(const t_connection_context& contxt);
96 bool request_callback(boost::uuids::uuid connection_id);
97 template<class callback_t>
98 bool foreach_connection(const callback_t &cb);
99 template<class callback_t>
100 bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb);
103
107 void del_out_connections(size_t count);
108 void del_in_connections(size_t count);
109};
110
111
112/************************************************************************/
113/* */
114/************************************************************************/
115template<class t_connection_context = net_utils::connection_context_base>
117{
118public:
119 typedef t_connection_context connection_context;
121
127
128 std::atomic<bool> m_deletion_initiated;
129 std::atomic<bool> m_protocol_released;
131
133
135 std::string m_local_inv_buff;
136
139
145 t_connection_context& m_connection_context;
146
149
152
154 {
155 virtual bool handle(int res, const epee::span<const uint8_t> buff, connection_context& context)=0;
156 virtual bool is_timer_started() const=0;
157 virtual void cancel()=0;
158 virtual bool cancel_timer()=0;
159 virtual void reset_timer()=0;
160 };
161 template <class callback_t>
163 {
164 anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
165 :m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
167 {
168 if(m_con.start_outer_call())
169 {
170 MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
171 m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
172 m_timer.async_wait([&con, command, cb, timeout](const boost::system::error_code& ec)
173 {
174 if(ec == boost::asio::error::operation_aborted)
175 return;
176 MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
177 epee::span<const uint8_t> fake;
178 cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
179 con.close();
180 con.finish_outer_call();
181 });
182 m_timer_started = true;
183 }
184 }
186 {}
187 callback_t m_cb;
189 boost::asio::deadline_timer m_timer;
196 {
197 if(!cancel_timer())
198 return false;
199 m_cb(res, buff, context);
200 m_con.finish_outer_call();
201 return true;
202 }
203 virtual bool is_timer_started() const
204 {
205 return m_timer_started;
206 }
207 virtual void cancel()
208 {
209 if(cancel_timer())
210 {
212 m_cb(LEVIN_ERROR_CONNECTION_DESTROYED, fake, m_con.get_context_ref());
213 m_con.finish_outer_call();
214 }
215 }
216 virtual bool cancel_timer()
217 {
219 {
221 boost::system::error_code ignored_ec;
222 m_timer_cancelled = 1 == m_timer.cancel(ignored_ec);
223 }
224 return m_timer_cancelled;
225 }
226 virtual void reset_timer()
227 {
228 boost::system::error_code ignored_ec;
229 if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
230 {
231 callback_t& cb = m_cb;
232 uint64_t timeout = m_timeout;
234 int command = m_command;
235 m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
236 m_timer.async_wait([&con, cb, command, timeout](const boost::system::error_code& ec)
237 {
238 if(ec == boost::asio::error::operation_aborted)
239 return;
240 MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
243 con.close();
244 con.finish_outer_call();
245 });
246 }
247 }
248 };
250 std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
251
252 template<class callback_t>
253 bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler& con, int command)
254 {
256 boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<anvoke_handler<callback_t>>(cb, timeout, con, command));
257 m_invoke_response_handlers.push_back(handler);
258 return handler->is_timer_started();
259 }
260 template<class callback_t> friend struct anvoke_handler;
261public:
264 t_connection_context& conn_context):
266 m_pservice_endpoint(psnd_hndlr),
268 m_connection_context(conn_context),
269 m_cache_in_buffer(4 * 1024),
271 {
272 m_close_called = 0;
273 m_deletion_initiated = false;
274 m_protocol_released = false;
275 m_wait_count = 0;
280 }
282 {
283 try
284 {
285
288 {
289 m_config.del_connection(this);
290 }
291
292 for (size_t i = 0; i < 60 * 1000 / 100 && 0 != boost::interprocess::ipcdetail::atomic_read32(&m_wait_count); ++i)
293 {
295 }
296 CHECK_AND_ASSERT_MES_NO_RET(0 == boost::interprocess::ipcdetail::atomic_read32(&m_wait_count), "Failed to wait for operation completion. m_wait_count = " << m_wait_count);
297
298 MTRACE(m_connection_context << "~async_protocol_handler()");
299
300 }
301 catch (...) { /* ignore */ }
302 }
303
305 {
306 MTRACE(m_connection_context << "[levin_protocol] -->> start_outer_call");
307 if(!m_pservice_endpoint->add_ref())
308 {
309 MERROR(m_connection_context << "[levin_protocol] -->> start_outer_call failed");
310 return false;
311 }
312 boost::interprocess::ipcdetail::atomic_inc32(&m_wait_count);
313 return true;
314 }
316 {
317 MTRACE(m_connection_context << "[levin_protocol] <<-- finish_outer_call");
318 boost::interprocess::ipcdetail::atomic_dec32(&m_wait_count);
319 m_pservice_endpoint->release();
320 return true;
321 }
322
324 {
325 decltype(m_invoke_response_handlers) local_invoke_response_handlers;
327 local_invoke_response_handlers.swap(m_invoke_response_handlers);
328 m_protocol_released = true;
330
331 // Never call callback inside critical section, that can cause deadlock. Callback can be called when
332 // invoke_response_handler_base is cancelled
333 std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) {
334 pinv_resp_hndlr->cancel();
335 });
336
337 return true;
338 }
339
340 bool close()
341 {
342 boost::interprocess::ipcdetail::atomic_inc32(&m_close_called);
343
344 m_pservice_endpoint->close();
345 return true;
346 }
347
349 {
350 m_connection_context = contxt;
351 }
352
354 {
357
358 m_pservice_endpoint->request_callback();
359 }
360
362 {
363 m_config.m_pcommands_handler->callback(m_connection_context);
364 }
365
366 virtual bool handle_recv(const void* ptr, size_t cb)
367 {
368 if(boost::interprocess::ipcdetail::atomic_read32(&m_close_called))
369 return false; //closing connections
370
371 if(!m_config.m_pcommands_handler)
372 {
373 MERROR(m_connection_context << "Commands handler not set!");
374 return false;
375 }
376
377 if(m_cache_in_buffer.size() + cb > m_config.m_max_packet_size)
378 {
379 MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size
380 << ", packet received " << m_cache_in_buffer.size() + cb
381 << ", connection will be closed.");
382 return false;
383 }
384
385 m_cache_in_buffer.append((const char*)ptr, cb);
386
387 bool is_continue = true;
388 while(is_continue)
389 {
390 switch(m_state)
391 {
393 if(m_cache_in_buffer.size() < m_current_head.m_cb)
394 {
395 is_continue = false;
396 if(cb >= MIN_BYTES_WANTED)
397 {
399 if (!m_invoke_response_handlers.empty())
400 {
401 //async call scenario
402 boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
403 response_handler->reset_timer();
404 MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb);
405 }
406 }
407 break;
408 }
409 {
410 epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb);
411
413
414 MDEBUG(m_connection_context << "LEVIN_PACKET_RECEIVED. [len=" << m_current_head.m_cb
415 << ", flags" << m_current_head.m_flags
416 << ", r?=" << m_current_head.m_have_to_return_data
417 <<", cmd = " << m_current_head.m_command
418 << ", v=" << m_current_head.m_protocol_version);
419
420 if(is_response)
421 {//response to some invoke
422
424 if(!m_invoke_response_handlers.empty())
425 {//async call scenario
426 boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
427 bool timer_cancelled = response_handler->cancel_timer();
428 // Don't pop handler, to avoid destroying it
429 if(timer_cancelled)
430 m_invoke_response_handlers.pop_front();
431 invoke_response_handlers_guard.unlock();
432
433 if(timer_cancelled)
434 response_handler->handle(m_current_head.m_return_code, buff_to_invoke, m_connection_context);
435 }
436 else
437 {
438 invoke_response_handlers_guard.unlock();
439 //use sync call scenario
440 if(!boost::interprocess::ipcdetail::atomic_read32(&m_wait_count) && !boost::interprocess::ipcdetail::atomic_read32(&m_close_called))
441 {
442 MERROR(m_connection_context << "no active invoke when response came, wtf?");
443 return false;
444 }else
445 {
447 m_local_inv_buff = std::string((const char*)buff_to_invoke.data(), buff_to_invoke.size());
448 buff_to_invoke = epee::span<const uint8_t>((const uint8_t*)NULL, 0);
449 m_invoke_result_code = m_current_head.m_return_code;
451 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 1);
452 }
453 }
454 }else
455 {
456 if(m_current_head.m_have_to_return_data)
457 {
458 std::string return_buff;
459 m_current_head.m_return_code = m_config.m_pcommands_handler->invoke(
460 m_current_head.m_command,
461 buff_to_invoke,
462 return_buff,
464 m_current_head.m_cb = return_buff.size();
465 m_current_head.m_have_to_return_data = false;
466 m_current_head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
468#if BYTE_ORDER == LITTLE_ENDIAN
469 std::string send_buff((const char*)&m_current_head, sizeof(m_current_head));
470#else
472 head.m_signature = SWAP64LE(head.m_signature);
473 head.m_cb = SWAP64LE(head.m_cb);
474 head.m_command = SWAP32LE(head.m_command);
475 head.m_return_code = SWAP32LE(head.m_return_code);
476 head.m_flags = SWAP32LE(head.m_flags);
477 head.m_protocol_version = SWAP32LE(head.m_protocol_version);
478 std::string send_buff((const char*)&head, sizeof(head));
479#endif
480 send_buff += return_buff;
482 if(!m_pservice_endpoint->do_send(send_buff.data(), send_buff.size()))
483 return false;
485 MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << m_current_head.m_cb
486 << ", flags" << m_current_head.m_flags
487 << ", r?=" << m_current_head.m_have_to_return_data
488 <<", cmd = " << m_current_head.m_command
489 << ", ver=" << m_current_head.m_protocol_version);
490 }
491 else
492 m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context);
493 }
494 }
496 break;
498 {
499 if(m_cache_in_buffer.size() < sizeof(bucket_head2))
500 {
501 if(m_cache_in_buffer.size() >= sizeof(uint64_t) && *((uint64_t*)m_cache_in_buffer.span(8).data()) != SWAP64LE(LEVIN_SIGNATURE))
502 {
503 MWARNING(m_connection_context << "Signature mismatch, connection will be closed");
504 return false;
505 }
506 is_continue = false;
507 break;
508 }
509
510#if BYTE_ORDER == LITTLE_ENDIAN
511 bucket_head2& phead = *(bucket_head2*)m_cache_in_buffer.span(sizeof(bucket_head2)).data();
512#else
513 bucket_head2 phead = *(bucket_head2*)m_cache_in_buffer.span(sizeof(bucket_head2)).data();
514 phead.m_signature = SWAP64LE(phead.m_signature);
515 phead.m_cb = SWAP64LE(phead.m_cb);
516 phead.m_command = SWAP32LE(phead.m_command);
517 phead.m_return_code = SWAP32LE(phead.m_return_code);
518 phead.m_flags = SWAP32LE(phead.m_flags);
520#endif
521 if(LEVIN_SIGNATURE != phead.m_signature)
522 {
523 LOG_ERROR_CC(m_connection_context, "Signature mismatch, connection will be closed");
524 return false;
525 }
526 m_current_head = phead;
527
528 m_cache_in_buffer.erase(sizeof(bucket_head2));
530 m_oponent_protocol_ver = m_current_head.m_protocol_version;
531 if(m_current_head.m_cb > m_config.m_max_packet_size)
532 {
533 LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size
534 << ", packet header received " << m_current_head.m_cb
535 << ", connection will be closed.");
536 return false;
537 }
538 }
539 break;
540 default:
541 LOG_ERROR_CC(m_connection_context, "Undefined state in levin_server_impl::connection_handler, m_state=" << m_state);
542 return false;
543 }
544 }
545
546 return true;
547 }
548
550 {
552 {
554 m_config.add_connection(this);
555 }
556 return true;
557 }
558
559 template<class callback_t>
560 bool async_invoke(int command, const epee::span<const uint8_t> in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
561 {
564
566 timeout = m_config.m_invoke_timeout;
567
568 int err_code = LEVIN_OK;
569 do
570 {
572 {
574 break;
575 }
576
578
580 {
582 break;
583 }
584
585 bucket_head2 head = {0};
586 head.m_signature = SWAP64LE(LEVIN_SIGNATURE);
587 head.m_cb = SWAP64LE(in_buff.size());
588 head.m_have_to_return_data = true;
589
591 head.m_command = SWAP32LE(command);
592 head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1);
593
594 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0);
597 if(!m_pservice_endpoint->do_send(&head, sizeof(head)))
598 {
599 LOG_ERROR_CC(m_connection_context, "Failed to do_send");
600 err_code = LEVIN_ERROR_CONNECTION;
601 break;
602 }
603
604 if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size()))
605 {
606 LOG_ERROR_CC(m_connection_context, "Failed to do_send");
607 err_code = LEVIN_ERROR_CONNECTION;
608 break;
609 }
610
611 if(!add_invoke_response_handler(cb, timeout, *this, command))
612 {
614 break;
615 }
617 } while (false);
618
619 if (LEVIN_OK != err_code)
620 {
621 epee::span<const uint8_t> stub_buff{(const uint8_t*)"", 0};
622 // Never call callback inside critical section, that can cause deadlock
623 cb(err_code, stub_buff, m_connection_context);
624 return false;
625 }
626
627 return true;
628 }
629
630 int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out)
631 {
634
637
639
642
643 bucket_head2 head = {0};
644 head.m_signature = SWAP64LE(LEVIN_SIGNATURE);
645 head.m_cb = SWAP64LE(in_buff.size());
646 head.m_have_to_return_data = true;
647
649 head.m_command = SWAP32LE(command);
650 head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1);
651
652 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0);
654 if(!m_pservice_endpoint->do_send(&head, sizeof(head)))
655 {
656 LOG_ERROR_CC(m_connection_context, "Failed to do_send");
658 }
659
660 if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size()))
661 {
662 LOG_ERROR_CC(m_connection_context, "Failed to do_send");
664 }
666
667 MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
668 << ", f=" << head.m_flags
669 << ", r?=" << head.m_have_to_return_data
670 << ", cmd = " << head.m_command
671 << ", ver=" << head.m_protocol_version);
672
673 uint64_t ticks_start = misc_utils::get_tick_count();
674 size_t prev_size = 0;
675
676 while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_buf_ready) && !m_deletion_initiated && !m_protocol_released)
677 {
678 if(m_cache_in_buffer.size() - prev_size >= MIN_BYTES_WANTED)
679 {
680 prev_size = m_cache_in_buffer.size();
681 ticks_start = misc_utils::get_tick_count();
682 }
683 if(misc_utils::get_tick_count() - ticks_start > m_config.m_invoke_timeout)
684 {
685 MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection ");
686 close();
688 }
689 if(!m_pservice_endpoint->call_run_once_service_io())
691 }
692
695
697 buff_out.swap(m_local_inv_buff);
698 m_local_inv_buff.clear();
700
702 }
703
704 int notify(int command, const epee::span<const uint8_t> in_buff)
705 {
708
711
713
716
717 bucket_head2 head = {0};
718 head.m_signature = SWAP64LE(LEVIN_SIGNATURE);
719 head.m_have_to_return_data = false;
720 head.m_cb = SWAP64LE(in_buff.size());
721
722 head.m_command = SWAP32LE(command);
723 head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1);
726 if(!m_pservice_endpoint->do_send(&head, sizeof(head)))
727 {
728 LOG_ERROR_CC(m_connection_context, "Failed to do_send()");
729 return -1;
730 }
731
732 if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size()))
733 {
734 LOG_ERROR_CC(m_connection_context, "Failed to do_send()");
735 return -1;
736 }
738 LOG_DEBUG_CC(m_connection_context, "LEVIN_PACKET_SENT. [len=" << head.m_cb <<
739 ", f=" << head.m_flags <<
740 ", r?=" << head.m_have_to_return_data <<
741 ", cmd = " << head.m_command <<
742 ", ver=" << head.m_protocol_version);
743
744 return 1;
745 }
746 //------------------------------------------------------------------------------------------
747 boost::uuids::uuid get_connection_id() {return m_connection_context.m_connection_id;}
748 //------------------------------------------------------------------------------------------
749 t_connection_context& get_context_ref() {return m_connection_context;}
750};
751//------------------------------------------------------------------------------------------
752template<class t_connection_context>
753void async_protocol_handler_config<t_connection_context>::del_connection(async_protocol_handler<t_connection_context>* pconn)
754{
755 CRITICAL_REGION_BEGIN(m_connects_lock);
756 m_connects.erase(pconn->get_connection_id());
758 m_pcommands_handler->on_connection_close(pconn->m_connection_context);
759}
760//------------------------------------------------------------------------------------------
761template<class t_connection_context>
762void async_protocol_handler_config<t_connection_context>::delete_connections(size_t count, bool incoming)
763{
764 std::vector <boost::uuids::uuid> connections;
765 CRITICAL_REGION_BEGIN(m_connects_lock);
766 for (auto& c: m_connects)
767 {
768 if (c.second->m_connection_context.m_is_income == incoming)
769 connections.push_back(c.first);
770 }
771
772 // close random connections from the provided set
773 // TODO or better just keep removing random elements (performance)
774 unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
775 shuffle(connections.begin(), connections.end(), std::default_random_engine(seed));
776 while (count > 0 && connections.size() > 0)
777 {
778 try
779 {
780 auto i = connections.end() - 1;
781 async_protocol_handler<t_connection_context> *conn = m_connects.at(*i);
782 del_connection(conn);
783 close(*i);
784 connections.erase(i);
785 }
786 catch (const std::out_of_range &e)
787 {
788 MWARNING("Connection not found in m_connects, continuing");
789 }
790 --count;
791 }
792
794}
795//------------------------------------------------------------------------------------------
796template<class t_connection_context>
798{
799 delete_connections(count, false);
800}
801//------------------------------------------------------------------------------------------
802template<class t_connection_context>
804{
805 delete_connections(count, true);
806}
807//------------------------------------------------------------------------------------------
808template<class t_connection_context>
809void async_protocol_handler_config<t_connection_context>::add_connection(async_protocol_handler<t_connection_context>* pconn)
810{
811 CRITICAL_REGION_BEGIN(m_connects_lock);
812 m_connects[pconn->get_connection_id()] = pconn;
814 m_pcommands_handler->on_connection_new(pconn->m_connection_context);
815}
816//------------------------------------------------------------------------------------------
817template<class t_connection_context>
818async_protocol_handler<t_connection_context>* async_protocol_handler_config<t_connection_context>::find_connection(boost::uuids::uuid connection_id) const
819{
820 auto it = m_connects.find(connection_id);
821 return it == m_connects.end() ? 0 : it->second;
822}
823//------------------------------------------------------------------------------------------
824template<class t_connection_context>
825int async_protocol_handler_config<t_connection_context>::find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph)
826{
827 CRITICAL_REGION_LOCAL(m_connects_lock);
828 aph = find_connection(connection_id);
829 if(0 == aph)
831 if(!aph->start_outer_call())
833 return LEVIN_OK;
834}
835//------------------------------------------------------------------------------------------
836template<class t_connection_context>
837int async_protocol_handler_config<t_connection_context>::invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id)
838{
840 int r = find_and_lock_connection(connection_id, aph);
841 return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r;
842}
843//------------------------------------------------------------------------------------------
844template<class t_connection_context> template<class callback_t>
845int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
846{
848 int r = find_and_lock_connection(connection_id, aph);
849 return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r;
850}
851//------------------------------------------------------------------------------------------
852template<class t_connection_context> template<class callback_t>
854{
855 CRITICAL_REGION_LOCAL(m_connects_lock);
856 for(auto& c: m_connects)
857 {
859 if(!cb(aph->get_context_ref()))
860 return false;
861 }
862 return true;
863}
864//------------------------------------------------------------------------------------------
865template<class t_connection_context> template<class callback_t>
866bool async_protocol_handler_config<t_connection_context>::for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
867{
868 CRITICAL_REGION_LOCAL(m_connects_lock);
869 async_protocol_handler<t_connection_context>* aph = find_connection(connection_id);
870 if (!aph)
871 return false;
872 if(!cb(aph->get_context_ref()))
873 return false;
874 return true;
875}
876//------------------------------------------------------------------------------------------
877template<class t_connection_context>
879{
880 CRITICAL_REGION_LOCAL(m_connects_lock);
881 return m_connects.size();
882}
883//------------------------------------------------------------------------------------------
884template<class t_connection_context>
886{
887 if (m_pcommands_handler && m_pcommands_handler_destroy)
888 (*m_pcommands_handler_destroy)(m_pcommands_handler);
889 m_pcommands_handler = handler;
890 m_pcommands_handler_destroy = destroy;
891}
892//------------------------------------------------------------------------------------------
893template<class t_connection_context>
894int async_protocol_handler_config<t_connection_context>::notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id)
895{
897 int r = find_and_lock_connection(connection_id, aph);
898 return LEVIN_OK == r ? aph->notify(command, in_buff) : r;
899}
900//------------------------------------------------------------------------------------------
901template<class t_connection_context>
903{
904 CRITICAL_REGION_LOCAL(m_connects_lock);
905 async_protocol_handler<t_connection_context>* aph = find_connection(connection_id);
906 return 0 != aph ? aph->close() : false;
907}
908//------------------------------------------------------------------------------------------
909template<class t_connection_context>
911{
912 CRITICAL_REGION_LOCAL(m_connects_lock);
913 async_protocol_handler<t_connection_context>* aph = find_connection(contxt.m_connection_id);
914 if(0 == aph)
915 return false;
916 aph->update_connection_context(contxt);
917 return true;
918}
919//------------------------------------------------------------------------------------------
920template<class t_connection_context>
922{
924 int r = find_and_lock_connection(connection_id, aph);
925 if(LEVIN_OK == r)
926 {
927 aph->request_callback();
928 return true;
929 }
930 else
931 {
932 return false;
933 }
934}
935}
936}
bool request_callback(boost::uuids::uuid connection_id)
int invoke_async(int command, const epee::span< const uint8_t > in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
void set_handler(levin_commands_handler< t_connection_context > *handler, void(*destroy)(levin_commands_handler< t_connection_context > *)=NULL)
bool update_connection_context(const t_connection_context &contxt)
int invoke(int command, const epee::span< const uint8_t > in_buff, std::string &buff_out, boost::uuids::uuid connection_id)
int notify(int command, const epee::span< const uint8_t > in_buff, boost::uuids::uuid connection_id)
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
int notify(int command, const epee::span< const uint8_t > in_buff)
void update_connection_context(const connection_context &contxt)
virtual bool handle_recv(const void *ptr, size_t cb)
int invoke(int command, const epee::span< const uint8_t > in_buff, std::string &buff_out)
std::list< boost::shared_ptr< invoke_response_handler_base > > m_invoke_response_handlers
async_protocol_handler_config< t_connection_context > config_type
async_protocol_handler(net_utils::i_service_endpoint *psnd_hndlr, config_type &config, t_connection_context &conn_context)
bool async_invoke(int command, const epee::span< const uint8_t > in_buff, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
Non-owning sequence of data. Does not deep copy.
Definition span.h:57
constexpr std::size_t size() const noexcept
Definition span.h:111
constexpr pointer data() const noexcept
Definition span.h:110
const char * res
#define SWAP64LE
Definition int-util.h:232
#define SWAP32LE
Definition int-util.h:224
#define LEVIN_PACKET_RESPONSE
Definition levin_base.h:74
#define LEVIN_PROTOCOL_VER_1
Definition levin_base.h:78
#define LEVIN_ERROR_CONNECTION
Definition levin_base.h:94
#define LEVIN_PACKET_REQUEST
Definition levin_base.h:73
#define LEVIN_DEFAULT_MAX_PACKET_SIZE
Definition levin_base.h:71
#define LEVIN_OK
Definition levin_base.h:93
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition levin_base.h:96
#define LEVIN_ERROR_CONNECTION_NOT_FOUND
Definition levin_base.h:95
#define LEVIN_SIGNATURE
Definition levin_base.h:34
#define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED
Definition levin_base.h:70
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
Definition levin_base.h:97
#define MIN_BYTES_WANTED
#define MERROR(x)
Definition misc_log_ex.h:73
#define CHECK_AND_ASSERT_MES_NO_RET(expr, message)
#define MWARNING(x)
Definition misc_log_ex.h:74
#define MDEBUG(x)
Definition misc_log_ex.h:76
#define MTRACE(x)
Definition misc_log_ex.h:77
#define MINFO(x)
Definition misc_log_ex.h:75
boost::shared_ptr< call_befor_die_base > auto_scope_leave_caller
uint64_t get_tick_count()
bool sleep_no_w(long ms)
auto_scope_leave_caller create_scope_leave_handler(t_scope_leave_handler f)
mdb_size_t count(MDB_cursor *cur)
#define LOG_DEBUG_CC(ct, message)
#define LOG_ERROR_CC(ct, message)
struct rule_list head
#define false
unsigned int uint32_t
Definition stdint.h:126
signed int int32_t
Definition stdint.h:123
unsigned char uint8_t
Definition stdint.h:124
unsigned __int64 uint64_t
Definition stdint.h:136
virtual bool handle(int res, const epee::span< const uint8_t > buff, typename async_protocol_handler::connection_context &context)
anvoke_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
virtual bool handle(int res, const epee::span< const uint8_t > buff, connection_context &context)=0
#define CRITICAL_REGION_LOCAL1(x)
Definition syncobj.h:230
#define CRITICAL_REGION_LOCAL(x)
Definition syncobj.h:228
#define CRITICAL_REGION_END()
Definition syncobj.h:233
#define CRITICAL_REGION_BEGIN(x)
Definition syncobj.h:229