28#include <boost/asio/deadline_timer.hpp>
29#include <boost/uuid/uuid_generators.hpp>
30#include <boost/unordered_map.hpp>
31#include <boost/smart_ptr/make_shared.hpp>
46#undef MONERO_DEFAULT_LOG_CATEGORY
47#define MONERO_DEFAULT_LOG_CATEGORY "net"
49#ifndef MIN_BYTES_WANTED
50#define MIN_BYTES_WANTED 512
53template<
typename context_t>
54void on_levin_traffic(
const context_t &context,
bool initiator,
bool sent,
bool error,
size_t bytes,
const char* category)
56 MCINFO(
"net.p2p.traffic", context << bytes <<
" bytes " << (sent ?
"sent" :
"received") << (error ?
"/corrupt" :
"")
57 <<
" for category " << category <<
" initiated by " << (initiator ?
"us" :
"peer"));
60template<
typename context_t>
61void on_levin_traffic(
const context_t &context,
bool initiator,
bool sent,
bool error,
size_t bytes,
int command)
64 snprintf(
buf,
sizeof(
buf),
"command-%u", command);
76template<
class t_connection_context>
79template<
class t_connection_context>
82 typedef boost::unordered_map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* >
connections_map;
106 template<
class callback_t>
110 bool close(boost::uuids::uuid connection_id);
113 template<
class callback_t>
115 template<
class callback_t>
116 bool for_connection(
const boost::uuids::uuid &connection_id,
const callback_t &cb);
133template<
class t_connection_context = net_utils::connection_context_base>
144 std::memcpy(std::addressof(head), message.data(),
sizeof(head));
199 template <
class callback_t>
206 if(
m_con.start_outer_call())
208 MDEBUG(con.get_context_ref() <<
"anvoke_handler, timeout: " << timeout);
209 m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
210 m_timer.async_wait([&con, command, cb, timeout](const boost::system::error_code& ec)
212 if(ec == boost::asio::error::operation_aborted)
214 MINFO(con.get_context_ref() <<
"Timeout on invoke operation happened, command: " << command <<
" timeout: " << timeout);
215 epee::span<const uint8_t> fake;
216 cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
218 con.finish_outer_call();
238 m_con.finish_outer_call();
251 m_con.finish_outer_call();
259 boost::system::error_code ignored_ec;
266 boost::system::error_code ignored_ec;
269 callback_t& cb =
m_cb;
274 m_timer.async_wait([&con, cb, command, timeout](
const boost::system::error_code& ec)
276 if(ec == boost::asio::error::operation_aborted)
278 MINFO(con.
get_context_ref() <<
"Timeout on invoke operation happened, command: " << command <<
" timeout: " << timeout);
290 template<
class callback_t>
296 MERROR(
"Adding response handler to a released object");
299 boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<anvoke_handler<callback_t>>(cb, timeout, con, command));
301 return handler->is_timer_started();
303 template<
class callback_t>
friend struct anvoke_handler;
307 t_connection_context& conn_context):
334 for (
size_t i = 0; i < 60 * 1000 / 100 && 0 !=
m_wait_count; ++i)
338 CHECK_AND_ASSERT_MES_NO_RET(0 ==
m_wait_count,
"Failed to wait for operation completion. m_wait_count = " <<
m_wait_count.load());
375 std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](
const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) {
376 pinv_resp_hndlr->cancel();
421 CHECK_AND_ASSERT_MES(max_packet_size >=
m_cache_in_buffer.size(),
false,
"Bad m_cache_in_buffer.size()");
427 MWARNING(
m_connection_context <<
"Maximum packet size exceed!, m_max_packet_size = " << max_packet_size
429 <<
", connection will be closed.");
435 bool is_continue =
true;
451 response_handler->reset_timer();
488 if(
m_current_head.m_cb > std::min<size_t>(max_packet_size, max_bytes))
490 MERROR(
m_connection_context <<
"Maximum packet size exceed!, m_max_packet_size = " << std::min<size_t>(max_packet_size, max_bytes)
492 <<
", connection will be closed.");
513 bool timer_cancelled = response_handler->cancel_timer();
517 invoke_response_handlers_guard.unlock();
524 invoke_response_handlers_guard.unlock();
560 if (!temp.empty() && temp.capacity() <= 64 * 1024)
580#if BYTE_ORDER == LITTLE_ENDIAN
602 if(
m_current_head.m_cb > std::min<size_t>(max_packet_size, max_bytes))
606 <<
", connection will be closed.");
630 template<
class callback_t>
637 timeout =
m_config.m_invoke_timeout;
695 size_t prev_size = 0;
750template<
class t_connection_context>
759template<
class t_connection_context>
762 std::vector<typename connections_map::mapped_type> connections;
765 for (
auto &aph: connections)
766 aph->finish_outer_call();
772 if (c.second->m_connection_context.m_is_income == incoming)
773 if (c.second->start_outer_call())
774 connections.push_back(c.second);
779 unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
780 shuffle(connections.begin(), connections.end(), std::default_random_engine(
seed));
781 for (
size_t i = 0; i < connections.size() && i < count; ++i)
782 m_connects.erase(connections[i]->get_connection_id());
786 for (
size_t i = 0; i < connections.size() && i < count; ++i)
787 connections[i]->
close();
790template<
class t_connection_context>
796template<
class t_connection_context>
802template<
class t_connection_context>
811template<
class t_connection_context>
815 return it ==
m_connects.end() ? 0 : it->second;
818template<
class t_connection_context>
830template<
class t_connection_context>
835 return LEVIN_OK == r ? aph->
invoke(command, std::move(in_msg), buff_out) : r;
838template<
class t_connection_context>
template<
class callback_t>
846template<
class t_connection_context>
template<
class callback_t>
849 std::vector<typename connections_map::mapped_type>
conn;
852 for (
auto &aph:
conn)
853 aph->finish_outer_call();
859 if (e.second->start_outer_call())
860 conn.push_back(e.second);
863 for (
auto &aph:
conn)
864 if (!cb(aph->get_context_ref()))
870template<
class t_connection_context>
template<
class callback_t>
883template<
class t_connection_context>
890template<
class t_connection_context>
896 if (!c.second->m_connection_context.m_is_income)
901template<
class t_connection_context>
907 if (c.second->m_connection_context.m_is_income)
912template<
class t_connection_context>
921template<
class t_connection_context>
926 return LEVIN_OK == r ? aph->
send(std::move(message)) : 0;
929template<
class t_connection_context>
944template<
class t_connection_context>
955template<
class t_connection_context>
static void close()
Definition blockchain_blackball.cpp:279
Definition byte_slice.h:69
Definition levin_protocol_handler_async.h:81
async_protocol_handler_config()
Definition levin_protocol_handler_async.h:122
bool request_callback(boost::uuids::uuid connection_id)
Definition levin_protocol_handler_async.h:956
~async_protocol_handler_config()
Definition levin_protocol_handler_async.h:124
size_t get_connections_count()
Definition levin_protocol_handler_async.h:884
int find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler< t_connection_context > *&aph)
Definition levin_protocol_handler_async.h:819
async_protocol_handler< t_connection_context > * find_connection(boost::uuids::uuid connection_id) const
Definition levin_protocol_handler_async.h:812
void(* m_pcommands_handler_destroy)(levin_commands_handler< detail::p2p_context > *)
Definition levin_protocol_handler_async.h:95
bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
Definition levin_protocol_handler_async.h:871
bool foreach_connection(const callback_t &cb)
Definition levin_protocol_handler_async.h:847
levin_commands_handler< detail::p2p_context > * m_pcommands_handler
Definition levin_protocol_handler_async.h:94
int send(epee::byte_slice message, const boost::uuids::uuid &connection_id)
Definition levin_protocol_handler_async.h:922
boost::unordered_map< boost::uuids::uuid, async_protocol_handler< t_connection_context > * > connections_map
Definition levin_protocol_handler_async.h:82
uint64_t m_max_packet_size
Definition levin_protocol_handler_async.h:102
t_connection_context connection_context
Definition levin_protocol_handler_async.h:100
void delete_connections(size_t count, bool incoming)
Definition levin_protocol_handler_async.h:760
void del_out_connections(size_t count)
Definition levin_protocol_handler_async.h:791
void set_handler(levin_commands_handler< t_connection_context > *handler, void(*destroy)(levin_commands_handler< t_connection_context > *)=NULL)
Definition levin_protocol_handler_async.h:913
uint64_t m_invoke_timeout
Definition levin_protocol_handler_async.h:103
bool update_connection_context(const t_connection_context &contxt)
Definition levin_protocol_handler_async.h:945
critical_section m_connects_lock
Definition levin_protocol_handler_async.h:83
void add_connection(async_protocol_handler< t_connection_context > *pc)
Definition levin_protocol_handler_async.h:803
bool close(boost::uuids::uuid connection_id)
Definition levin_protocol_handler_async.h:930
size_t get_out_connections_count()
Definition levin_protocol_handler_async.h:891
void del_in_connections(size_t count)
Definition levin_protocol_handler_async.h:797
int invoke(int command, message_writer in_msg, std::string &buff_out, boost::uuids::uuid connection_id)
Definition levin_protocol_handler_async.h:831
int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
Definition levin_protocol_handler_async.h:839
void del_connection(async_protocol_handler< t_connection_context > *pc)
Definition levin_protocol_handler_async.h:751
connections_map m_connects
Definition levin_protocol_handler_async.h:84
size_t get_in_connections_count()
Definition levin_protocol_handler_async.h:902
uint64_t m_initial_max_packet_size
Definition levin_protocol_handler_async.h:101
Definition levin_protocol_handler_async.h:135
std::string m_local_inv_buff
Definition levin_protocol_handler_async.h:173
bool send_message(byte_slice message)
Definition levin_protocol_handler_async.h:138
void handle_qued_callback()
Definition levin_protocol_handler_async.h:403
std::atomic< uint32_t > m_wait_count
Definition levin_protocol_handler_async.h:177
critical_section m_call_lock
Definition levin_protocol_handler_async.h:175
std::string m_fragment_buffer
Definition levin_protocol_handler_async.h:136
int32_t m_oponent_protocol_ver
Definition levin_protocol_handler_async.h:188
bool after_init_connection()
Definition levin_protocol_handler_async.h:620
bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
Definition levin_protocol_handler_async.h:631
bool release_protocol()
Definition levin_protocol_handler_async.h:365
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
Definition levin_protocol_handler_async.h:291
std::atomic< uint64_t > m_max_packet_size
Definition levin_protocol_handler_async.h:183
net_utils::i_service_endpoint * m_pservice_endpoint
Definition levin_protocol_handler_async.h:180
critical_section m_invoke_response_handlers_lock
Definition levin_protocol_handler_async.h:287
bool close()
Definition levin_protocol_handler_async.h:382
void update_connection_context(const connection_context &contxt)
Definition levin_protocol_handler_async.h:390
virtual bool handle_recv(const void *ptr, size_t cb)
Definition levin_protocol_handler_async.h:408
t_connection_context connection_context
Definition levin_protocol_handler_async.h:158
void request_callback()
Definition levin_protocol_handler_async.h:395
critical_section m_local_inv_buff_lock
Definition levin_protocol_handler_async.h:172
int send(byte_slice message)
Definition levin_protocol_handler_async.h:731
virtual ~async_protocol_handler()
Definition levin_protocol_handler_async.h:324
t_connection_context & get_context_ref()
Definition levin_protocol_handler_async.h:747
std::atomic< bool > m_invoke_buf_ready
Definition levin_protocol_handler_async.h:168
std::list< boost::shared_ptr< invoke_response_handler_base > > m_invoke_response_handlers
Definition levin_protocol_handler_async.h:288
bool m_connection_initialized
Definition levin_protocol_handler_async.h:189
stream_state m_state
Definition levin_protocol_handler_async.h:186
stream_state
Definition levin_protocol_handler_async.h:162
@ stream_state_body
Definition levin_protocol_handler_async.h:164
@ stream_state_head
Definition levin_protocol_handler_async.h:163
bool start_outer_call()
Definition levin_protocol_handler_async.h:346
test_connection_context & m_connection_context
Definition levin_protocol_handler_async.h:182
config_type & m_config
Definition levin_protocol_handler_async.h:181
boost::uuids::uuid get_connection_id()
Definition levin_protocol_handler_async.h:745
volatile int m_invoke_result_code
Definition levin_protocol_handler_async.h:170
async_protocol_handler_config< t_connection_context > config_type
Definition levin_protocol_handler_async.h:159
bucket_head2 m_current_head
Definition levin_protocol_handler_async.h:179
async_protocol_handler(net_utils::i_service_endpoint *psnd_hndlr, config_type &config, t_connection_context &conn_context)
Definition levin_protocol_handler_async.h:305
net_utils::buffer m_cache_in_buffer
Definition levin_protocol_handler_async.h:185
std::atomic< bool > m_protocol_released
Definition levin_protocol_handler_async.h:167
bool finish_outer_call()
Definition levin_protocol_handler_async.h:357
int invoke(int command, message_writer in_msg, std::string &buff_out)
Definition levin_protocol_handler_async.h:676
std::atomic< uint32_t > m_close_called
Definition levin_protocol_handler_async.h:178
Provides space for levin (p2p) header, so that payload can be sent without copy.
Definition levin_base.h:132
byte_slice finalize_invoke(uint32_t command)
Definition levin_base.h:151
byte_slice finalize_response(uint32_t command, uint32_t return_code)
Definition levin_base.h:153
byte_stream buffer
Has space for levin header until a finalize method is used.
Definition levin_base.h:159
bucket_head2 header
Definition levin_base.h:135
Non-owning sequence of data. Does not deep copy.
Definition span.h:55
constexpr std::size_t size() const noexcept
Definition span.h:109
constexpr pointer data() const noexcept
Definition span.h:108
const uint8_t seed[32]
Definition code-generator.cpp:37
std::unique_ptr< test_connection > conn(new test_connection(io_service, m_handler_config))
const char * res
Definition hmac_keccak.cpp:42
#define SWAP64LE
Definition int-util.h:285
#define SWAP32LE
Definition int-util.h:277
#define LEVIN_PACKET_RESPONSE
Definition levin_base.h:80
#define LEVIN_PROTOCOL_VER_1
Definition levin_base.h:86
#define LEVIN_ERROR_CONNECTION
Definition levin_base.h:103
#define LEVIN_INITIAL_MAX_PACKET_SIZE
Definition levin_base.h:76
#define LEVIN_PACKET_END
Definition levin_base.h:82
#define LEVIN_PACKET_REQUEST
Definition levin_base.h:79
#define LEVIN_DEFAULT_MAX_PACKET_SIZE
Definition levin_base.h:77
#define LEVIN_OK
Definition levin_base.h:102
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition levin_base.h:105
#define LEVIN_PACKET_BEGIN
Definition levin_base.h:81
#define LEVIN_ERROR_CONNECTION_NOT_FOUND
Definition levin_base.h:104
#define LEVIN_SIGNATURE
Definition levin_base.h:38
#define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED
Definition levin_base.h:75
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
Definition levin_base.h:106
#define MIN_BYTES_WANTED
Definition levin_protocol_handler_async.h:50
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category)
Definition levin_protocol_handler_async.h:54
Definition cryptonote_config.h:221
Definition levin_base.h:44
boost::shared_ptr< call_befor_die_base > auto_scope_leave_caller
Definition misc_language.h:80
uint64_t get_tick_count()
Definition time_helper.h:82
bool sleep_no_w(long ms)
Definition misc_language.cpp:35
auto_scope_leave_caller create_scope_leave_handler(t_scope_leave_handler f)
Definition misc_language.h:97
TODO: (mj-xmr) This will be reduced in an another PR.
Definition byte_slice.h:40
#define LOG_ERROR_CC(ct, message)
Definition net_utils_base.h:469
const char * buf
Definition slow_memmem.cpp:73
unsigned int uint32_t
Definition stdint.h:126
signed int int32_t
Definition stdint.h:123
unsigned char uint8_t
Definition stdint.h:124
unsigned __int64 uint64_t
Definition stdint.h:136
bool m_timer_started
Definition levin_protocol_handler_async.h:228
async_protocol_handler & m_con
Definition levin_protocol_handler_async.h:226
callback_t m_cb
Definition levin_protocol_handler_async.h:225
uint64_t m_timeout
Definition levin_protocol_handler_async.h:231
virtual bool handle(int res, const epee::span< const uint8_t > buff, typename async_protocol_handler::connection_context &context)
Definition levin_protocol_handler_async.h:233
virtual void cancel()
Definition levin_protocol_handler_async.h:245
boost::asio::deadline_timer m_timer
Definition levin_protocol_handler_async.h:227
virtual ~anvoke_handler()
Definition levin_protocol_handler_async.h:223
anvoke_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
Definition levin_protocol_handler_async.h:202
virtual bool is_timer_started() const
Definition levin_protocol_handler_async.h:241
int m_command
Definition levin_protocol_handler_async.h:232
virtual bool cancel_timer()
Definition levin_protocol_handler_async.h:254
bool m_timer_cancelled
Definition levin_protocol_handler_async.h:230
bool m_cancel_timer_called
Definition levin_protocol_handler_async.h:229
virtual void reset_timer()
Definition levin_protocol_handler_async.h:264
Definition levin_protocol_handler_async.h:192
virtual bool cancel_timer()=0
virtual void reset_timer()=0
virtual bool is_timer_started() const =0
virtual bool handle(int res, const epee::span< const uint8_t > buff, connection_context &context)=0
Definition levin_base.h:63
uint32_t m_command
Definition levin_base.h:67
uint32_t m_protocol_version
Definition levin_base.h:70
uint32_t m_flags
Definition levin_base.h:69
uint64_t m_cb
Definition levin_base.h:65
int32_t m_return_code
Definition levin_base.h:68
uint64_t m_signature
Definition levin_base.h:64
uint8_t m_have_to_return_data
Definition levin_base.h:66
Definition levin_base.h:91
Definition net_utils_base.h:442
#define CRITICAL_REGION_LOCAL(x)
Definition syncobj.h:153
#define CRITICAL_REGION_END()
Definition syncobj.h:158
#define CRITICAL_REGION_BEGIN(x)
Definition syncobj.h:154