28 #include <boost/asio/deadline_timer.hpp> 29 #include <boost/uuid/uuid_generators.hpp> 30 #include <boost/unordered_map.hpp> 31 #include <boost/smart_ptr/make_shared.hpp> 46 #undef MONERO_DEFAULT_LOG_CATEGORY 47 #define MONERO_DEFAULT_LOG_CATEGORY "net" 49 #ifndef MIN_BYTES_WANTED 50 #define MIN_BYTES_WANTED 512 53 template<
typename context_t>
56 MCINFO(
"net.p2p.traffic",
context << bytes <<
" bytes " << (sent ?
"sent" :
"received") << (
error ?
"/corrupt" :
"")
57 <<
" for category " << category <<
" initiated by " << (initiator ?
"us" :
"peer"));
60 template<
typename context_t>
64 snprintf(
buf,
sizeof(
buf),
"command-%u", command);
76 template<
class t_connection_context>
79 template<
class t_connection_context>
82 typedef boost::unordered_map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* >
connections_map;
106 template<
class callback_t>
113 template<
class callback_t>
115 template<
class callback_t>
133 template<
class t_connection_context = net_utils::connection_context_base>
134 class async_protocol_handler
199 template <
class callback_t>
208 MDEBUG(con.get_context_ref() <<
"anvoke_handler, timeout: " << timeout);
209 m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
210 m_timer.async_wait([&con, command, cb, timeout](
const boost::system::error_code& ec)
212 if(ec == boost::asio::error::operation_aborted)
214 MINFO(con.get_context_ref() <<
"Timeout on invoke operation happened, command: " << command <<
" timeout: " << timeout);
218 con.finish_outer_call();
259 boost::system::error_code ignored_ec;
266 boost::system::error_code ignored_ec;
269 callback_t& cb =
m_cb;
274 m_timer.async_wait([&con, cb, command, timeout](
const boost::system::error_code& ec)
276 if(ec == boost::asio::error::operation_aborted)
278 MINFO(con.get_context_ref() <<
"Timeout on invoke operation happened, command: " << command <<
" timeout: " << timeout);
282 con.finish_outer_call();
290 template<
class callback_t>
296 MERROR(
"Adding response handler to a released object");
299 boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<
anvoke_handler<callback_t>>(cb, timeout, con, command));
301 return handler->is_timer_started();
307 t_connection_context& conn_context):
338 CHECK_AND_ASSERT_MES_NO_RET(0 ==
m_wait_count,
"Failed to wait for operation completion. m_wait_count = " <<
m_wait_count.load());
375 std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](
const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) {
376 pinv_resp_hndlr->cancel();
421 CHECK_AND_ASSERT_MES(max_packet_size >=
m_cache_in_buffer.
size(),
false,
"Bad m_cache_in_buffer.size()");
427 MWARNING(
m_connection_context <<
"Maximum packet size exceed!, m_max_packet_size = " << max_packet_size
429 <<
", connection will be closed.");
435 bool is_continue =
true;
451 response_handler->reset_timer();
490 MERROR(
m_connection_context <<
"Maximum packet size exceed!, m_max_packet_size = " << std::min<size_t>(max_packet_size, max_bytes)
492 <<
", connection will be closed.");
513 bool timer_cancelled = response_handler->cancel_timer();
517 invoke_response_handlers_guard.
unlock();
524 invoke_response_handlers_guard.
unlock();
560 if (!temp.empty() && temp.capacity() <= 64 * 1024)
580 #if BYTE_ORDER == LITTLE_ENDIAN 606 <<
", connection will be closed.");
630 template<
class callback_t>
695 size_t prev_size = 0;
750 template<
class t_connection_context>
754 m_connects.erase(pconn->get_connection_id());
759 template<
class t_connection_context>
762 std::vector<typename connections_map::mapped_type>
connections;
766 aph->finish_outer_call();
770 for (
auto&
c: m_connects)
772 if (
c.second->m_connection_context.m_is_income == incoming)
773 if (
c.second->start_outer_call())
779 unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
790 template<
class t_connection_context>
793 delete_connections(
count,
false);
796 template<
class t_connection_context>
799 delete_connections(
count,
true);
802 template<
class t_connection_context>
806 m_connects[pconn->get_connection_id()] = pconn;
811 template<
class t_connection_context>
814 auto it = m_connects.find(connection_id);
815 return it == m_connects.end() ? 0 : it->second;
818 template<
class t_connection_context>
822 aph = find_connection(connection_id);
825 if(!aph->start_outer_call())
830 template<
class t_connection_context>
834 int r = find_and_lock_connection(connection_id, aph);
838 template<
class t_connection_context>
template<
class callback_t>
842 int r = find_and_lock_connection(connection_id, aph);
846 template<
class t_connection_context>
template<
class callback_t>
849 std::vector<typename connections_map::mapped_type>
conn;
852 for (
auto &aph:
conn)
853 aph->finish_outer_call();
857 conn.reserve(m_connects.size());
858 for (
auto &
e: m_connects)
859 if (
e.second->start_outer_call())
860 conn.push_back(
e.second);
863 for (
auto &aph:
conn)
864 if (!cb(aph->get_context_ref()))
870 template<
class t_connection_context>
template<
class callback_t>
874 if (find_and_lock_connection(connection_id, aph) !=
LEVIN_OK)
878 if(!cb(aph->get_context_ref()))
883 template<
class t_connection_context>
887 return m_connects.size();
890 template<
class t_connection_context>
895 for (
const auto &
c: m_connects)
896 if (!
c.second->m_connection_context.m_is_income)
901 template<
class t_connection_context>
906 for (
const auto &
c: m_connects)
907 if (
c.second->m_connection_context.m_is_income)
912 template<
class t_connection_context>
918 m_pcommands_handler_destroy = destroy;
921 template<
class t_connection_context>
925 int r = find_and_lock_connection(connection_id, aph);
929 template<
class t_connection_context>
933 if (find_and_lock_connection(connection_id, aph) !=
LEVIN_OK)
940 m_connects.erase(connection_id);
944 template<
class t_connection_context>
951 aph->update_connection_context(contxt);
955 template<
class t_connection_context>
959 int r = find_and_lock_connection(connection_id, aph);
962 aph->request_callback();
const char * res
Definition: hmac_keccak.cpp:42
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
Definition: levin_base.h:106
Definition: levin_protocol_handler_async.h:163
std::atomic< uint32_t > m_close_called
Definition: levin_protocol_handler_async.h:178
void append(const void *data, size_t sz)
Definition: buffer.cpp:42
#define LEVIN_SIGNATURE
Definition: levin_base.h:38
net_utils::i_service_endpoint * m_pservice_endpoint
Definition: levin_protocol_handler_async.h:180
uint32_t m_protocol_version
Definition: levin_base.h:70
boost::unordered_map< boost::uuids::uuid, async_protocol_handler< t_connection_context > *> connections_map
Definition: levin_protocol_handler_async.h:82
async_protocol_handler_config< t_connection_context > config_type
Definition: levin_protocol_handler_async.h:159
config_type & m_config
Definition: levin_protocol_handler_async.h:181
uint64_t get_tick_count()
Definition: time_helper.h:82
int * count
Definition: gmock_stress_test.cc:176
critical_section m_invoke_response_handlers_lock
Definition: levin_protocol_handler_async.h:287
virtual bool handle(int res, const epee::span< const uint8_t > buff, connection_context &context)=0
int i
Definition: pymoduletest.py:23
bool m_connection_initialized
Definition: levin_protocol_handler_async.h:189
levin_commands_handler< t_connection_context > * m_pcommands_handler
Definition: levin_protocol_handler_async.h:94
uint64_t m_timeout
Definition: levin_protocol_handler_async.h:231
stream_state
Definition: levin_protocol_handler_async.h:161
::std::string string
Definition: gtest-port.h:1097
t_connection_context connection_context
Definition: levin_protocol_handler_async.h:100
bool send_message(byte_slice message)
Definition: levin_protocol_handler_async.h:138
anvoke_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
Definition: levin_protocol_handler_async.h:202
#define LEVIN_PACKET_REQUEST
Definition: levin_base.h:79
auto_scope_leave_caller create_scope_leave_handler(t_scope_leave_handler f)
Definition: misc_language.h:97
uint64_t m_cb
Definition: levin_base.h:65
std::atomic< uint32_t > m_wait_count
Definition: levin_protocol_handler_async.h:177
bool m_timer_started
Definition: levin_protocol_handler_async.h:228
critical_section m_connects_lock
Definition: levin_protocol_handler_async.h:83
#define LEVIN_PACKET_RESPONSE
Definition: levin_base.h:80
virtual void cancel()
Definition: levin_protocol_handler_async.h:245
#define MIN_BYTES_WANTED
Definition: levin_protocol_handler_async.h:50
int m_command
Definition: levin_protocol_handler_async.h:232
uint32_t m_flags
Definition: levin_base.h:69
uint64_t m_max_packet_size
Definition: levin_protocol_handler_async.h:102
std::string m_local_inv_buff
Definition: levin_protocol_handler_async.h:173
critical_section m_local_inv_buff_lock
Definition: levin_protocol_handler_async.h:172
void erase(size_t sz)
Definition: buffer.h:52
Definition: cryptonote_config.h:220
Non-owning sequence of data. Does not deep copy.
Definition: span.h:54
uint64_t m_signature
Definition: levin_base.h:64
int send(epee::byte_slice message, const boost::uuids::uuid &connection_id)
Definition: levin_protocol_handler_async.h:922
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
Definition: levin_protocol_handler_async.h:291
std::atomic< bool > m_protocol_released
Definition: levin_protocol_handler_async.h:167
unsigned char uint8_t
Definition: stdint.h:124
~async_protocol_handler_config()
Definition: levin_protocol_handler_async.h:124
Provides space for the levin (p2p) header, so that the payload can be sent without copying.
Definition: levin_base.h:131
size_t get_connections_count()
Definition: levin_protocol_handler_async.h:884
bool after_init_connection()
Definition: levin_protocol_handler_async.h:620
test_levin_commands_handler * m_pcommands_handler
Definition: levin.cpp:305
async_protocol_handler(net_utils::i_service_endpoint *psnd_hndlr, config_type &config, t_connection_context &conn_context)
Definition: levin_protocol_handler_async.h:305
net_utils::buffer m_cache_in_buffer
Definition: levin_protocol_handler_async.h:185
std::atomic< uint64_t > m_max_packet_size
Definition: levin_protocol_handler_async.h:183
int32_t m_return_code
Definition: levin_base.h:68
void request_callback()
Definition: levin_protocol_handler_async.h:395
#define CRITICAL_REGION_END()
Definition: syncobj.h:158
int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
Definition: levin_protocol_handler_async.h:839
#define LEVIN_DEFAULT_MAX_PACKET_SIZE
Definition: levin_base.h:77
t_connection_context & m_connection_context
Definition: levin_protocol_handler_async.h:182
virtual bool is_timer_started() const
Definition: levin_protocol_handler_async.h:241
bool close()
Definition: levin_protocol_handler_async.h:382
constexpr std::size_t size() const noexcept
Definition: span.h:109
e
Definition: pymoduletest.py:79
int find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler< t_connection_context > *&aph)
Definition: levin_protocol_handler_async.h:819
Definition: levin_base.h:62
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category)
Definition: levin_protocol_handler_async.h:54
size_t size() const
Definition: buffer.h:56
bool start_outer_call()
Definition: levin_protocol_handler_async.h:346
static void close()
Definition: blockchain_blackball.cpp:279
std::list< boost::shared_ptr< invoke_response_handler_base > > m_invoke_response_handlers
Definition: levin_protocol_handler_async.h:288
virtual bool handle(int res, const epee::span< const uint8_t > buff, typename async_protocol_handler::connection_context &context)
Definition: levin_protocol_handler_async.h:233
size_t get_out_connections_count()
Definition: levin_protocol_handler_async.h:891
bool request_callback(boost::uuids::uuid connection_id)
Definition: levin_protocol_handler_async.h:956
int invoke(int command, message_writer in_msg, std::string &buff_out, boost::uuids::uuid connection_id)
Definition: levin_protocol_handler_async.h:831
epee::levin::async_protocol_handler_config< detail::p2p_context > connections
Definition: levin_notify.h:66
const char * uuid
Definition: minissdp.c:598
unsigned int uint32_t
Definition: stdint.h:126
#define CRITICAL_REGION_BEGIN(x)
Definition: syncobj.h:154
#define LEVIN_PACKET_BEGIN
Definition: levin_base.h:81
virtual void reset_timer()=0
Definition: levin_base.h:90
bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
Definition: levin_protocol_handler_async.h:631
t_connection_context & get_context_ref()
Definition: levin_protocol_handler_async.h:747
boost::shared_ptr< call_befor_die_base > auto_scope_leave_caller
Definition: misc_language.h:80
uint64_t m_invoke_timeout
Definition: levin_protocol_handler_async.h:103
bool close(boost::uuids::uuid connection_id)
Definition: levin_protocol_handler_async.h:930
void delete_connections(size_t count, bool incoming)
Definition: levin_protocol_handler_async.h:760
bool foreach_connection(const callback_t &cb)
Definition: levin_protocol_handler_async.h:847
void del_in_connections(size_t count)
Definition: levin_protocol_handler_async.h:797
#define LEVIN_PACKET_END
Definition: levin_base.h:82
epee::span< const uint8_t > carve(size_t sz)
Definition: buffer.h:55
std::string m_fragment_buffer
Definition: levin_protocol_handler_async.h:136
unsigned __int64 uint64_t
Definition: stdint.h:136
Definition: byte_slice.h:68
#define CRITICAL_REGION_LOCAL(x)
Definition: syncobj.h:153
std::unique_ptr< void, terminate > context
Unique ZMQ context handle, calls zmq_term on destruction.
Definition: zmq.h:105
void del_out_connections(size_t count)
Definition: levin_protocol_handler_async.h:791
callback_t m_cb
Definition: levin_protocol_handler_async.h:225
#define false
Definition: stdbool.h:37
async_protocol_handler_config()
Definition: levin_protocol_handler_async.h:122
byte_slice finalize_invoke(uint32_t command)
Definition: levin_base.h:151
size_t get_in_connections_count()
Definition: levin_protocol_handler_async.h:902
bool sleep_no_w(long ms)
Definition: misc_language.cpp:35
connections_map m_connects
Definition: levin_protocol_handler_async.h:84
const char * buf
Definition: slow_memmem.cpp:73
virtual ~async_protocol_handler()
Definition: levin_protocol_handler_async.h:324
void unlock()
Definition: syncobj.h:142
std::atomic< bool > m_invoke_buf_ready
Definition: levin_protocol_handler_async.h:168
#define LOG_ERROR_CC(ct, message)
Definition: net_utils_base.h:469
uint32_t m_command
Definition: levin_base.h:67
bool release_protocol()
Definition: levin_protocol_handler_async.h:365
r
Definition: testupnpigd.py:61
async_protocol_handler< t_connection_context > * find_connection(boost::uuids::uuid connection_id) const
Definition: levin_protocol_handler_async.h:812
void del_connection(async_protocol_handler< t_connection_context > *pc)
Definition: levin_protocol_handler_async.h:751
boost::uuids::uuid get_connection_id()
Definition: levin_protocol_handler_async.h:745
TODO: (mj-xmr) This will be reduced in another PR.
Definition: byte_slice.h:39
virtual bool handle_recv(const void *ptr, size_t cb)
Definition: levin_protocol_handler_async.h:408
virtual bool is_timer_started() const =0
Definition: net_utils_base.h:441
const T & move(const T &t)
Definition: gtest-port.h:1317
#define LEVIN_ERROR_CONNECTION
Definition: levin_base.h:103
virtual bool cancel_timer()=0
bucket_head2 m_current_head
Definition: levin_protocol_handler_async.h:179
#define SWAP64LE
Definition: int-util.h:313
void * memcpy(void *a, const void *b, size_t c)
Definition: glibc_compat.cpp:16
void add_connection(async_protocol_handler< t_connection_context > *pc)
Definition: levin_protocol_handler_async.h:803
volatile int m_invoke_result_code
Definition: levin_protocol_handler_async.h:170
Definition: levin_protocol_handler_async.h:77
virtual bool call_run_once_service_io()=0
Definition: syncobj.h:124
Definition: levin_protocol_handler_async.h:80
epee::span< const uint8_t > span(size_t sz) const
Definition: buffer.h:53
void update_connection_context(const connection_context &contxt)
Definition: levin_protocol_handler_async.h:390
int invoke(int command, message_writer in_msg, std::string &buff_out)
Definition: levin_protocol_handler_async.h:676
#define LEVIN_PROTOCOL_VER_1
Definition: levin_base.h:86
bool update_connection_context(const t_connection_context &contxt)
Definition: levin_protocol_handler_async.h:945
Definition: levin_protocol_handler_async.h:200
const uint8_t seed[32]
Definition: code-generator.cpp:37
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition: levin_base.h:105
void set_handler(levin_commands_handler< t_connection_context > *handler, void(*destroy)(levin_commands_handler< t_connection_context > *)=NULL)
Definition: levin_protocol_handler_async.h:913
stream_state m_state
Definition: levin_protocol_handler_async.h:186
virtual bool request_callback()=0
bucket_head2 header
Definition: levin_base.h:135
bool finish_outer_call()
Definition: levin_protocol_handler_async.h:357
void handle_qued_callback()
Definition: levin_protocol_handler_async.h:403
signed int int32_t
Definition: stdint.h:123
virtual bool do_send(byte_slice message)=0
Definition: levin_protocol_handler_async.h:164
virtual bool cancel_timer()
Definition: levin_protocol_handler_async.h:254
#define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED
Definition: levin_base.h:75
bool m_timer_cancelled
Definition: levin_protocol_handler_async.h:230
int32_t m_oponent_protocol_ver
Definition: levin_protocol_handler_async.h:188
critical_section m_call_lock
Definition: levin_protocol_handler_async.h:175
error
Tracks LMDB error codes.
Definition: error.h:44
tuple message
Definition: gtest_output_test.py:331
async_protocol_handler & m_con
Definition: levin_protocol_handler_async.h:226
uint8_t m_have_to_return_data
Definition: levin_base.h:66
int send(byte_slice message)
Definition: levin_protocol_handler_async.h:731
std::unique_ptr< test_connection > conn(new test_connection(io_service, m_handler_config))
bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
Definition: levin_protocol_handler_async.h:871
virtual ~anvoke_handler()
Definition: levin_protocol_handler_async.h:223
bool m_cancel_timer_called
Definition: levin_protocol_handler_async.h:229
#define SWAP32LE
Definition: int-util.h:305
#define LEVIN_OK
Definition: levin_base.h:102
c
Definition: pymoduletest.py:79
virtual void reset_timer()
Definition: levin_protocol_handler_async.h:264
uint64_t m_initial_max_packet_size
Definition: levin_protocol_handler_async.h:101
void(* m_pcommands_handler_destroy)(levin_commands_handler< t_connection_context > *)
Definition: levin_protocol_handler_async.h:95
boost::asio::deadline_timer m_timer
Definition: levin_protocol_handler_async.h:227
Definition: levin_protocol_handler_async.h:191
#define LEVIN_INITIAL_MAX_PACKET_SIZE
Definition: levin_base.h:76
t_connection_context connection_context
Definition: levin_protocol_handler_async.h:158
#define LEVIN_ERROR_CONNECTION_NOT_FOUND
Definition: levin_base.h:104
if(!cryptonote::get_account_address_from_str_or_url(info, cryptonote::TESTNET, "9uVsvEryzpN8WH2t1WWhFFCG5tS8cBNdmJYNRuckLENFimfauV5pZKeS1P2CbxGkSDTUPHXWwiYE5ZGSXDAGbaZgDxobqDN"))
Definition: signature.cpp:53
constexpr pointer data() const noexcept
Definition: span.h:108