315{
317{
318
319 template<class TProtocol>
321 {
322 public:
325
334 private:
335 enum overlapped_operation_type
336 {
337 op_type_recv,
338 op_type_send,
339 op_type_stop
340 };
341
342 struct io_data_base
343 {
344 OVERLAPPED m_overlapped;
345 WSABUF DataBuf;
346 overlapped_operation_type m_op_type;
347 DWORD TotalBuffBytes;
348 volatile LONG m_is_in_use;
349 char Buffer[1];
350 };
351
352PRAGMA_WARNING_PUSH
353PRAGMA_WARNING_DISABLE_VS(4355)
354 template<class TProtocol>
355 struct connection: public net_utils::i_service_endpoint
356 {
357 connection(
typename TProtocol::config_type& ref_config):m_sock(
INVALID_SOCKET), m_tprotocol_handler(this, ref_config, context), m_psend_data(NULL), m_precv_data(NULL), m_asked_to_shutdown(0), m_connection_shutwoned(0)
358 {
359 }
360
361
362
363
364
365 connection<TProtocol>&
operator=(
const connection<TProtocol>& obj)
366 {
367 return *this;
368 }
369
371 {
376 return true;
377 }
378
380 {
381 if(!::InterlockedCompareExchange(&m_asked_to_shutdown, 1, 0))
382 {
383 m_psend_data->m_op_type = op_type_stop;
384 ::PostQueuedCompletionStatus(m_completion_port, 0, (ULONG_PTR)this, &m_psend_data->m_overlapped);
385 }
386 return true;
387 }
388
389
390
391
392
393
394
396 {
397 if(m_psend_data)
398 delete m_psend_data;
399
400 if(m_precv_data)
401 delete m_precv_data;
402 }
403 virtual bool handle_send(
const void* ptr,
size_t cb)
404 {
406 if(m_psend_data->TotalBuffBytes < cb)
407 resize_send_buff((DWORD)cb);
408
409 ZeroMemory(&m_psend_data->m_overlapped, sizeof(OVERLAPPED));
410 m_psend_data->DataBuf.len = (u_long)cb;
411 m_psend_data->DataBuf.buf = m_psend_data->Buffer;
412 memcpy(m_psend_data->DataBuf.buf, ptr, cb);
413 m_psend_data->m_op_type = op_type_send;
414 InterlockedExchange(&m_psend_data->m_is_in_use, 1);
415 DWORD bytes_sent = 0;
416 DWORD flags = 0;
418 {
420 res = ::WSASend(m_sock, &(m_psend_data->DataBuf), 1, &bytes_sent, flags, &(m_psend_data->m_overlapped), NULL);
421 }
422
423 if(
res == SOCKET_ERROR )
424 {
425 int err = ::WSAGetLastError();
426 if(WSA_IO_PENDING == err )
427 return true;
428 }
429 LOG_ERROR(
"BIG FAIL: WSASend error code not correct, res=" <<
res <<
" last_err=" << err);
430 ::InterlockedExchange(&m_psend_data->m_is_in_use, 0);
432
433 return false;
435 {
436 ::InterlockedExchange(&m_psend_data->m_is_in_use, 0);
437 if(!bytes_sent || bytes_sent != cb)
438 {
439 int err = ::WSAGetLastError();
440 LOG_ERROR(
"BIG FAIL: WSASend immediatly complete? but bad results, res=" <<
res <<
" last_err=" << err);
442 return false;
443 }else
444 {
445 return true;
446 }
447 }
448
449 return true;
450 }
451 bool resize_send_buff(DWORD new_size)
452 {
453 if(m_psend_data->TotalBuffBytes >= new_size)
454 return true;
455
456 delete m_psend_data;
457 m_psend_data = (io_data_base*)new char[sizeof(io_data_base) + new_size-1];
458 m_psend_data->TotalBuffBytes = new_size;
459 LOG_PRINT("Connection buffer resized up to " << new_size, LOG_LEVEL_3);
460 return true;
461 }
462
463
465 net_utils::connection_context_base context;
466 TProtocol m_tprotocol_handler;
467 typename TProtocol::config_type m_dummy_config;
468 io_data_base* m_precv_data;
469 io_data_base* m_psend_data;
470 HANDLE m_completion_port;
471 volatile LONG m_asked_to_shutdown;
472 volatile LONG m_connection_shutwoned;
473 };
474PRAGMA_WARNING_POP
475
477 static unsigned CALLBACK worker_thread(void* param);
478
481
482
490
493 typename TProtocol::config_type
m_config;
494 };
495}
496}
497#include "abstract_tcp_server_cp.inl"
498
499
500#endif
virtual bool handle_send(const void *ptr, size_t cb)
connection< TProtocol > & operator=(const connection< TProtocol > &obj)
#define LEVIN_DEFAULT_DATA_BUFF_SIZE
connection(typename TProtocol::config_type &ref_config)
Represents a single connection from a client.
size_t get_active_connections_num()
bool init_server(int port_no)
TProtocol::config_type & get_config_object()
bool run_server(int threads_count=0)
virtual ~cp_server_impl()
virtual bool on_net_idle()
void * memcpy(void *a, const void *b, size_t c)
bool shutdown_connection(connection< TProtocol > *pconn)
connections_container m_connections
TProtocol::config_type m_config
PRAGMA_WARNING_POP bool worker_thread_member()
volatile LONG m_worker_thread_counter
bool add_new_connection(SOCKET new_sock, long ip_from, int port_from)
std::map< SOCKET, boost::shared_ptr< connection< TProtocol > > > connections_container
critical_section m_connections_lock