Monero
Loading...
Searching...
No Matches
levin_protocol_handler_async.h
Go to the documentation of this file.
1// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are met:
6// * Redistributions of source code must retain the above copyright
7// notice, this list of conditions and the following disclaimer.
8// * Redistributions in binary form must reproduce the above copyright
9// notice, this list of conditions and the following disclaimer in the
10// documentation and/or other materials provided with the distribution.
11// * Neither the name of the Andrey N. Sabelnikov nor the
12// names of its contributors may be used to endorse or promote products
13// derived from this software without specific prior written permission.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
19// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25//
26
27#pragma once
28#include <boost/asio/deadline_timer.hpp>
29#include <boost/uuid/uuid_generators.hpp>
30#include <boost/unordered_map.hpp>
31#include <boost/smart_ptr/make_shared.hpp>
32
33#include <atomic>
34#include <deque>
35
36#include "levin_base.h"
37#include "buffer.h"
38#include "misc_language.h"
39#include "syncobj.h"
40#include "time_helper.h"
41#include "int-util.h"
42
43#include <random>
44#include <chrono>
45
46#undef MONERO_DEFAULT_LOG_CATEGORY
47#define MONERO_DEFAULT_LOG_CATEGORY "net"
48
49#ifndef MIN_BYTES_WANTED
50#define MIN_BYTES_WANTED 512
51#endif
52
53template<typename context_t>
54void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char* category)
55{
56 MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "")
57 << " for category " << category << " initiated by " << (initiator ? "us" : "peer"));
58}
59
60template<typename context_t>
61void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command)
62{
63 char buf[32];
64 snprintf(buf, sizeof(buf), "command-%u", command);
65 on_levin_traffic(context, initiator, sent, error, bytes, buf);
66}
67
68namespace epee
69{
70namespace levin
71{
72
73/************************************************************************/
74/* */
75/************************************************************************/
76template<class t_connection_context>
78
79template<class t_connection_context>
81{
82 typedef boost::unordered_map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* > connections_map;
85
88
89 async_protocol_handler<t_connection_context>* find_connection(boost::uuids::uuid connection_id) const;
90 int find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph);
91
92 friend class async_protocol_handler<t_connection_context>;
93
96
97 void delete_connections (size_t count, bool incoming);
98
99public:
100 typedef t_connection_context connection_context;
104
105 int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id);
106 template<class callback_t>
107 int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
108
109 int send(epee::byte_slice message, const boost::uuids::uuid& connection_id);
110 bool close(boost::uuids::uuid connection_id);
111 bool update_connection_context(const t_connection_context& contxt);
112 bool request_callback(boost::uuids::uuid connection_id);
113 template<class callback_t>
114 bool foreach_connection(const callback_t &cb);
115 template<class callback_t>
116 bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb);
121
125 void del_out_connections(size_t count);
126 void del_in_connections(size_t count);
127};
128
129
130/************************************************************************/
131/* */
132/************************************************************************/
133template<class t_connection_context = net_utils::connection_context_base>
135{
136 std::string m_fragment_buffer;
137
139 {
140 if (message.size() < sizeof(message_writer::header))
141 return false;
142
144 std::memcpy(std::addressof(head), message.data(), sizeof(head));
145 if(!m_pservice_endpoint->do_send(std::move(message)))
146 return false;
147
148 on_levin_traffic(m_connection_context, true, true, false, head.m_cb, head.m_command);
149 MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
150 << ", flags" << head.m_flags
151 << ", r?=" << head.m_have_to_return_data
152 <<", cmd = " << head.m_command
153 << ", ver=" << head.m_protocol_version);
154 return true;
155 }
156
157public:
158 typedef t_connection_context connection_context;
160
166
167 std::atomic<bool> m_protocol_released;
168 std::atomic<bool> m_invoke_buf_ready;
169
171
173 std::string m_local_inv_buff;
174
176
177 std::atomic<uint32_t> m_wait_count;
178 std::atomic<uint32_t> m_close_called;
182 t_connection_context& m_connection_context;
183 std::atomic<uint64_t> m_max_packet_size;
184
187
190
192 {
193 virtual bool handle(int res, const epee::span<const uint8_t> buff, connection_context& context)=0;
194 virtual bool is_timer_started() const=0;
195 virtual void cancel()=0;
196 virtual bool cancel_timer()=0;
197 virtual void reset_timer()=0;
198 };
199 template <class callback_t>
201 {
202 anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
203 :m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_context()), m_timer_started(false),
205 {
206 if(m_con.start_outer_call())
207 {
208 MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
209 m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
210 m_timer.async_wait([&con, command, cb, timeout](const boost::system::error_code& ec)
211 {
212 if(ec == boost::asio::error::operation_aborted)
213 return;
214 MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
215 epee::span<const uint8_t> fake;
216 cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
217 con.close();
218 con.finish_outer_call();
219 });
220 m_timer_started = true;
221 }
222 }
224 {}
225 callback_t m_cb;
227 boost::asio::deadline_timer m_timer;
234 {
235 if(!cancel_timer())
236 return false;
237 m_cb(res, buff, context);
238 m_con.finish_outer_call();
239 return true;
240 }
241 virtual bool is_timer_started() const
242 {
243 return m_timer_started;
244 }
245 virtual void cancel()
246 {
247 if(cancel_timer())
248 {
250 m_cb(LEVIN_ERROR_CONNECTION_DESTROYED, fake, m_con.get_context_ref());
251 m_con.finish_outer_call();
252 }
253 }
254 virtual bool cancel_timer()
255 {
257 {
259 boost::system::error_code ignored_ec;
260 m_timer_cancelled = 1 == m_timer.cancel(ignored_ec);
261 }
262 return m_timer_cancelled;
263 }
264 virtual void reset_timer()
265 {
266 boost::system::error_code ignored_ec;
267 if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
268 {
269 callback_t& cb = m_cb;
270 uint64_t timeout = m_timeout;
272 int command = m_command;
273 m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
274 m_timer.async_wait([&con, cb, command, timeout](const boost::system::error_code& ec)
275 {
276 if(ec == boost::asio::error::operation_aborted)
277 return;
278 MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
281 con.close();
282 con.finish_outer_call();
283 });
284 }
285 }
286 };
288 std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
289
290 template<class callback_t>
291 bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler& con, int command)
292 {
295 {
296 MERROR("Adding response handler to a released object");
297 return false;
298 }
299 boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<anvoke_handler<callback_t>>(cb, timeout, con, command));
300 m_invoke_response_handlers.push_back(handler);
301 return handler->is_timer_started();
302 }
303 template<class callback_t> friend struct anvoke_handler;
304public:
307 t_connection_context& conn_context):
309 m_pservice_endpoint(psnd_hndlr),
311 m_connection_context(conn_context),
312 m_max_packet_size(config.m_initial_max_packet_size),
313 m_cache_in_buffer(4 * 1024),
315 {
316 m_close_called = 0;
317 m_protocol_released = false;
318 m_wait_count = 0;
321 m_invoke_buf_ready = false;
323 }
325 {
326 try
327 {
328
330 {
331 m_config.del_connection(this);
332 }
333
334 for (size_t i = 0; i < 60 * 1000 / 100 && 0 != m_wait_count; ++i)
335 {
337 }
338 CHECK_AND_ASSERT_MES_NO_RET(0 == m_wait_count, "Failed to wait for operation completion. m_wait_count = " << m_wait_count.load());
339
340 MTRACE(m_connection_context << "~async_protocol_handler()");
341
342 }
343 catch (...) { /* ignore */ }
344 }
345
347 {
348 MTRACE(m_connection_context << "[levin_protocol] -->> start_outer_call");
349 if(!m_pservice_endpoint->add_ref())
350 {
351 MERROR(m_connection_context << "[levin_protocol] -->> start_outer_call failed");
352 return false;
353 }
354 ++m_wait_count;
355 return true;
356 }
358 {
359 MTRACE(m_connection_context << "[levin_protocol] <<-- finish_outer_call");
360 --m_wait_count;
361 m_pservice_endpoint->release();
362 return true;
363 }
364
366 {
367 decltype(m_invoke_response_handlers) local_invoke_response_handlers;
369 local_invoke_response_handlers.swap(m_invoke_response_handlers);
370 m_protocol_released = true;
372
373 // Never call callback inside critical section, that can cause deadlock. Callback can be called when
374 // invoke_response_handler_base is cancelled
375 std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) {
376 pinv_resp_hndlr->cancel();
377 });
378
379 return true;
380 }
381
382 bool close()
383 {
385
386 m_pservice_endpoint->close();
387 return true;
388 }
389
391 {
392 m_connection_context = contxt;
393 }
394
396 {
399
400 m_pservice_endpoint->request_callback();
401 }
402
404 {
405 m_config.m_pcommands_handler->callback(m_connection_context);
406 }
407
408 virtual bool handle_recv(const void* ptr, size_t cb)
409 {
411 return false; //closing connections
412
413 if(!m_config.m_pcommands_handler)
414 {
415 MERROR(m_connection_context << "Commands handler not set!");
416 return false;
417 }
418
419 // these should never fail, but do runtime check for safety
420 const uint64_t max_packet_size = m_max_packet_size;
421 CHECK_AND_ASSERT_MES(max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()");
422 CHECK_AND_ASSERT_MES(max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()");
423
424 // flipped to subtraction; prevent overflow since m_max_packet_size is variable and public
425 if(cb > max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size())
426 {
427 MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << max_packet_size
428 << ", packet received " << m_cache_in_buffer.size() + cb
429 << ", connection will be closed.");
430 return false;
431 }
432
433 m_cache_in_buffer.append((const char*)ptr, cb);
434
435 bool is_continue = true;
436 while(is_continue)
437 {
438 switch(m_state)
439 {
441 if(m_cache_in_buffer.size() < m_current_head.m_cb)
442 {
443 is_continue = false;
444 if(cb >= MIN_BYTES_WANTED)
445 {
447 if (!m_invoke_response_handlers.empty())
448 {
449 //async call scenario
450 boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
451 response_handler->reset_timer();
452 MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb << ", current total " << m_cache_in_buffer.size() << "/" << m_current_head.m_cb << " (" << (100.0f * m_cache_in_buffer.size() / (m_current_head.m_cb ? m_current_head.m_cb : 1)) << "%)");
453 }
454 }
455 break;
456 }
457
458 {
459 std::string temp{};
460 epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb);
462
463 // abstract_tcp_server2.h manages max bandwidth for a p2p link
465 {
466 // special noise/fragment command
467 static constexpr const uint32_t both_flags = (LEVIN_PACKET_BEGIN | LEVIN_PACKET_END);
468 if ((m_current_head.m_flags & both_flags) == both_flags)
469 break; // noise message, skip to next message
470
472 m_fragment_buffer.clear();
473
474 m_fragment_buffer.append(reinterpret_cast<const char*>(buff_to_invoke.data()), buff_to_invoke.size());
475 if (!(m_current_head.m_flags & LEVIN_PACKET_END))
476 break; // skip to next message
477
478 if (m_fragment_buffer.size() < sizeof(bucket_head2))
479 {
480 MERROR(m_connection_context << "Fragmented data too small for levin header");
481 return false;
482 }
483
484 temp = std::move(m_fragment_buffer);
485 m_fragment_buffer.clear();
486 std::memcpy(std::addressof(m_current_head), std::addressof(temp[0]), sizeof(bucket_head2));
487 const size_t max_bytes = m_connection_context.get_max_bytes(m_current_head.m_command);
488 if(m_current_head.m_cb > std::min<size_t>(max_packet_size, max_bytes))
489 {
490 MERROR(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << std::min<size_t>(max_packet_size, max_bytes)
491 << ", packet header received " << m_current_head.m_cb << ", command " << m_current_head.m_command
492 << ", connection will be closed.");
493 return false;
494 }
495 buff_to_invoke = {reinterpret_cast<const uint8_t*>(temp.data()) + sizeof(bucket_head2), temp.size() - sizeof(bucket_head2)};
496 }
497
499
500 MDEBUG(m_connection_context << "LEVIN_PACKET_RECEIVED. [len=" << m_current_head.m_cb
501 << ", flags" << m_current_head.m_flags
502 << ", r?=" << m_current_head.m_have_to_return_data
503 <<", cmd = " << m_current_head.m_command
504 << ", v=" << m_current_head.m_protocol_version);
505
506 if(is_response)
507 {//response to some invoke
508
510 if(!m_invoke_response_handlers.empty())
511 {//async call scenario
512 boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
513 bool timer_cancelled = response_handler->cancel_timer();
514 // Don't pop handler, to avoid destroying it
515 if(timer_cancelled)
516 m_invoke_response_handlers.pop_front();
517 invoke_response_handlers_guard.unlock();
518
519 if(timer_cancelled)
520 response_handler->handle(m_current_head.m_return_code, buff_to_invoke, m_connection_context);
521 }
522 else
523 {
524 invoke_response_handlers_guard.unlock();
525 //use sync call scenario
527 {
528 MERROR(m_connection_context << "no active invoke when response came, wtf?");
529 return false;
530 }else
531 {
533 m_local_inv_buff = std::string((const char*)buff_to_invoke.data(), buff_to_invoke.size());
534 buff_to_invoke = epee::span<const uint8_t>((const uint8_t*)NULL, 0);
535 m_invoke_result_code = m_current_head.m_return_code;
537 m_invoke_buf_ready = true;
538 }
539 }
540 }else
541 {
542 if(m_current_head.m_have_to_return_data)
543 {
544 levin::message_writer return_message{32 * 1024};
545 const uint32_t return_code = m_config.m_pcommands_handler->invoke(
546 m_current_head.m_command, buff_to_invoke, return_message.buffer, m_connection_context
547 );
548
549 // peer_id remains unset if dropped
550 if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete())
551 m_max_packet_size = m_config.m_max_packet_size;
552
553 if(!send_message(return_message.finalize_response(m_current_head.m_command, return_code)))
554 return false;
555 }
556 else
557 m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context);
558 }
559 // reuse small buffer
560 if (!temp.empty() && temp.capacity() <= 64 * 1024)
561 {
562 temp.clear();
563 m_fragment_buffer = std::move(temp);
564 }
565 }
566 break;
568 {
569 if(m_cache_in_buffer.size() < sizeof(bucket_head2))
570 {
571 if(m_cache_in_buffer.size() >= sizeof(uint64_t) && *((uint64_t*)m_cache_in_buffer.span(8).data()) != SWAP64LE(LEVIN_SIGNATURE))
572 {
573 MWARNING(m_connection_context << "Signature mismatch, connection will be closed");
574 return false;
575 }
576 is_continue = false;
577 break;
578 }
579
580#if BYTE_ORDER == LITTLE_ENDIAN
581 bucket_head2& phead = *(bucket_head2*)m_cache_in_buffer.span(sizeof(bucket_head2)).data();
582#else
583 bucket_head2 phead = *(bucket_head2*)m_cache_in_buffer.span(sizeof(bucket_head2)).data();
584 phead.m_signature = SWAP64LE(phead.m_signature);
585 phead.m_cb = SWAP64LE(phead.m_cb);
586 phead.m_command = SWAP32LE(phead.m_command);
587 phead.m_return_code = SWAP32LE(phead.m_return_code);
588 phead.m_flags = SWAP32LE(phead.m_flags);
590#endif
591 if(LEVIN_SIGNATURE != phead.m_signature)
592 {
593 LOG_ERROR_CC(m_connection_context, "Signature mismatch, connection will be closed");
594 return false;
595 }
596 m_current_head = phead;
597
598 m_cache_in_buffer.erase(sizeof(bucket_head2));
600 m_oponent_protocol_ver = m_current_head.m_protocol_version;
601 const size_t max_bytes = m_connection_context.get_max_bytes(m_current_head.m_command);
602 if(m_current_head.m_cb > std::min<size_t>(max_packet_size, max_bytes))
603 {
604 LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << std::min<size_t>(max_packet_size, max_bytes)
605 << ", packet header received " << m_current_head.m_cb << ", command " << m_current_head.m_command
606 << ", connection will be closed.");
607 return false;
608 }
609 }
610 break;
611 default:
612 LOG_ERROR_CC(m_connection_context, "Undefined state in levin_server_impl::connection_handler, m_state=" << m_state);
613 return false;
614 }
615 }
616
617 return true;
618 }
619
621 {
623 {
625 m_config.add_connection(this);
626 }
627 return true;
628 }
629
630 template<class callback_t>
631 bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
632 {
635
637 timeout = m_config.m_invoke_timeout;
638
639 int err_code = LEVIN_OK;
640 do
641 {
643
644 m_invoke_buf_ready = false;
646
647 if (command == m_connection_context.handshake_command())
648 m_max_packet_size = m_config.m_max_packet_size;
649
650 if(!send_message(in_msg.finalize_invoke(command)))
651 {
652 LOG_ERROR_CC(m_connection_context, "Failed to do_send");
653 err_code = LEVIN_ERROR_CONNECTION;
654 break;
655 }
656
657 if(!add_invoke_response_handler(cb, timeout, *this, command))
658 {
660 break;
661 }
663 } while (false);
664
665 if (LEVIN_OK != err_code)
666 {
667 epee::span<const uint8_t> stub_buff = nullptr;
668 // Never call callback inside critical section, that can cause deadlock
669 cb(err_code, stub_buff, m_connection_context);
670 return false;
671 }
672
673 return true;
674 }
675
676 int invoke(int command, message_writer in_msg, std::string& buff_out)
677 {
680
682
683 m_invoke_buf_ready = false;
684
685 if (command == m_connection_context.handshake_command())
686 m_max_packet_size = m_config.m_max_packet_size;
687
688 if (!send_message(in_msg.finalize_invoke(command)))
689 {
690 LOG_ERROR_CC(m_connection_context, "Failed to send request");
692 }
693
694 uint64_t ticks_start = misc_utils::get_tick_count();
695 size_t prev_size = 0;
696
698 {
699 if(m_cache_in_buffer.size() - prev_size >= MIN_BYTES_WANTED)
700 {
701 prev_size = m_cache_in_buffer.size();
702 ticks_start = misc_utils::get_tick_count();
703 }
704 if(misc_utils::get_tick_count() - ticks_start > m_config.m_invoke_timeout)
705 {
706 MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection ");
707 close();
709 }
710 if(!m_pservice_endpoint->call_run_once_service_io())
712 }
713
716
718 buff_out.swap(m_local_inv_buff);
719 m_local_inv_buff.clear();
721
723 }
724
731 int send(byte_slice message)
732 {
735 );
736
737 if (!send_message(std::move(message)))
738 {
739 LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it");
740 return -1;
741 }
742 return 1;
743 }
744 //------------------------------------------------------------------------------------------
745 boost::uuids::uuid get_connection_id() {return m_connection_context.m_connection_id;}
746 //------------------------------------------------------------------------------------------
747 t_connection_context& get_context_ref() {return m_connection_context;}
748};
749//------------------------------------------------------------------------------------------
750template<class t_connection_context>
758//------------------------------------------------------------------------------------------
759template<class t_connection_context>
761{
762 std::vector<typename connections_map::mapped_type> connections;
763
764 auto scope_exit_handler = misc_utils::create_scope_leave_handler([&connections]{
765 for (auto &aph: connections)
766 aph->finish_outer_call();
767 });
768
770 for (auto& c: m_connects)
771 {
772 if (c.second->m_connection_context.m_is_income == incoming)
773 if (c.second->start_outer_call())
774 connections.push_back(c.second);
775 }
776
777 // close random connections from the provided set
778 // TODO or better just keep removing random elements (performance)
779 unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
780 shuffle(connections.begin(), connections.end(), std::default_random_engine(seed));
781 for (size_t i = 0; i < connections.size() && i < count; ++i)
782 m_connects.erase(connections[i]->get_connection_id());
783
785
786 for (size_t i = 0; i < connections.size() && i < count; ++i)
787 connections[i]->close();
788}
789//------------------------------------------------------------------------------------------
790template<class t_connection_context>
795//------------------------------------------------------------------------------------------
796template<class t_connection_context>
801//------------------------------------------------------------------------------------------
802template<class t_connection_context>
810//------------------------------------------------------------------------------------------
811template<class t_connection_context>
813{
814 auto it = m_connects.find(connection_id);
815 return it == m_connects.end() ? 0 : it->second;
816}
817//------------------------------------------------------------------------------------------
818template<class t_connection_context>
829//------------------------------------------------------------------------------------------
830template<class t_connection_context>
831int async_protocol_handler_config<t_connection_context>::invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id)
832{
834 int r = find_and_lock_connection(connection_id, aph);
835 return LEVIN_OK == r ? aph->invoke(command, std::move(in_msg), buff_out) : r;
836}
837//------------------------------------------------------------------------------------------
838template<class t_connection_context> template<class callback_t>
839int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
840{
842 int r = find_and_lock_connection(connection_id, aph);
843 return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r;
844}
845//------------------------------------------------------------------------------------------
846template<class t_connection_context> template<class callback_t>
848{
849 std::vector<typename connections_map::mapped_type> conn;
850
851 auto scope_exit_handler = misc_utils::create_scope_leave_handler([&conn]{
852 for (auto &aph: conn)
853 aph->finish_outer_call();
854 });
855
857 conn.reserve(m_connects.size());
858 for (auto &e: m_connects)
859 if (e.second->start_outer_call())
860 conn.push_back(e.second);
862
863 for (auto &aph: conn)
864 if (!cb(aph->get_context_ref()))
865 return false;
866
867 return true;
868}
869//------------------------------------------------------------------------------------------
870template<class t_connection_context> template<class callback_t>
871bool async_protocol_handler_config<t_connection_context>::for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
872{
874 if (find_and_lock_connection(connection_id, aph) != LEVIN_OK)
875 return false;
876 auto scope_exit_handler = misc_utils::create_scope_leave_handler(
878 if(!cb(aph->get_context_ref()))
879 return false;
880 return true;
881}
882//------------------------------------------------------------------------------------------
883template<class t_connection_context>
889//------------------------------------------------------------------------------------------
890template<class t_connection_context>
892{
894 size_t count = 0;
895 for (const auto &c: m_connects)
896 if (!c.second->m_connection_context.m_is_income)
897 ++count;
898 return count;
899}
900//------------------------------------------------------------------------------------------
901template<class t_connection_context>
903{
905 size_t count = 0;
906 for (const auto &c: m_connects)
907 if (c.second->m_connection_context.m_is_income)
908 ++count;
909 return count;
910}
911//------------------------------------------------------------------------------------------
912template<class t_connection_context>
920//------------------------------------------------------------------------------------------
921template<class t_connection_context>
922int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id)
923{
925 int r = find_and_lock_connection(connection_id, aph);
926 return LEVIN_OK == r ? aph->send(std::move(message)) : 0;
927}
928//------------------------------------------------------------------------------------------
929template<class t_connection_context>
931{
933 if (find_and_lock_connection(connection_id, aph) != LEVIN_OK)
934 return false;
935 auto scope_exit_handler = misc_utils::create_scope_leave_handler(
937 if (!aph->close())
938 return false;
940 m_connects.erase(connection_id);
941 return true;
942}
943//------------------------------------------------------------------------------------------
944template<class t_connection_context>
946{
949 if(0 == aph)
950 return false;
951 aph->update_connection_context(contxt);
952 return true;
953}
954//------------------------------------------------------------------------------------------
955template<class t_connection_context>
957{
959 int r = find_and_lock_connection(connection_id, aph);
960 if(LEVIN_OK == r)
961 {
962 aph->request_callback();
963 return true;
964 }
965 else
966 {
967 return false;
968 }
969}
970}
971}
static void close()
Definition blockchain_blackball.cpp:279
Definition byte_slice.h:69
Definition syncobj.h:125
Definition syncobj.h:82
Definition levin_protocol_handler_async.h:81
async_protocol_handler_config()
Definition levin_protocol_handler_async.h:122
bool request_callback(boost::uuids::uuid connection_id)
Definition levin_protocol_handler_async.h:956
~async_protocol_handler_config()
Definition levin_protocol_handler_async.h:124
size_t get_connections_count()
Definition levin_protocol_handler_async.h:884
int find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler< t_connection_context > *&aph)
Definition levin_protocol_handler_async.h:819
async_protocol_handler< t_connection_context > * find_connection(boost::uuids::uuid connection_id) const
Definition levin_protocol_handler_async.h:812
void(* m_pcommands_handler_destroy)(levin_commands_handler< detail::p2p_context > *)
Definition levin_protocol_handler_async.h:95
bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
Definition levin_protocol_handler_async.h:871
bool foreach_connection(const callback_t &cb)
Definition levin_protocol_handler_async.h:847
levin_commands_handler< detail::p2p_context > * m_pcommands_handler
Definition levin_protocol_handler_async.h:94
int send(epee::byte_slice message, const boost::uuids::uuid &connection_id)
Definition levin_protocol_handler_async.h:922
boost::unordered_map< boost::uuids::uuid, async_protocol_handler< t_connection_context > * > connections_map
Definition levin_protocol_handler_async.h:82
uint64_t m_max_packet_size
Definition levin_protocol_handler_async.h:102
t_connection_context connection_context
Definition levin_protocol_handler_async.h:100
void delete_connections(size_t count, bool incoming)
Definition levin_protocol_handler_async.h:760
void del_out_connections(size_t count)
Definition levin_protocol_handler_async.h:791
void set_handler(levin_commands_handler< t_connection_context > *handler, void(*destroy)(levin_commands_handler< t_connection_context > *)=NULL)
Definition levin_protocol_handler_async.h:913
uint64_t m_invoke_timeout
Definition levin_protocol_handler_async.h:103
bool update_connection_context(const t_connection_context &contxt)
Definition levin_protocol_handler_async.h:945
critical_section m_connects_lock
Definition levin_protocol_handler_async.h:83
void add_connection(async_protocol_handler< t_connection_context > *pc)
Definition levin_protocol_handler_async.h:803
bool close(boost::uuids::uuid connection_id)
Definition levin_protocol_handler_async.h:930
size_t get_out_connections_count()
Definition levin_protocol_handler_async.h:891
void del_in_connections(size_t count)
Definition levin_protocol_handler_async.h:797
int invoke(int command, message_writer in_msg, std::string &buff_out, boost::uuids::uuid connection_id)
Definition levin_protocol_handler_async.h:831
int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
Definition levin_protocol_handler_async.h:839
void del_connection(async_protocol_handler< t_connection_context > *pc)
Definition levin_protocol_handler_async.h:751
connections_map m_connects
Definition levin_protocol_handler_async.h:84
size_t get_in_connections_count()
Definition levin_protocol_handler_async.h:902
uint64_t m_initial_max_packet_size
Definition levin_protocol_handler_async.h:101
Definition levin_protocol_handler_async.h:135
std::string m_local_inv_buff
Definition levin_protocol_handler_async.h:173
bool send_message(byte_slice message)
Definition levin_protocol_handler_async.h:138
void handle_qued_callback()
Definition levin_protocol_handler_async.h:403
std::atomic< uint32_t > m_wait_count
Definition levin_protocol_handler_async.h:177
critical_section m_call_lock
Definition levin_protocol_handler_async.h:175
std::string m_fragment_buffer
Definition levin_protocol_handler_async.h:136
int32_t m_oponent_protocol_ver
Definition levin_protocol_handler_async.h:188
bool after_init_connection()
Definition levin_protocol_handler_async.h:620
bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
Definition levin_protocol_handler_async.h:631
bool release_protocol()
Definition levin_protocol_handler_async.h:365
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
Definition levin_protocol_handler_async.h:291
std::atomic< uint64_t > m_max_packet_size
Definition levin_protocol_handler_async.h:183
net_utils::i_service_endpoint * m_pservice_endpoint
Definition levin_protocol_handler_async.h:180
critical_section m_invoke_response_handlers_lock
Definition levin_protocol_handler_async.h:287
bool close()
Definition levin_protocol_handler_async.h:382
void update_connection_context(const connection_context &contxt)
Definition levin_protocol_handler_async.h:390
virtual bool handle_recv(const void *ptr, size_t cb)
Definition levin_protocol_handler_async.h:408
t_connection_context connection_context
Definition levin_protocol_handler_async.h:158
void request_callback()
Definition levin_protocol_handler_async.h:395
critical_section m_local_inv_buff_lock
Definition levin_protocol_handler_async.h:172
int send(byte_slice message)
Definition levin_protocol_handler_async.h:731
virtual ~async_protocol_handler()
Definition levin_protocol_handler_async.h:324
t_connection_context & get_context_ref()
Definition levin_protocol_handler_async.h:747
std::atomic< bool > m_invoke_buf_ready
Definition levin_protocol_handler_async.h:168
std::list< boost::shared_ptr< invoke_response_handler_base > > m_invoke_response_handlers
Definition levin_protocol_handler_async.h:288
bool m_connection_initialized
Definition levin_protocol_handler_async.h:189
stream_state m_state
Definition levin_protocol_handler_async.h:186
stream_state
Definition levin_protocol_handler_async.h:162
@ stream_state_body
Definition levin_protocol_handler_async.h:164
@ stream_state_head
Definition levin_protocol_handler_async.h:163
bool start_outer_call()
Definition levin_protocol_handler_async.h:346
test_connection_context & m_connection_context
Definition levin_protocol_handler_async.h:182
config_type & m_config
Definition levin_protocol_handler_async.h:181
boost::uuids::uuid get_connection_id()
Definition levin_protocol_handler_async.h:745
volatile int m_invoke_result_code
Definition levin_protocol_handler_async.h:170
async_protocol_handler_config< t_connection_context > config_type
Definition levin_protocol_handler_async.h:159
bucket_head2 m_current_head
Definition levin_protocol_handler_async.h:179
async_protocol_handler(net_utils::i_service_endpoint *psnd_hndlr, config_type &config, t_connection_context &conn_context)
Definition levin_protocol_handler_async.h:305
net_utils::buffer m_cache_in_buffer
Definition levin_protocol_handler_async.h:185
std::atomic< bool > m_protocol_released
Definition levin_protocol_handler_async.h:167
bool finish_outer_call()
Definition levin_protocol_handler_async.h:357
int invoke(int command, message_writer in_msg, std::string &buff_out)
Definition levin_protocol_handler_async.h:676
std::atomic< uint32_t > m_close_called
Definition levin_protocol_handler_async.h:178
Provides space for levin (p2p) header, so that payload can be sent without copy.
Definition levin_base.h:132
byte_slice finalize_invoke(uint32_t command)
Definition levin_base.h:151
byte_slice finalize_response(uint32_t command, uint32_t return_code)
Definition levin_base.h:153
byte_stream buffer
Has space for levin header until a finalize method is used.
Definition levin_base.h:159
bucket_head2 header
Definition levin_base.h:135
Definition buffer.h:47
Non-owning sequence of data. Does not deep copy.
Definition span.h:55
constexpr std::size_t size() const noexcept
Definition span.h:109
constexpr pointer data() const noexcept
Definition span.h:108
const uint8_t seed[32]
Definition code-generator.cpp:37
std::unique_ptr< test_connection > conn(new test_connection(io_service, m_handler_config))
#define false
const char * res
Definition hmac_keccak.cpp:42
#define SWAP64LE
Definition int-util.h:285
#define SWAP32LE
Definition int-util.h:277
#define LEVIN_PACKET_RESPONSE
Definition levin_base.h:80
#define LEVIN_PROTOCOL_VER_1
Definition levin_base.h:86
#define LEVIN_ERROR_CONNECTION
Definition levin_base.h:103
#define LEVIN_INITIAL_MAX_PACKET_SIZE
Definition levin_base.h:76
#define LEVIN_PACKET_END
Definition levin_base.h:82
#define LEVIN_PACKET_REQUEST
Definition levin_base.h:79
#define LEVIN_DEFAULT_MAX_PACKET_SIZE
Definition levin_base.h:77
#define LEVIN_OK
Definition levin_base.h:102
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition levin_base.h:105
#define LEVIN_PACKET_BEGIN
Definition levin_base.h:81
#define LEVIN_ERROR_CONNECTION_NOT_FOUND
Definition levin_base.h:104
#define LEVIN_SIGNATURE
Definition levin_base.h:38
#define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED
Definition levin_base.h:75
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
Definition levin_base.h:106
#define MIN_BYTES_WANTED
Definition levin_protocol_handler_async.h:50
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category)
Definition levin_protocol_handler_async.h:54
Definition cryptonote_config.h:221
Definition levin_base.h:44
boost::shared_ptr< call_befor_die_base > auto_scope_leave_caller
Definition misc_language.h:80
uint64_t get_tick_count()
Definition time_helper.h:82
bool sleep_no_w(long ms)
Definition misc_language.cpp:35
auto_scope_leave_caller create_scope_leave_handler(t_scope_leave_handler f)
Definition misc_language.h:97
TODO: (mj-xmr) This will be reduced in an another PR.
Definition byte_slice.h:40
#define LOG_ERROR_CC(ct, message)
Definition net_utils_base.h:469
const char * buf
Definition slow_memmem.cpp:73
unsigned int uint32_t
Definition stdint.h:126
signed int int32_t
Definition stdint.h:123
unsigned char uint8_t
Definition stdint.h:124
unsigned __int64 uint64_t
Definition stdint.h:136
bool m_timer_started
Definition levin_protocol_handler_async.h:228
async_protocol_handler & m_con
Definition levin_protocol_handler_async.h:226
callback_t m_cb
Definition levin_protocol_handler_async.h:225
uint64_t m_timeout
Definition levin_protocol_handler_async.h:231
virtual bool handle(int res, const epee::span< const uint8_t > buff, typename async_protocol_handler::connection_context &context)
Definition levin_protocol_handler_async.h:233
virtual void cancel()
Definition levin_protocol_handler_async.h:245
boost::asio::deadline_timer m_timer
Definition levin_protocol_handler_async.h:227
virtual ~anvoke_handler()
Definition levin_protocol_handler_async.h:223
anvoke_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
Definition levin_protocol_handler_async.h:202
virtual bool is_timer_started() const
Definition levin_protocol_handler_async.h:241
int m_command
Definition levin_protocol_handler_async.h:232
virtual bool cancel_timer()
Definition levin_protocol_handler_async.h:254
bool m_timer_cancelled
Definition levin_protocol_handler_async.h:230
bool m_cancel_timer_called
Definition levin_protocol_handler_async.h:229
virtual void reset_timer()
Definition levin_protocol_handler_async.h:264
Definition levin_protocol_handler_async.h:192
virtual bool handle(int res, const epee::span< const uint8_t > buff, connection_context &context)=0
Definition levin_base.h:63
uint32_t m_command
Definition levin_base.h:67
uint32_t m_protocol_version
Definition levin_base.h:70
uint32_t m_flags
Definition levin_base.h:69
uint64_t m_cb
Definition levin_base.h:65
int32_t m_return_code
Definition levin_base.h:68
uint64_t m_signature
Definition levin_base.h:64
uint8_t m_have_to_return_data
Definition levin_base.h:66
Definition levin_base.h:91
Definition net_utils_base.h:442
#define CRITICAL_REGION_LOCAL(x)
Definition syncobj.h:153
#define CRITICAL_REGION_END()
Definition syncobj.h:158
#define CRITICAL_REGION_BEGIN(x)
Definition syncobj.h:154