Monero
levin_protocol_handler_async.h
Go to the documentation of this file.
1 // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 // * Redistributions of source code must retain the above copyright
7 // notice, this list of conditions and the following disclaimer.
8 // * Redistributions in binary form must reproduce the above copyright
9 // notice, this list of conditions and the following disclaimer in the
10 // documentation and/or other materials provided with the distribution.
11 // * Neither the name of the Andrey N. Sabelnikov nor the
12 // names of its contributors may be used to endorse or promote products
13 // derived from this software without specific prior written permission.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
19 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 //
26 
27 #pragma once
28 #include <boost/asio/deadline_timer.hpp>
29 #include <boost/uuid/uuid_generators.hpp>
30 #include <boost/unordered_map.hpp>
31 #include <boost/smart_ptr/make_shared.hpp>
32 
33 #include <atomic>
34 #include <deque>
35 
36 #include "levin_base.h"
37 #include "buffer.h"
38 #include "misc_language.h"
39 #include "syncobj.h"
40 #include "time_helper.h"
41 #include "int-util.h"
42 
43 #include <random>
44 #include <chrono>
45 
46 #undef MONERO_DEFAULT_LOG_CATEGORY
47 #define MONERO_DEFAULT_LOG_CATEGORY "net"
48 
49 #ifndef MIN_BYTES_WANTED
50 #define MIN_BYTES_WANTED 512
51 #endif
52 
/// Writes one log line under the "net.p2p.traffic" category describing a
/// levin transfer.
///
/// @param context   connection context, streamed at the start of the line
/// @param initiator selects the "us" / "peer" suffix of the message
/// @param sent      selects "sent" (true) or "received" (false)
/// @param error     when true, "/corrupt" is appended to the direction
/// @param bytes     byte count printed in the message
/// @param category  free-form category name printed in the message
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char* category)
{
  // Resolve the three textual fragments up front so the log expression
  // itself reads as a plain sentence.
  const char* const direction = sent ? "sent" : "received";
  const char* const corrupt_tag = error ? "/corrupt" : "";
  const char* const origin = initiator ? "us" : "peer";

  MCINFO("net.p2p.traffic",
    context << bytes << " bytes " << direction << corrupt_tag
            << " for category " << category << " initiated by " << origin);
}
59 
/// Logs a traffic event for a numeric levin command.
///
/// Formats the command id as the category string "command-<id>" and forwards
/// everything to the `const char*` category overload of on_levin_traffic.
///
/// @param context   connection context, forwarded unchanged
/// @param initiator forwarded unchanged
/// @param sent      forwarded unchanged
/// @param error     forwarded unchanged
/// @param bytes     forwarded unchanged
/// @param command   levin command id used to build the category name
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command)
{
  char buf[32];
  // "%u" requires an unsigned int argument; passing the int directly is a
  // format/argument type mismatch. The explicit cast makes the call
  // well-defined while keeping the printed text exactly as before.
  snprintf(buf, sizeof(buf), "command-%u", static_cast<unsigned int>(command));
  on_levin_traffic(context, initiator, sent, error, bytes, buf);
}
67 
68 namespace epee
69 {
70 namespace levin
71 {
72 
73 /************************************************************************/
74 /* */
75 /************************************************************************/
76 template<class t_connection_context>
78 
79 template<class t_connection_context>
81 {
82  typedef boost::unordered_map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* > connections_map;
85 
88 
91 
92  friend class async_protocol_handler<t_connection_context>;
93 
96 
97  void delete_connections (size_t count, bool incoming);
98 
99 public:
100  typedef t_connection_context connection_context;
104 
105  int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id);
106  template<class callback_t>
107  int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
108 
109  int send(epee::byte_slice message, const boost::uuids::uuid& connection_id);
110  bool close(boost::uuids::uuid connection_id);
111  bool update_connection_context(const t_connection_context& contxt);
112  bool request_callback(boost::uuids::uuid connection_id);
113  template<class callback_t>
114  bool foreach_connection(const callback_t &cb);
115  template<class callback_t>
116  bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb);
117  size_t get_connections_count();
118  size_t get_out_connections_count();
119  size_t get_in_connections_count();
121 
123  {}
125  void del_out_connections(size_t count);
126  void del_in_connections(size_t count);
127 };
128 
129 
130 /************************************************************************/
131 /* */
132 /************************************************************************/
133 template<class t_connection_context = net_utils::connection_context_base>
134 class async_protocol_handler
135 {
137 
139  {
140  if (message.size() < sizeof(message_writer::header))
141  return false;
142 
144  std::memcpy(std::addressof(head), message.data(), sizeof(head));
146  return false;
147 
148  on_levin_traffic(m_connection_context, true, true, false, head.m_cb, head.m_command);
149  MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
150  << ", flags" << head.m_flags
151  << ", r?=" << head.m_have_to_return_data
152  <<", cmd = " << head.m_command
153  << ", ver=" << head.m_protocol_version);
154  return true;
155  }
156 
157 public:
158  typedef t_connection_context connection_context;
160 
162  {
165  };
166 
167  std::atomic<bool> m_protocol_released;
168  std::atomic<bool> m_invoke_buf_ready;
169 
170  volatile int m_invoke_result_code;
171 
174 
176 
177  std::atomic<uint32_t> m_wait_count;
178  std::atomic<uint32_t> m_close_called;
182  t_connection_context& m_connection_context;
183  std::atomic<uint64_t> m_max_packet_size;
184 
187 
190 
192  {
193  virtual bool handle(int res, const epee::span<const uint8_t> buff, connection_context& context)=0;
194  virtual bool is_timer_started() const=0;
195  virtual void cancel()=0;
196  virtual bool cancel_timer()=0;
197  virtual void reset_timer()=0;
198  };
199  template <class callback_t>
201  {
202  anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
203  :m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_context()), m_timer_started(false),
205  {
206  if(m_con.start_outer_call())
207  {
208  MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
209  m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
210  m_timer.async_wait([&con, command, cb, timeout](const boost::system::error_code& ec)
211  {
212  if(ec == boost::asio::error::operation_aborted)
213  return;
214  MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
216  cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
217  con.close();
218  con.finish_outer_call();
219  });
220  m_timer_started = true;
221  }
222  }
223  virtual ~anvoke_handler()
224  {}
225  callback_t m_cb;
227  boost::asio::deadline_timer m_timer;
234  {
235  if(!cancel_timer())
236  return false;
237  m_cb(res, buff, context);
239  return true;
240  }
// Reports whether the constructor armed the timeout timer. m_timer_started
// starts out false and is set to true only after start_outer_call()
// succeeded and the timer's async_wait was scheduled.
241  virtual bool is_timer_started() const
242  {
243  return m_timer_started;
244  }
245  virtual void cancel()
246  {
247  if(cancel_timer())
248  {
252  }
253  }
254  virtual bool cancel_timer()
255  {
257  {
258  m_cancel_timer_called = true;
259  boost::system::error_code ignored_ec;
260  m_timer_cancelled = 1 == m_timer.cancel(ignored_ec);
261  }
262  return m_timer_cancelled;
263  }
264  virtual void reset_timer()
265  {
266  boost::system::error_code ignored_ec;
267  if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
268  {
269  callback_t& cb = m_cb;
270  uint64_t timeout = m_timeout;
272  int command = m_command;
273  m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
274  m_timer.async_wait([&con, cb, command, timeout](const boost::system::error_code& ec)
275  {
276  if(ec == boost::asio::error::operation_aborted)
277  return;
278  MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
280  cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
281  con.close();
282  con.finish_outer_call();
283  });
284  }
285  }
286  };
288  std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
289 
290  template<class callback_t>
291  bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler& con, int command)
292  {
295  {
296  MERROR("Adding response handler to a released object");
297  return false;
298  }
299  boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<anvoke_handler<callback_t>>(cb, timeout, con, command));
300  m_invoke_response_handlers.push_back(handler);
301  return handler->is_timer_started();
302  }
303  template<class callback_t> friend struct anvoke_handler;
304 public:
307  t_connection_context& conn_context):
309  m_pservice_endpoint(psnd_hndlr),
310  m_config(config),
311  m_connection_context(conn_context),
312  m_max_packet_size(config.m_initial_max_packet_size),
313  m_cache_in_buffer(4 * 1024),
315  {
316  m_close_called = 0;
317  m_protocol_released = false;
318  m_wait_count = 0;
320  m_connection_initialized = false;
321  m_invoke_buf_ready = false;
323  }
325  {
326  try
327  {
328 
330  {
331  m_config.del_connection(this);
332  }
333 
334  for (size_t i = 0; i < 60 * 1000 / 100 && 0 != m_wait_count; ++i)
335  {
337  }
338  CHECK_AND_ASSERT_MES_NO_RET(0 == m_wait_count, "Failed to wait for operation completion. m_wait_count = " << m_wait_count.load());
339 
340  MTRACE(m_connection_context << "~async_protocol_handler()");
341 
342  }
343  catch (...) { /* ignore */ }
344  }
345 
347  {
348  MTRACE(m_connection_context << "[levin_protocol] -->> start_outer_call");
350  {
351  MERROR(m_connection_context << "[levin_protocol] -->> start_outer_call failed");
352  return false;
353  }
354  ++m_wait_count;
355  return true;
356  }
358  {
359  MTRACE(m_connection_context << "[levin_protocol] <<-- finish_outer_call");
360  --m_wait_count;
362  return true;
363  }
364 
366  {
367  decltype(m_invoke_response_handlers) local_invoke_response_handlers;
369  local_invoke_response_handlers.swap(m_invoke_response_handlers);
370  m_protocol_released = true;
372 
373  // Never call callback inside critical section, that can cause deadlock. Callback can be called when
374  // invoke_response_handler_base is cancelled
375  std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) {
376  pinv_resp_hndlr->cancel();
377  });
378 
379  return true;
380  }
381 
382  bool close()
383  {
384  ++m_close_called;
385 
387  return true;
388  }
389 
391  {
392  m_connection_context = contxt;
393  }
394 
396  {
398  boost::bind(&async_protocol_handler::finish_outer_call, this));
399 
401  }
402 
404  {
406  }
407 
408  virtual bool handle_recv(const void* ptr, size_t cb)
409  {
410  if(m_close_called)
411  return false; //closing connections
412 
414  {
415  MERROR(m_connection_context << "Commands handler not set!");
416  return false;
417  }
418 
419  // these should never fail, but do runtime check for safety
420  const uint64_t max_packet_size = m_max_packet_size;
421  CHECK_AND_ASSERT_MES(max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()");
422  CHECK_AND_ASSERT_MES(max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()");
423 
424  // flipped to subtraction; prevent overflow since m_max_packet_size is variable and public
425  if(cb > max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size())
426  {
427  MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << max_packet_size
428  << ", packet received " << m_cache_in_buffer.size() + cb
429  << ", connection will be closed.");
430  return false;
431  }
432 
433  m_cache_in_buffer.append((const char*)ptr, cb);
434 
435  bool is_continue = true;
436  while(is_continue)
437  {
438  switch(m_state)
439  {
440  case stream_state_body:
442  {
443  is_continue = false;
444  if(cb >= MIN_BYTES_WANTED)
445  {
447  if (!m_invoke_response_handlers.empty())
448  {
449  //async call scenario
450  boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
451  response_handler->reset_timer();
452  MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb << ", current total " << m_cache_in_buffer.size() << "/" << m_current_head.m_cb << " (" << (100.0f * m_cache_in_buffer.size() / (m_current_head.m_cb ? m_current_head.m_cb : 1)) << "%)");
453  }
454  }
455  break;
456  }
457 
458  {
459  std::string temp{};
460  epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb);
462 
463  // abstract_tcp_server2.h manages max bandwidth for a p2p link
465  {
466  // special noise/fragment command
467  static constexpr const uint32_t both_flags = (LEVIN_PACKET_BEGIN | LEVIN_PACKET_END);
468  if ((m_current_head.m_flags & both_flags) == both_flags)
469  break; // noise message, skip to next message
470 
472  m_fragment_buffer.clear();
473 
474  m_fragment_buffer.append(reinterpret_cast<const char*>(buff_to_invoke.data()), buff_to_invoke.size());
476  break; // skip to next message
477 
478  if (m_fragment_buffer.size() < sizeof(bucket_head2))
479  {
480  MERROR(m_connection_context << "Fragmented data too small for levin header");
481  return false;
482  }
483 
485  m_fragment_buffer.clear();
486  std::memcpy(std::addressof(m_current_head), std::addressof(temp[0]), sizeof(bucket_head2));
487  const size_t max_bytes = m_connection_context.get_max_bytes(m_current_head.m_command);
488  if(m_current_head.m_cb > std::min<size_t>(max_packet_size, max_bytes))
489  {
490  MERROR(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << std::min<size_t>(max_packet_size, max_bytes)
491  << ", packet header received " << m_current_head.m_cb << ", command " << m_current_head.m_command
492  << ", connection will be closed.");
493  return false;
494  }
495  buff_to_invoke = {reinterpret_cast<const uint8_t*>(temp.data()) + sizeof(bucket_head2), temp.size() - sizeof(bucket_head2)};
496  }
497 
499 
500  MDEBUG(m_connection_context << "LEVIN_PACKET_RECEIVED. [len=" << m_current_head.m_cb
501  << ", flags" << m_current_head.m_flags
503  <<", cmd = " << m_current_head.m_command
504  << ", v=" << m_current_head.m_protocol_version);
505 
506  if(is_response)
507  {//response to some invoke
508 
510  if(!m_invoke_response_handlers.empty())
511  {//async call scenario
512  boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
513  bool timer_cancelled = response_handler->cancel_timer();
514  // Don't pop handler, to avoid destroying it
515  if(timer_cancelled)
516  m_invoke_response_handlers.pop_front();
517  invoke_response_handlers_guard.unlock();
518 
519  if(timer_cancelled)
520  response_handler->handle(m_current_head.m_return_code, buff_to_invoke, m_connection_context);
521  }
522  else
523  {
524  invoke_response_handlers_guard.unlock();
525  //use sync call scenario
527  {
528  MERROR(m_connection_context << "no active invoke when response came, wtf?");
529  return false;
530  }else
531  {
533  m_local_inv_buff = std::string((const char*)buff_to_invoke.data(), buff_to_invoke.size());
534  buff_to_invoke = epee::span<const uint8_t>((const uint8_t*)NULL, 0);
537  m_invoke_buf_ready = true;
538  }
539  }
540  }else
541  {
543  {
544  levin::message_writer return_message{32 * 1024};
545  const uint32_t return_code = m_config.m_pcommands_handler->invoke(
546  m_current_head.m_command, buff_to_invoke, return_message.buffer, m_connection_context
547  );
548 
549  // peer_id remains unset if dropped
550  if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete())
552 
553  if(!send_message(return_message.finalize_response(m_current_head.m_command, return_code)))
554  return false;
555  }
556  else
558  }
559  // reuse small buffer
560  if (!temp.empty() && temp.capacity() <= 64 * 1024)
561  {
562  temp.clear();
564  }
565  }
566  break;
567  case stream_state_head:
568  {
569  if(m_cache_in_buffer.size() < sizeof(bucket_head2))
570  {
572  {
573  MWARNING(m_connection_context << "Signature mismatch, connection will be closed");
574  return false;
575  }
576  is_continue = false;
577  break;
578  }
579 
580 #if BYTE_ORDER == LITTLE_ENDIAN
581  bucket_head2& phead = *(bucket_head2*)m_cache_in_buffer.span(sizeof(bucket_head2)).data();
582 #else
583  bucket_head2 phead = *(bucket_head2*)m_cache_in_buffer.span(sizeof(bucket_head2)).data();
584  phead.m_signature = SWAP64LE(phead.m_signature);
585  phead.m_cb = SWAP64LE(phead.m_cb);
586  phead.m_command = SWAP32LE(phead.m_command);
587  phead.m_return_code = SWAP32LE(phead.m_return_code);
588  phead.m_flags = SWAP32LE(phead.m_flags);
590 #endif
591  if(LEVIN_SIGNATURE != phead.m_signature)
592  {
593  LOG_ERROR_CC(m_connection_context, "Signature mismatch, connection will be closed");
594  return false;
595  }
596  m_current_head = phead;
597 
601  const size_t max_bytes = m_connection_context.get_max_bytes(m_current_head.m_command);
602  if(m_current_head.m_cb > std::min<size_t>(max_packet_size, max_bytes))
603  {
604  LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << std::min<size_t>(max_packet_size, max_bytes)
605  << ", packet header received " << m_current_head.m_cb << ", command " << m_current_head.m_command
606  << ", connection will be closed.");
607  return false;
608  }
609  }
610  break;
611  default:
612  LOG_ERROR_CC(m_connection_context, "Undefined state in levin_server_impl::connection_handler, m_state=" << m_state);
613  return false;
614  }
615  }
616 
617  return true;
618  }
619 
621  {
623  {
625  m_config.add_connection(this);
626  }
627  return true;
628  }
629 
630  template<class callback_t>
631  bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
632  {
634  boost::bind(&async_protocol_handler::finish_outer_call, this));
635 
637  timeout = m_config.m_invoke_timeout;
638 
639  int err_code = LEVIN_OK;
640  do
641  {
643 
644  m_invoke_buf_ready = false;
646 
647  if (command == m_connection_context.handshake_command())
649 
650  if(!send_message(in_msg.finalize_invoke(command)))
651  {
652  LOG_ERROR_CC(m_connection_context, "Failed to do_send");
653  err_code = LEVIN_ERROR_CONNECTION;
654  break;
655  }
656 
657  if(!add_invoke_response_handler(cb, timeout, *this, command))
658  {
660  break;
661  }
663  } while (false);
664 
665  if (LEVIN_OK != err_code)
666  {
667  epee::span<const uint8_t> stub_buff = nullptr;
668  // Never call callback inside critical section, that can cause deadlock
669  cb(err_code, stub_buff, m_connection_context);
670  return false;
671  }
672 
673  return true;
674  }
675 
676  int invoke(int command, message_writer in_msg, std::string& buff_out)
677  {
679  boost::bind(&async_protocol_handler::finish_outer_call, this));
680 
682 
683  m_invoke_buf_ready = false;
684 
685  if (command == m_connection_context.handshake_command())
687 
688  if (!send_message(in_msg.finalize_invoke(command)))
689  {
690  LOG_ERROR_CC(m_connection_context, "Failed to send request");
691  return LEVIN_ERROR_CONNECTION;
692  }
693 
694  uint64_t ticks_start = misc_utils::get_tick_count();
695  size_t prev_size = 0;
696 
698  {
699  if(m_cache_in_buffer.size() - prev_size >= MIN_BYTES_WANTED)
700  {
701  prev_size = m_cache_in_buffer.size();
702  ticks_start = misc_utils::get_tick_count();
703  }
705  {
706  MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection ");
707  close();
709  }
712  }
713 
716 
718  buff_out.swap(m_local_inv_buff);
719  m_local_inv_buff.clear();
721 
722  return m_invoke_result_code;
723  }
724 
732  {
735  );
736 
738  {
739  LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it");
740  return -1;
741  }
742  return 1;
743  }
744  //------------------------------------------------------------------------------------------
746  //------------------------------------------------------------------------------------------
// Returns a mutable reference to the connection context this handler was
// constructed with (m_connection_context is itself held by reference).
747  t_connection_context& get_context_ref() {return m_connection_context;}
748 };
749 //------------------------------------------------------------------------------------------
750 template<class t_connection_context>
752 {
753  CRITICAL_REGION_BEGIN(m_connects_lock);
754  m_connects.erase(pconn->get_connection_id());
756  m_pcommands_handler->on_connection_close(pconn->m_connection_context);
757 }
758 //------------------------------------------------------------------------------------------
759 template<class t_connection_context>
761 {
762  std::vector<typename connections_map::mapped_type> connections;
763 
764  auto scope_exit_handler = misc_utils::create_scope_leave_handler([&connections]{
765  for (auto &aph: connections)
766  aph->finish_outer_call();
767  });
768 
769  CRITICAL_REGION_BEGIN(m_connects_lock);
770  for (auto& c: m_connects)
771  {
772  if (c.second->m_connection_context.m_is_income == incoming)
773  if (c.second->start_outer_call())
774  connections.push_back(c.second);
775  }
776 
777  // close random connections from the provided set
778  // TODO or better just keep removing random elements (performance)
779  unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
780  shuffle(connections.begin(), connections.end(), std::default_random_engine(seed));
781  for (size_t i = 0; i < connections.size() && i < count; ++i)
782  m_connects.erase(connections[i]->get_connection_id());
783 
785 
786  for (size_t i = 0; i < connections.size() && i < count; ++i)
787  connections[i]->close();
788 }
789 //------------------------------------------------------------------------------------------
790 template<class t_connection_context>
792 {
793  delete_connections(count, false);
794 }
795 //------------------------------------------------------------------------------------------
796 template<class t_connection_context>
798 {
799  delete_connections(count, true);
800 }
801 //------------------------------------------------------------------------------------------
802 template<class t_connection_context>
804 {
805  CRITICAL_REGION_BEGIN(m_connects_lock);
806  m_connects[pconn->get_connection_id()] = pconn;
808  m_pcommands_handler->on_connection_new(pconn->m_connection_context);
809 }
810 //------------------------------------------------------------------------------------------
811 template<class t_connection_context>
813 {
814  auto it = m_connects.find(connection_id);
815  return it == m_connects.end() ? 0 : it->second;
816 }
817 //------------------------------------------------------------------------------------------
818 template<class t_connection_context>
820 {
821  CRITICAL_REGION_LOCAL(m_connects_lock);
822  aph = find_connection(connection_id);
823  if(0 == aph)
825  if(!aph->start_outer_call())
827  return LEVIN_OK;
828 }
829 //------------------------------------------------------------------------------------------
830 template<class t_connection_context>
832 {
834  int r = find_and_lock_connection(connection_id, aph);
835  return LEVIN_OK == r ? aph->invoke(command, std::move(in_msg), buff_out) : r;
836 }
837 //------------------------------------------------------------------------------------------
838 template<class t_connection_context> template<class callback_t>
839 int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
840 {
842  int r = find_and_lock_connection(connection_id, aph);
843  return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r;
844 }
845 //------------------------------------------------------------------------------------------
846 template<class t_connection_context> template<class callback_t>
848 {
849  std::vector<typename connections_map::mapped_type> conn;
850 
851  auto scope_exit_handler = misc_utils::create_scope_leave_handler([&conn]{
852  for (auto &aph: conn)
853  aph->finish_outer_call();
854  });
855 
856  CRITICAL_REGION_BEGIN(m_connects_lock);
857  conn.reserve(m_connects.size());
858  for (auto &e: m_connects)
859  if (e.second->start_outer_call())
860  conn.push_back(e.second);
862 
863  for (auto &aph: conn)
864  if (!cb(aph->get_context_ref()))
865  return false;
866 
867  return true;
868 }
869 //------------------------------------------------------------------------------------------
870 template<class t_connection_context> template<class callback_t>
872 {
874  if (find_and_lock_connection(connection_id, aph) != LEVIN_OK)
875  return false;
876  auto scope_exit_handler = misc_utils::create_scope_leave_handler(
878  if(!cb(aph->get_context_ref()))
879  return false;
880  return true;
881 }
882 //------------------------------------------------------------------------------------------
883 template<class t_connection_context>
885 {
886  CRITICAL_REGION_LOCAL(m_connects_lock);
887  return m_connects.size();
888 }
889 //------------------------------------------------------------------------------------------
890 template<class t_connection_context>
892 {
893  CRITICAL_REGION_LOCAL(m_connects_lock);
894  size_t count = 0;
895  for (const auto &c: m_connects)
896  if (!c.second->m_connection_context.m_is_income)
897  ++count;
898  return count;
899 }
900 //------------------------------------------------------------------------------------------
901 template<class t_connection_context>
903 {
904  CRITICAL_REGION_LOCAL(m_connects_lock);
905  size_t count = 0;
906  for (const auto &c: m_connects)
907  if (c.second->m_connection_context.m_is_income)
908  ++count;
909  return count;
910 }
911 //------------------------------------------------------------------------------------------
912 template<class t_connection_context>
914 {
915  if (m_pcommands_handler && m_pcommands_handler_destroy)
916  (*m_pcommands_handler_destroy)(m_pcommands_handler);
917  m_pcommands_handler = handler;
918  m_pcommands_handler_destroy = destroy;
919 }
920 //------------------------------------------------------------------------------------------
921 template<class t_connection_context>
923 {
925  int r = find_and_lock_connection(connection_id, aph);
926  return LEVIN_OK == r ? aph->send(std::move(message)) : 0;
927 }
928 //------------------------------------------------------------------------------------------
929 template<class t_connection_context>
931 {
933  if (find_and_lock_connection(connection_id, aph) != LEVIN_OK)
934  return false;
935  auto scope_exit_handler = misc_utils::create_scope_leave_handler(
937  if (!aph->close())
938  return false;
939  CRITICAL_REGION_LOCAL(m_connects_lock);
940  m_connects.erase(connection_id);
941  return true;
942 }
943 //------------------------------------------------------------------------------------------
944 template<class t_connection_context>
946 {
947  CRITICAL_REGION_LOCAL(m_connects_lock);
948  async_protocol_handler<t_connection_context>* aph = find_connection(contxt.m_connection_id);
949  if(0 == aph)
950  return false;
951  aph->update_connection_context(contxt);
952  return true;
953 }
954 //------------------------------------------------------------------------------------------
955 template<class t_connection_context>
957 {
959  int r = find_and_lock_connection(connection_id, aph);
960  if(LEVIN_OK == r)
961  {
962  aph->request_callback();
963  return true;
964  }
965  else
966  {
967  return false;
968  }
969 }
970 }
971 }
const char * res
Definition: hmac_keccak.cpp:42
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
Definition: levin_base.h:106
Definition: levin_protocol_handler_async.h:163
std::atomic< uint32_t > m_close_called
Definition: levin_protocol_handler_async.h:178
void append(const void *data, size_t sz)
Definition: buffer.cpp:42
#define LEVIN_SIGNATURE
Definition: levin_base.h:38
net_utils::i_service_endpoint * m_pservice_endpoint
Definition: levin_protocol_handler_async.h:180
uint32_t m_protocol_version
Definition: levin_base.h:70
boost::unordered_map< boost::uuids::uuid, async_protocol_handler< t_connection_context > *> connections_map
Definition: levin_protocol_handler_async.h:82
async_protocol_handler_config< t_connection_context > config_type
Definition: levin_protocol_handler_async.h:159
config_type & m_config
Definition: levin_protocol_handler_async.h:181
uint64_t get_tick_count()
Definition: time_helper.h:82
int * count
Definition: gmock_stress_test.cc:176
critical_section m_invoke_response_handlers_lock
Definition: levin_protocol_handler_async.h:287
virtual bool handle(int res, const epee::span< const uint8_t > buff, connection_context &context)=0
int i
Definition: pymoduletest.py:23
bool m_connection_initialized
Definition: levin_protocol_handler_async.h:189
levin_commands_handler< t_connection_context > * m_pcommands_handler
Definition: levin_protocol_handler_async.h:94
uint64_t m_timeout
Definition: levin_protocol_handler_async.h:231
stream_state
Definition: levin_protocol_handler_async.h:161
::std::string string
Definition: gtest-port.h:1097
t_connection_context connection_context
Definition: levin_protocol_handler_async.h:100
bool send_message(byte_slice message)
Definition: levin_protocol_handler_async.h:138
anvoke_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
Definition: levin_protocol_handler_async.h:202
#define LEVIN_PACKET_REQUEST
Definition: levin_base.h:79
auto_scope_leave_caller create_scope_leave_handler(t_scope_leave_handler f)
Definition: misc_language.h:97
uint64_t m_cb
Definition: levin_base.h:65
std::atomic< uint32_t > m_wait_count
Definition: levin_protocol_handler_async.h:177
bool m_timer_started
Definition: levin_protocol_handler_async.h:228
critical_section m_connects_lock
Definition: levin_protocol_handler_async.h:83
#define LEVIN_PACKET_RESPONSE
Definition: levin_base.h:80
virtual void cancel()
Definition: levin_protocol_handler_async.h:245
#define MIN_BYTES_WANTED
Definition: levin_protocol_handler_async.h:50
int m_command
Definition: levin_protocol_handler_async.h:232
uint32_t m_flags
Definition: levin_base.h:69
uint64_t m_max_packet_size
Definition: levin_protocol_handler_async.h:102
std::string m_local_inv_buff
Definition: levin_protocol_handler_async.h:173
critical_section m_local_inv_buff_lock
Definition: levin_protocol_handler_async.h:172
void erase(size_t sz)
Definition: buffer.h:52
Definition: cryptonote_config.h:220
Non-owning sequence of data. Does not deep copy.
Definition: span.h:54
uint64_t m_signature
Definition: levin_base.h:64
int send(epee::byte_slice message, const boost::uuids::uuid &connection_id)
Definition: levin_protocol_handler_async.h:922
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
Definition: levin_protocol_handler_async.h:291
std::atomic< bool > m_protocol_released
Definition: levin_protocol_handler_async.h:167
unsigned char uint8_t
Definition: stdint.h:124
~async_protocol_handler_config()
Definition: levin_protocol_handler_async.h:124
Provides space for levin (p2p) header, so that payload can be sent without copy.
Definition: levin_base.h:131
size_t get_connections_count()
Definition: levin_protocol_handler_async.h:884
bool after_init_connection()
Definition: levin_protocol_handler_async.h:620
test_levin_commands_handler * m_pcommands_handler
Definition: levin.cpp:305
async_protocol_handler(net_utils::i_service_endpoint *psnd_hndlr, config_type &config, t_connection_context &conn_context)
Definition: levin_protocol_handler_async.h:305
net_utils::buffer m_cache_in_buffer
Definition: levin_protocol_handler_async.h:185
std::atomic< uint64_t > m_max_packet_size
Definition: levin_protocol_handler_async.h:183
int32_t m_return_code
Definition: levin_base.h:68
void request_callback()
Definition: levin_protocol_handler_async.h:395
#define CRITICAL_REGION_END()
Definition: syncobj.h:158
int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
Definition: levin_protocol_handler_async.h:839
#define LEVIN_DEFAULT_MAX_PACKET_SIZE
Definition: levin_base.h:77
t_connection_context & m_connection_context
Definition: levin_protocol_handler_async.h:182
virtual bool is_timer_started() const
Definition: levin_protocol_handler_async.h:241
bool close()
Definition: levin_protocol_handler_async.h:382
constexpr std::size_t size() const noexcept
Definition: span.h:109
e
Definition: pymoduletest.py:79
int find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler< t_connection_context > *&aph)
Definition: levin_protocol_handler_async.h:819
Definition: levin_base.h:62
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category)
Definition: levin_protocol_handler_async.h:54
size_t size() const
Definition: buffer.h:56
Definition: syncobj.h:81
bool start_outer_call()
Definition: levin_protocol_handler_async.h:346
static void close()
Definition: blockchain_blackball.cpp:279
std::list< boost::shared_ptr< invoke_response_handler_base > > m_invoke_response_handlers
Definition: levin_protocol_handler_async.h:288
virtual bool handle(int res, const epee::span< const uint8_t > buff, typename async_protocol_handler::connection_context &context)
Definition: levin_protocol_handler_async.h:233
size_t get_out_connections_count()
Definition: levin_protocol_handler_async.h:891
bool request_callback(boost::uuids::uuid connection_id)
Definition: levin_protocol_handler_async.h:956
int invoke(int command, message_writer in_msg, std::string &buff_out, boost::uuids::uuid connection_id)
Definition: levin_protocol_handler_async.h:831
epee::levin::async_protocol_handler_config< detail::p2p_context > connections
Definition: levin_notify.h:66
const char * uuid
Definition: minissdp.c:598
unsigned int uint32_t
Definition: stdint.h:126
#define CRITICAL_REGION_BEGIN(x)
Definition: syncobj.h:154
#define LEVIN_PACKET_BEGIN
Definition: levin_base.h:81
Definition: levin_base.h:90
bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
Definition: levin_protocol_handler_async.h:631
t_connection_context & get_context_ref()
Definition: levin_protocol_handler_async.h:747
boost::shared_ptr< call_befor_die_base > auto_scope_leave_caller
Definition: misc_language.h:80
uint64_t m_invoke_timeout
Definition: levin_protocol_handler_async.h:103
bool close(boost::uuids::uuid connection_id)
Definition: levin_protocol_handler_async.h:930
void delete_connections(size_t count, bool incoming)
Definition: levin_protocol_handler_async.h:760
bool foreach_connection(const callback_t &cb)
Definition: levin_protocol_handler_async.h:847
void del_in_connections(size_t count)
Definition: levin_protocol_handler_async.h:797
#define LEVIN_PACKET_END
Definition: levin_base.h:82
epee::span< const uint8_t > carve(size_t sz)
Definition: buffer.h:55
std::string m_fragment_buffer
Definition: levin_protocol_handler_async.h:136
unsigned __int64 uint64_t
Definition: stdint.h:136
Definition: byte_slice.h:68
#define CRITICAL_REGION_LOCAL(x)
Definition: syncobj.h:153
std::unique_ptr< void, terminate > context
Unique ZMQ context handle, calls zmq_term on destruction.
Definition: zmq.h:105
void del_out_connections(size_t count)
Definition: levin_protocol_handler_async.h:791
Definition: buffer.h:46
callback_t m_cb
Definition: levin_protocol_handler_async.h:225
#define false
Definition: stdbool.h:37
async_protocol_handler_config()
Definition: levin_protocol_handler_async.h:122
byte_slice finalize_invoke(uint32_t command)
Definition: levin_base.h:151
size_t get_in_connections_count()
Definition: levin_protocol_handler_async.h:902
bool sleep_no_w(long ms)
Definition: misc_language.cpp:35
connections_map m_connects
Definition: levin_protocol_handler_async.h:84
const char * buf
Definition: slow_memmem.cpp:73
virtual ~async_protocol_handler()
Definition: levin_protocol_handler_async.h:324
void unlock()
Definition: syncobj.h:142
std::atomic< bool > m_invoke_buf_ready
Definition: levin_protocol_handler_async.h:168
#define LOG_ERROR_CC(ct, message)
Definition: net_utils_base.h:469
uint32_t m_command
Definition: levin_base.h:67
bool release_protocol()
Definition: levin_protocol_handler_async.h:365
r
Definition: testupnpigd.py:61
async_protocol_handler< t_connection_context > * find_connection(boost::uuids::uuid connection_id) const
Definition: levin_protocol_handler_async.h:812
void del_connection(async_protocol_handler< t_connection_context > *pc)
Definition: levin_protocol_handler_async.h:751
boost::uuids::uuid get_connection_id()
Definition: levin_protocol_handler_async.h:745
TODO: (mj-xmr) This will be reduced in an another PR.
Definition: byte_slice.h:39
virtual bool handle_recv(const void *ptr, size_t cb)
Definition: levin_protocol_handler_async.h:408
Definition: net_utils_base.h:441
const T & move(const T &t)
Definition: gtest-port.h:1317
#define LEVIN_ERROR_CONNECTION
Definition: levin_base.h:103
bucket_head2 m_current_head
Definition: levin_protocol_handler_async.h:179
#define SWAP64LE
Definition: int-util.h:313
void * memcpy(void *a, const void *b, size_t c)
Definition: glibc_compat.cpp:16
void add_connection(async_protocol_handler< t_connection_context > *pc)
Definition: levin_protocol_handler_async.h:803
volatile int m_invoke_result_code
Definition: levin_protocol_handler_async.h:170
Definition: levin_protocol_handler_async.h:77
virtual bool call_run_once_service_io()=0
Definition: syncobj.h:124
Definition: levin_protocol_handler_async.h:80
epee::span< const uint8_t > span(size_t sz) const
Definition: buffer.h:53
void update_connection_context(const connection_context &contxt)
Definition: levin_protocol_handler_async.h:390
int invoke(int command, message_writer in_msg, std::string &buff_out)
Definition: levin_protocol_handler_async.h:676
#define LEVIN_PROTOCOL_VER_1
Definition: levin_base.h:86
bool update_connection_context(const t_connection_context &contxt)
Definition: levin_protocol_handler_async.h:945
Definition: levin_protocol_handler_async.h:200
const uint8_t seed[32]
Definition: code-generator.cpp:37
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition: levin_base.h:105
void set_handler(levin_commands_handler< t_connection_context > *handler, void(*destroy)(levin_commands_handler< t_connection_context > *)=NULL)
Definition: levin_protocol_handler_async.h:913
stream_state m_state
Definition: levin_protocol_handler_async.h:186
bucket_head2 header
Definition: levin_base.h:135
bool finish_outer_call()
Definition: levin_protocol_handler_async.h:357
void handle_qued_callback()
Definition: levin_protocol_handler_async.h:403
signed int int32_t
Definition: stdint.h:123
virtual bool do_send(byte_slice message)=0
Definition: levin_protocol_handler_async.h:164
virtual bool cancel_timer()
Definition: levin_protocol_handler_async.h:254
#define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED
Definition: levin_base.h:75
bool m_timer_cancelled
Definition: levin_protocol_handler_async.h:230
int32_t m_oponent_protocol_ver
Definition: levin_protocol_handler_async.h:188
critical_section m_call_lock
Definition: levin_protocol_handler_async.h:175
error
Tracks LMDB error codes.
Definition: error.h:44
tuple message
Definition: gtest_output_test.py:331
async_protocol_handler & m_con
Definition: levin_protocol_handler_async.h:226
uint8_t m_have_to_return_data
Definition: levin_base.h:66
int send(byte_slice message)
Definition: levin_protocol_handler_async.h:731
std::unique_ptr< test_connection > conn(new test_connection(io_service, m_handler_config))
bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
Definition: levin_protocol_handler_async.h:871
virtual ~anvoke_handler()
Definition: levin_protocol_handler_async.h:223
bool m_cancel_timer_called
Definition: levin_protocol_handler_async.h:229
#define SWAP32LE
Definition: int-util.h:305
#define LEVIN_OK
Definition: levin_base.h:102
c
Definition: pymoduletest.py:79
virtual void reset_timer()
Definition: levin_protocol_handler_async.h:264
uint64_t m_initial_max_packet_size
Definition: levin_protocol_handler_async.h:101
void(* m_pcommands_handler_destroy)(levin_commands_handler< t_connection_context > *)
Definition: levin_protocol_handler_async.h:95
boost::asio::deadline_timer m_timer
Definition: levin_protocol_handler_async.h:227
Definition: levin_protocol_handler_async.h:191
#define LEVIN_INITIAL_MAX_PACKET_SIZE
Definition: levin_base.h:76
t_connection_context connection_context
Definition: levin_protocol_handler_async.h:158
#define LEVIN_ERROR_CONNECTION_NOT_FOUND
Definition: levin_base.h:104
if(!cryptonote::get_account_address_from_str_or_url(info, cryptonote::TESTNET, "9uVsvEryzpN8WH2t1WWhFFCG5tS8cBNdmJYNRuckLENFimfauV5pZKeS1P2CbxGkSDTUPHXWwiYE5ZGSXDAGbaZgDxobqDN"))
Definition: signature.cpp:53
constexpr pointer data() const noexcept
Definition: span.h:108