Electroneum
Loading...
Searching...
No Matches
abstract_tcp_server_cp.h
Go to the documentation of this file.
1// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are met:
6// * Redistributions of source code must retain the above copyright
7// notice, this list of conditions and the following disclaimer.
8// * Redistributions in binary form must reproduce the above copyright
9// notice, this list of conditions and the following disclaimer in the
10// documentation and/or other materials provided with the distribution.
11// * Neither the name of the Andrey N. Sabelnikov nor the
12// names of its contributors may be used to endorse or promote products
13// derived from this software without specific prior written permission.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
19// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25//
26
27
28#ifndef _LEVIN_CP_SERVER_H_
29#define _LEVIN_CP_SERVER_H_
30
31#include <winsock2.h>
32#include <rpc.h>
33#include <string>
34#include <map>
35#include <boost/shared_ptr.hpp>
36
37#include "misc_log_ex.h"
38//#include "threads_helper.h"
39#include "syncobj.h"
40#define ENABLE_PROFILING
41#include "profile_tools.h"
42#include "net_utils_base.h"
43#include "pragma_comp_defs.h"
44
45#undef ELECTRONEUM_DEFAULT_LOG_CATEGORY
46#define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net"
47
48#define LEVIN_DEFAULT_DATA_BUFF_SIZE 2000
49
50namespace epee
51{
52namespace net_utils
53{
54
55 template<class TProtocol>
56 class cp_server_impl//: public abstract_handler
57 {
58 public:
// Public interface of cp_server_impl. Only on_net_idle() and
// get_config_object() are defined inline; the other members are defined
// outside this listing (the header includes abstract_tcp_server_cp.inl at its
// end), so their exact behavior is not visible here.
// The constructor's handler parameter is commented out and therefore unused.
59 cp_server_impl(/*abstract_handler* phandler = NULL*/);
60 virtual ~cp_server_impl();
61
// Takes the port number to serve on. NOTE(review): presumably binds/listens
// on port_no - confirm against the .inl.
62 bool init_server(int port_no);
// threads_count defaults to 0; what 0 means is decided by the out-of-line
// definition - TODO confirm.
64 bool run_server(int threads_count = 0);
// Overridable hook; this default implementation does nothing and returns true.
67 virtual bool on_net_idle(){return true;}
// Returns a mutable reference to the server's m_config member.
69 typename TProtocol::config_type& get_config_object(){return m_config;}
70 private:
// Tag stored in io_data_base::m_op_type to identify what an I/O record is
// being used for.
71 enum overlapped_operation_type
72 {
// Not assigned anywhere in the visible code; presumably set on the receive
// path in the .inl - verify.
73 op_type_recv,
// Set by connection::handle_send() before it calls ::WSASend.
74 op_type_send,
// Set by connection::query_shutdown() before it posts a completion packet.
75 op_type_stop
76 };
77
// Header of a variable-sized I/O record. Instances are not created with
// `new io_data_base`: init_buffers() and resize_send_buff() allocate
// `new char[sizeof(io_data_base) + N - 1]` and cast the result, so Buffer
// effectively provides N bytes of payload storage.
78 struct io_data_base
79 {
// Passed to ::WSASend and ::PostQueuedCompletionStatus. NOTE(review): it is
// the first member, presumably so the completion handler can map the
// OVERLAPPED* back to this record - confirm against the .inl.
80 OVERLAPPED m_overlapped;
// Buffer descriptor handed to ::WSASend; handle_send() points it at Buffer.
81 WSABUF DataBuf;
82 overlapped_operation_type m_op_type;
// Payload capacity (the N above), written by init_buffers()/resize_send_buff().
83 DWORD TotalBuffBytes;
// handle_send() sets this to 1 before ::WSASend and back to 0 when the call
// fails or completes immediately.
84 volatile LONG m_is_in_use;
// First byte of the payload area; the rest follows in the same allocation.
85 char Buffer[1];
86 };
87
88PRAGMA_WARNING_PUSH
89PRAGMA_WARNING_DISABLE_VS(4355)
90 template<class TProtocol>
91 struct connection: public net_utils::i_service_endpoint
92 {
// Creates a connection with no socket (INVALID_SOCKET), no I/O buffers (both
// pointers NULL) and both shutdown flags cleared. The protocol handler is
// constructed from `this`, ref_config and `context`.
// NOTE(review): m_completion_port is not initialized here, and `context` is
// not declared in the visible part of this listing - confirm both against the
// full header.
// Warning 4355 is disabled around this struct, presumably because `this` is
// used in the initializer list.
93 connection(typename TProtocol::config_type& ref_config):m_sock(INVALID_SOCKET), m_tprotocol_handler(this, ref_config, context), m_psend_data(NULL), m_precv_data(NULL), m_asked_to_shutdown(0), m_connection_shutwoned(0)
94 {
95 }
96
97 //connection():m_sock(INVALID_SOCKET), m_tprotocol_handler(this, m_dummy_config, context), m_psend_data(NULL), m_precv_data(NULL), m_asked_to_shutdown(0), m_connection_shutwoned(0)
98 //{
99 //}
100
// Assignment operator that copies nothing: obj is ignored and *this is
// returned unchanged.
101 connection<TProtocol>& operator=(const connection<TProtocol>& obj)
102 {
103 return *this;
104 }
105
// Allocates the send and receive I/O records, each with
// LEVIN_DEFAULT_DATA_BUFF_SIZE bytes of payload capacity. Always returns true
// (an allocation failure would surface as an exception from `new`).
// Only TotalBuffBytes is written; the other io_data_base fields are left
// uninitialized by this function. Any pointers already held are overwritten
// without being released.
// NOTE(review): the storage comes from `new char[]`, so it has to be released
// with `delete[]` on a char*.
106 bool init_buffers()
107 {
// "-1" because io_data_base already contains one byte of Buffer.
108 m_psend_data = (io_data_base*)new char[sizeof(io_data_base) + LEVIN_DEFAULT_DATA_BUFF_SIZE-1];
109 m_psend_data->TotalBuffBytes = LEVIN_DEFAULT_DATA_BUFF_SIZE;
110 m_precv_data = (io_data_base*)new char[sizeof(io_data_base) + LEVIN_DEFAULT_DATA_BUFF_SIZE-1];
111 m_precv_data->TotalBuffBytes = LEVIN_DEFAULT_DATA_BUFF_SIZE;
112 return true;
113 }
114
// Requests shutdown of this connection. The first call flips
// m_asked_to_shutdown from 0 to 1 and posts a completion packet to
// m_completion_port; later calls do nothing. Always returns true.
// m_psend_data is dereferenced without a null check, and the result of
// ::PostQueuedCompletionStatus is ignored.
115 bool query_shutdown()
116 {
// InterlockedCompareExchange returns the previous value, so the body runs
// only for the caller that actually changed the flag.
117 if(!::InterlockedCompareExchange(&m_asked_to_shutdown, 1, 0))
118 {
// The send record is reused to carry the stop notification.
// NOTE(review): this overwrites the m_op_type handle_send() stored there -
// check whether a send can still be pending at this point.
119 m_psend_data->m_op_type = op_type_stop;
// The connection pointer is passed as the completion key.
120 ::PostQueuedCompletionStatus(m_completion_port, 0, (ULONG_PTR)this, &m_psend_data->m_overlapped);
121 }
122 return true;
123 }
124
125 //bool set_config(typename TProtocol::config_type& config)
126 //{
127 // this->~connection();
128 // new(this) connection<TProtocol>(config);
129 // return true;
130 //}
132 {
133 if(m_psend_data)
134 delete m_psend_data;
135
136 if(m_precv_data)
137 delete m_precv_data;
138 }
// Copies cb bytes from ptr into the connection's send record and starts an
// overlapped ::WSASend on m_sock.
//   ptr - source bytes to send
//   cb  - number of bytes; it is narrowed to DWORD / u_long below
// Returns true when ::WSASend reports WSA_IO_PENDING, or when it completes
// immediately with bytes_sent == cb; the error paths log and return false.
// NOTE(review): as listed, the closing brace right after the WSA_IO_PENDING
// `return true;` ends the SOCKET_ERROR block, so `err` is out of scope in the
// following LOG_ERROR and the later `}else if` has no matching `if`. Some
// lines of this function are also missing from the listing. Confirm the
// control flow against the real header before relying on it.
139 virtual bool handle_send(const void* ptr, size_t cb)
140 {
141 PROFILE_FUNC("[handle_send]");
// Grow the send record when the payload does not fit.
142 if(m_psend_data->TotalBuffBytes < cb)
143 resize_send_buff((DWORD)cb);
144
// Reset the OVERLAPPED and point the WSABUF at the record's own payload area.
145 ZeroMemory(&m_psend_data->m_overlapped, sizeof(OVERLAPPED));
146 m_psend_data->DataBuf.len = (u_long)cb;//m_psend_data->TotalBuffBytes;
147 m_psend_data->DataBuf.buf = m_psend_data->Buffer;
148 memcpy(m_psend_data->DataBuf.buf, ptr, cb);
149 m_psend_data->m_op_type = op_type_send;
// Mark the record busy before the send is issued.
150 InterlockedExchange(&m_psend_data->m_is_in_use, 1);
151 DWORD bytes_sent = 0;
152 DWORD flags = 0;
153 int res = 0;
// Inner scope so the second profiling marker covers only the ::WSASend call.
154 {
155 PROFILE_FUNC("[handle_send] ::WSASend");
156 res = ::WSASend(m_sock, &(m_psend_data->DataBuf), 1, &bytes_sent, flags, &(m_psend_data->m_overlapped), NULL);
157 }
158
159 if(res == SOCKET_ERROR )
160 {
161 int err = ::WSAGetLastError();
// Pending overlapped I/O is the normal asynchronous case; m_is_in_use stays 1.
162 if(WSA_IO_PENDING == err )
163 return true;
164 }
165 LOG_ERROR("BIG FAIL: WSASend error code not correct, res=" << res << " last_err=" << err);
166 ::InterlockedExchange(&m_psend_data->m_is_in_use, 0);
168 //closesocket(m_psend_data);
169 return false;
170 }else if(0 == res)
171 {
// Immediate completion: release the record, then check the byte count.
172 ::InterlockedExchange(&m_psend_data->m_is_in_use, 0);
173 if(!bytes_sent || bytes_sent != cb)
174 {
175 int err = ::WSAGetLastError();
176 LOG_ERROR("BIG FAIL: WSASend immediatly complete? but bad results, res=" << res << " last_err=" << err);
178 return false;
179 }else
180 {
181 return true;
182 }
183 }
184
185 return true;
186 }
187 bool resize_send_buff(DWORD new_size)
189 if(m_psend_data->TotalBuffBytes >= new_size)
190 return true;
191
192 delete m_psend_data;
193 m_psend_data = (io_data_base*)new char[sizeof(io_data_base) + new_size-1];
194 m_psend_data->TotalBuffBytes = new_size;
195 LOG_PRINT("Connection buffer resized up to " << new_size, LOG_LEVEL_3);
196 return true;
197 }
198
199
// Socket used by handle_send(); starts as INVALID_SOCKET.
200 SOCKET m_sock;
// Protocol handler, constructed from this connection and the server config.
202 TProtocol m_tprotocol_handler;
// Only referenced from commented-out code in the visible part of this header.
203 typename TProtocol::config_type m_dummy_config;
// Receive and send I/O records; NULL until init_buffers() allocates them.
204 io_data_base* m_precv_data;
205 io_data_base* m_psend_data;
// Completion port that query_shutdown() posts to; not set by the constructor.
206 HANDLE m_completion_port;
// Set to 1 by the first query_shutdown() call.
207 volatile LONG m_asked_to_shutdown;
// Initialized to 0; not otherwise used in the visible code.
208 volatile LONG m_connection_shutwoned;
209 };
210PRAGMA_WARNING_POP
211
// Private declarations of cp_server_impl; the functions are defined outside
// this listing, so only their signatures are documented here.
// Static thread entry point taking an opaque pointer. NOTE(review):
// presumably param is the server instance - confirm against the .inl.
213 static unsigned CALLBACK worker_thread(void* param);
214
// Takes an accepted socket together with the peer's address and port values.
215 bool add_new_connection(SOCKET new_sock, long ip_from, int port_from);
217
218
// Connections keyed by their socket handle and held through shared_ptr.
219 typedef std::map<SOCKET, boost::shared_ptr<connection<TProtocol> > > connections_container;
// NOTE(review): presumably the stop-request flag for the server - verify.
225 volatile LONG m_stop;
226 //abstract_handler* m_phandler;
// Configuration object exposed through get_config_object().
229 typename TProtocol::config_type m_config;
230 };
231}
232}
233#include "abstract_tcp_server_cp.inl"
235
236#endif //_LEVIN_CP_SERVER_H_
virtual bool handle_send(const void *ptr, size_t cb)
connection< TProtocol > & operator=(const connection< TProtocol > &obj)
bool query_shutdown()
bool init_buffers()
#define LEVIN_DEFAULT_DATA_BUFF_SIZE
connection(typename TProtocol::config_type &ref_config)
Represents a single connection from a client.
TProtocol::config_type & get_config_object()
bool run_server(int threads_count=0)
void * memcpy(void *a, const void *b, size_t c)
const char * res
#define INVALID_SOCKET
#define SOCKET
#define LOG_ERROR(x)
Definition misc_log_ex.h:98
bool shutdown_connection(connection< TProtocol > *pconn)
connections_container m_connections
TProtocol::config_type m_config
PRAGMA_WARNING_POP bool worker_thread_member()
volatile LONG m_worker_thread_counter
bool add_new_connection(SOCKET new_sock, long ip_from, int port_from)
std::map< SOCKET, boost::shared_ptr< connection< TProtocol > > > connections_container
critical_section m_connections_lock
#define PROFILE_FUNC(immortal_ptr_str)