28 #include <boost/asio/deadline_timer.hpp>
29 #include <boost/uuid/uuid_generators.hpp>
30 #include <boost/unordered_map.hpp>
31 #include <boost/interprocess/detail/atomic.hpp>
32 #include <boost/smart_ptr/make_shared.hpp>
46 #undef ELECTRONEUM_DEFAULT_LOG_CATEGORY
47 #define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net"
49 #ifndef MIN_BYTES_WANTED
50 #define MIN_BYTES_WANTED 512
61 template<
class t_connection_context>
62 class async_protocol_handler;
64 template<
class t_connection_context>
67 typedef boost::unordered_map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* > connections_map;
69 connections_map m_connects;
82 void delete_connections (
size_t count,
bool incoming);
90 template<
class callback_t>
97 template<
class callback_t>
99 template<
class callback_t>
115 template<
class t_connection_context = net_utils::connection_context_base>
161 template <
class callback_t>
171 m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
172 m_timer.async_wait([&con, command, cb, timeout](
const boost::system::error_code& ec)
174 if(ec == boost::asio::error::operation_aborted)
176 MINFO(con.
get_context_ref() <<
"Timeout on invoke operation happened, command: " << command <<
" timeout: " << timeout);
221 boost::system::error_code ignored_ec;
228 boost::system::error_code ignored_ec;
231 callback_t& cb =
m_cb;
236 m_timer.async_wait([&con, cb, command, timeout](
const boost::system::error_code& ec)
238 if(ec == boost::asio::error::operation_aborted)
240 MINFO(con.
get_context_ref() <<
"Timeout on invoke operation happened, command: " << command <<
" timeout: " << timeout);
252 template<
class callback_t>
256 boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<
anvoke_handler<callback_t>>(cb, timeout, con, command));
258 return handler->is_timer_started();
264 t_connection_context& conn_context):
292 for (
size_t i = 0; i < 60 * 1000 / 100 && 0 != boost::interprocess::ipcdetail::atomic_read32(&
m_wait_count); ++i)
312 boost::interprocess::ipcdetail::atomic_inc32(&
m_wait_count);
318 boost::interprocess::ipcdetail::atomic_dec32(&
m_wait_count);
333 std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](
const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) {
334 pinv_resp_hndlr->cancel();
368 if(boost::interprocess::ipcdetail::atomic_read32(&
m_close_called))
381 <<
", connection will be closed.");
387 bool is_continue =
true;
403 response_handler->reset_timer();
427 bool timer_cancelled = response_handler->cancel_timer();
431 invoke_response_handlers_guard.
unlock();
438 invoke_response_handlers_guard.
unlock();
440 if(!boost::interprocess::ipcdetail::atomic_read32(&
m_wait_count) && !boost::interprocess::ipcdetail::atomic_read32(&
m_close_called))
468 #if BYTE_ORDER == LITTLE_ENDIAN
480 send_buff += return_buff;
510 #if BYTE_ORDER == LITTLE_ENDIAN
535 <<
", connection will be closed.");
559 template<
class callback_t>
588 head.m_have_to_return_data =
true;
646 head.m_have_to_return_data =
true;
668 <<
", f=" <<
head.m_flags
669 <<
", r?=" <<
head.m_have_to_return_data
670 <<
", cmd = " <<
head.m_command
671 <<
", ver=" <<
head.m_protocol_version);
674 size_t prev_size = 0;
719 head.m_have_to_return_data =
false;
739 ", f=" <<
head.m_flags <<
740 ", r?=" <<
head.m_have_to_return_data <<
741 ", cmd = " <<
head.m_command <<
742 ", ver=" <<
head.m_protocol_version);
752 template<
class t_connection_context>
753 void async_protocol_handler_config<t_connection_context>::del_connection(async_protocol_handler<t_connection_context>* pconn)
756 m_connects.erase(pconn->get_connection_id());
758 m_pcommands_handler->on_connection_close(pconn->m_connection_context);
761 template<
class t_connection_context>
762 void async_protocol_handler_config<t_connection_context>::delete_connections(
size_t count,
bool incoming)
764 std::vector <boost::uuids::uuid> connections;
766 for (
auto& c: m_connects)
768 if (c.second->m_connection_context.m_is_income == incoming)
769 connections.push_back(c.first);
774 unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
775 shuffle(connections.begin(), connections.end(), std::default_random_engine(seed));
776 while (
count > 0 && connections.size() > 0)
780 auto i = connections.end() - 1;
781 async_protocol_handler<t_connection_context> *conn = m_connects.at(*i);
782 del_connection(conn);
784 connections.erase(i);
786 catch (
const std::out_of_range &e)
788 MWARNING(
"Connection not found in m_connects, continuing");
796 template<
class t_connection_context>
799 delete_connections(
count,
false);
802 template<
class t_connection_context>
805 delete_connections(
count,
true);
808 template<
class t_connection_context>
817 template<
class t_connection_context>
818 async_protocol_handler<t_connection_context>* async_protocol_handler_config<t_connection_context>::find_connection(
boost::uuids::uuid connection_id)
const
820 auto it = m_connects.find(connection_id);
821 return it == m_connects.end() ? 0 : it->second;
824 template<
class t_connection_context>
825 int async_protocol_handler_config<t_connection_context>::find_and_lock_connection(
boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph)
828 aph = find_connection(connection_id);
831 if(!aph->start_outer_call())
836 template<
class t_connection_context>
840 int r = find_and_lock_connection(connection_id, aph);
841 return LEVIN_OK == r ? aph->
invoke(command, in_buff, buff_out) : r;
844 template<
class t_connection_context>
template<
class callback_t>
848 int r = find_and_lock_connection(connection_id, aph);
852 template<
class t_connection_context>
template<
class callback_t>
856 for(
auto& c: m_connects)
865 template<
class t_connection_context>
template<
class callback_t>
877 template<
class t_connection_context>
881 return m_connects.size();
884 template<
class t_connection_context>
887 if (m_pcommands_handler && m_pcommands_handler_destroy)
888 (*m_pcommands_handler_destroy)(m_pcommands_handler);
889 m_pcommands_handler = handler;
890 m_pcommands_handler_destroy = destroy;
893 template<
class t_connection_context>
897 int r = find_and_lock_connection(connection_id, aph);
901 template<
class t_connection_context>
906 return 0 != aph ? aph->
close() :
false;
909 template<
class t_connection_context>
920 template<
class t_connection_context>
924 int r = find_and_lock_connection(connection_id, aph);
async_protocol_handler_config()
bool request_callback(boost::uuids::uuid connection_id)
~async_protocol_handler_config()
size_t get_connections_count()
int invoke_async(int command, const epee::span< const uint8_t > in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
bool foreach_connection(const callback_t &cb)
uint64_t m_max_packet_size
t_connection_context connection_context
void del_out_connections(size_t count)
void set_handler(levin_commands_handler< t_connection_context > *handler, void(*destroy)(levin_commands_handler< t_connection_context > *)=NULL)
uint64_t m_invoke_timeout
bool update_connection_context(const t_connection_context &contxt)
bool close(boost::uuids::uuid connection_id)
int invoke(int command, const epee::span< const uint8_t > in_buff, std::string &buff_out, boost::uuids::uuid connection_id)
void del_in_connections(size_t count)
int notify(int command, const epee::span< const uint8_t > in_buff, boost::uuids::uuid connection_id)
std::string m_local_inv_buff
void handle_qued_callback()
critical_section m_call_lock
int32_t m_oponent_protocol_ver
bool after_init_connection()
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
net_utils::i_service_endpoint * m_pservice_endpoint
critical_section m_invoke_response_handlers_lock
int notify(int command, const epee::span< const uint8_t > in_buff)
void update_connection_context(const connection_context &contxt)
virtual bool handle_recv(const void *ptr, size_t cb)
t_connection_context connection_context
critical_section m_local_inv_buff_lock
virtual ~async_protocol_handler()
int invoke(int command, const epee::span< const uint8_t > in_buff, std::string &buff_out)
std::list< boost::shared_ptr< invoke_response_handler_base > > m_invoke_response_handlers
bool m_connection_initialized
std::atomic< bool > m_deletion_initiated
volatile uint32_t m_invoke_buf_ready
volatile uint32_t m_wait_count
volatile uint32_t m_close_called
t_connection_context & m_connection_context
boost::uuids::uuid get_connection_id()
volatile int m_invoke_result_code
async_protocol_handler_config< t_connection_context > config_type
bucket_head2 m_current_head
async_protocol_handler(net_utils::i_service_endpoint *psnd_hndlr, config_type &config, t_connection_context &conn_context)
net_utils::buffer m_cache_in_buffer
std::atomic< bool > m_protocol_released
bool async_invoke(int command, const epee::span< const uint8_t > in_buff, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
t_connection_context & get_context_ref()
critical_section m_send_lock
epee::span< const uint8_t > carve(size_t sz)
void append(const void *data, size_t sz)
epee::span< const uint8_t > span(size_t sz) const
Non-owning sequence of data. Does not deep copy.
constexpr std::size_t size() const noexcept
constexpr pointer data() const noexcept
#define LEVIN_PACKET_RESPONSE
#define LEVIN_PROTOCOL_VER_1
#define LEVIN_ERROR_CONNECTION
#define LEVIN_PACKET_REQUEST
#define LEVIN_DEFAULT_MAX_PACKET_SIZE
#define LEVIN_ERROR_CONNECTION_DESTROYED
#define LEVIN_ERROR_CONNECTION_NOT_FOUND
#define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
#define CHECK_AND_ASSERT_MES_NO_RET(expr, message)
boost::shared_ptr< call_befor_die_base > auto_scope_leave_caller
uint64_t get_tick_count()
auto_scope_leave_caller create_scope_leave_handler(t_scope_leave_handler f)
mdb_size_t count(MDB_cursor *cur)
std::unique_ptr< void, terminate > context
Unique ZMQ context handle, calls zmq_term on destruction.
#define LOG_DEBUG_CC(ct, message)
#define LOG_ERROR_CC(ct, message)
unsigned __int64 uint64_t
async_protocol_handler & m_con
virtual bool handle(int res, const epee::span< const uint8_t > buff, typename async_protocol_handler::connection_context &context)
boost::asio::deadline_timer m_timer
virtual ~anvoke_handler()
anvoke_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
virtual bool is_timer_started() const
virtual bool cancel_timer()
bool m_cancel_timer_called
virtual void reset_timer()
virtual bool cancel_timer()=0
virtual void reset_timer()=0
virtual bool is_timer_started() const =0
virtual bool handle(int res, const epee::span< const uint8_t > buff, connection_context &context)=0
uint32_t m_protocol_version
bool m_have_to_return_data
virtual int notify(int command, const epee::span< const uint8_t > in_buff, t_connection_context &context)=0
virtual void callback(t_connection_context &context)
virtual int invoke(int command, const epee::span< const uint8_t > in_buff, std::string &buff_out, t_connection_context &context)=0
virtual bool request_callback()=0
virtual bool call_run_once_service_io()=0
virtual bool do_send(const void *ptr, size_t cb)=0
#define CRITICAL_REGION_LOCAL1(x)
#define CRITICAL_REGION_LOCAL(x)
#define CRITICAL_REGION_END()
#define CRITICAL_REGION_BEGIN(x)