Monero
Loading...
Searching...
No Matches
abstract_tcp_server2.inl
Go to the documentation of this file.
1
7// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
8// All rights reserved.
9//
10// Redistribution and use in source and binary forms, with or without
11// modification, are permitted provided that the following conditions are met:
12// * Redistributions of source code must retain the above copyright
13// notice, this list of conditions and the following disclaimer.
14// * Redistributions in binary form must reproduce the above copyright
15// notice, this list of conditions and the following disclaimer in the
16// documentation and/or other materials provided with the distribution.
17// * Neither the name of the Andrey N. Sabelnikov nor the
18// names of its contributors may be used to endorse or promote products
19// derived from this software without specific prior written permission.
20//
21// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
24// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
25// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
28// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31//
32
33
34#include <boost/asio/post.hpp>
35#include <boost/foreach.hpp>
36#include <boost/uuid/random_generator.hpp>
37#include <boost/chrono.hpp>
38#include <boost/utility/value_init.hpp>
39#include <boost/asio/bind_executor.hpp>
40#include <boost/asio/deadline_timer.hpp>
41#include <boost/date_time/posix_time/posix_time.hpp> // TODO
42#include <boost/thread/condition_variable.hpp> // TODO
43#include <boost/make_shared.hpp>
44#include <boost/thread.hpp>
45#include "warnings.h"
47#include "misc_language.h"
48
49#include <sstream>
50#include <iomanip>
51#include <algorithm>
52#include <functional>
53#include <random>
54
55#undef MONERO_DEFAULT_LOG_CATEGORY
56#define MONERO_DEFAULT_LOG_CATEGORY "net"
57
// Tunables for connection timeouts in this file.
58#define AGGRESSIVE_TIMEOUT_THRESHOLD 120 // sockets
// NOTE(review): 1200000 ms is 20 minutes; the previous comment said "2 minutes".
// Confirm whether the value or the old comment reflects the intended timeout.
59#define NEW_CONNECTION_TIMEOUT_LOCAL 1200000 // 20 minutes (if in milliseconds, like the sibling constants)
60#define NEW_CONNECTION_TIMEOUT_REMOTE 10000 // 10 seconds
// The DEFAULT_TIMEOUT_MS_* values are fed to std::chrono::milliseconds and are
// right-shifted (halved repeatedly) before use in the default-timeout helper below.
61#define DEFAULT_TIMEOUT_MS_LOCAL 1800000 // 30 minutes
62#define DEFAULT_TIMEOUT_MS_REMOTE 300000 // 5 minutes
// Extra timeout per byte; milliseconds per the macro name.
63#define TIMEOUT_EXTRA_MS_PER_BYTE 0.2
64
65
66namespace epee
67{
68namespace net_utils
69{
70 template<typename T>
71 T& check_and_get(std::shared_ptr<T>& ptr)
72 {
73 CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null");
74 return *ptr;
75 }
76
77 /************************************************************************/
78 /* */
79 /************************************************************************/
80 template<typename T>
81 unsigned int connection<T>::host_count(int delta)
82 {
83 static std::mutex hosts_mutex;
84 std::lock_guard<std::mutex> guard(hosts_mutex);
85 static std::map<std::string, unsigned int> hosts;
86 unsigned int &val = hosts[m_host];
87 if (delta > 0)
88 MTRACE("New connection from host " << m_host << ": " << val);
89 else if (delta < 0)
90 MTRACE("Closed connection from host " << m_host << ": " << val);
91 CHECK_AND_ASSERT_THROW_MES(delta >= 0 || val >= (unsigned)-delta, "Count would go negative");
92 CHECK_AND_ASSERT_THROW_MES(delta <= 0 || val <= std::numeric_limits<unsigned int>::max() - (unsigned)delta, "Count would wrap");
93 val += delta;
94 return val;
95 }
96
97 template<typename T>
99 {
100 unsigned count{};
101 try { count = host_count(); } catch (...) {}
102 const unsigned shift = (
104 std::min(std::max(count, 1u) - 1, 8u) :
105 0
106 );
107 return (
108 m_local ?
109 std::chrono::milliseconds(DEFAULT_TIMEOUT_MS_LOCAL >> shift) :
110 std::chrono::milliseconds(DEFAULT_TIMEOUT_MS_REMOTE >> shift)
111 );
112 }
113
114 template<typename T>
116 {
117 return std::chrono::duration_cast<connection<T>::duration_t>(
118 std::chrono::duration<double, std::chrono::milliseconds::period>(
120 )
121 );
122 }
123
124 template<typename T>
126 {
127 switch (m_state.status)
128 {
130 interrupt();
131 break;
134 break;
137 break;
138 default:
139 break;
140 }
141 }
142
143 template<typename T>
144 void connection<T>::start_timer(duration_t duration, bool add)
145 {
146 if (m_state.timers.general.wait_expire) {
147 m_state.timers.general.cancel_expire = true;
148 m_state.timers.general.reset_expire = true;
149 m_timers.general.expires_after(
150 std::min(
151 duration + (add ? (m_timers.general.expiry() - std::chrono::steady_clock::now()) : duration_t{}),
153 )
154 );
155 }
156 else {
157 m_timers.general.expires_after(
158 std::min(
159 duration + (add ? (m_timers.general.expiry() - std::chrono::steady_clock::now()) : duration_t{}),
161 )
162 );
164 }
165 }
166
167 template<typename T>
169 {
170 if (m_state.timers.general.wait_expire)
171 return;
172 m_state.timers.general.wait_expire = true;
174 auto on_wait = [this, self] {
175 std::lock_guard<std::mutex> guard(m_state.lock);
176 m_state.timers.general.wait_expire = false;
177 if (m_state.timers.general.cancel_expire) {
178 m_state.timers.general.cancel_expire = false;
179 if (m_state.timers.general.reset_expire) {
180 m_state.timers.general.reset_expire = false;
182 }
183 else if (m_state.status == status_t::INTERRUPTED)
185 else if (m_state.status == status_t::TERMINATING)
187 }
188 else if (m_state.status == status_t::RUNNING)
189 interrupt();
190 else if (m_state.status == status_t::INTERRUPTED)
191 terminate();
192 };
193 m_timers.general.async_wait([this, self, on_wait](const ec_t & ec){
194 boost::asio::post(m_strand, on_wait);
195 });
196 }
197
// Cancels a pending wait on the general timer, if there is one.
// NOTE(review): the signature line is missing from this extract; presumably
// this is the body of the cancel_timer() that other code in this file calls.
198 template<typename T>
200 {
// No wait is outstanding on the general timer: nothing to cancel.
201 if (!m_state.timers.general.wait_expire)
202 return;
// Flag the pending wait as cancelled so its handler takes the cancel branch
// instead of treating the wake-up as a timeout.
203 m_state.timers.general.cancel_expire = true;
// Cleared here, whereas start_timer() sets both flags when it re-arms a
// timer that is already being waited on.
204 m_state.timers.general.reset_expire = false;
205 m_timers.general.cancel();
206 }
207
208 template<typename T>
210 {
211 if (m_state.socket.wait_handshake)
212 return;
213 static_assert(
214 epee::net_utils::get_ssl_magic_size() <= sizeof(m_state.data.read.buffer),
215 ""
216 );
218 if (!m_state.ssl.forced && !m_state.ssl.detected) {
219 m_state.socket.wait_read = true;
220 boost::asio::async_read(
221 connection_basic::socket_.next_layer(),
222 boost::asio::buffer(
223 m_state.data.read.buffer.data(),
224 m_state.data.read.buffer.size()
225 ),
226 boost::asio::transfer_exactly(epee::net_utils::get_ssl_magic_size()),
227 boost::asio::bind_executor(
228 m_strand,
229 [this, self](const ec_t &ec, size_t bytes_transferred){
230 std::lock_guard<std::mutex> guard(m_state.lock);
231 m_state.socket.wait_read = false;
232 if (m_state.socket.cancel_read) {
233 m_state.socket.cancel_read = false;
234 state_status_check();
235 }
236 else if (ec.value()) {
237 terminate();
238 }
239 else if (
241 static_cast<const unsigned char *>(
242 m_state.data.read.buffer.data()
243 ),
244 bytes_transferred
245 )
246 ) {
247 m_state.ssl.enabled = false;
248 finish_read(bytes_transferred);
249 }
250 else {
251 m_state.ssl.detected = true;
252 start_handshake();
253 }
254 }
255 )
256 );
257 return;
258 }
259
260 m_state.socket.wait_handshake = true;
261 auto on_handshake = [this, self](const ec_t &ec, size_t bytes_transferred){
262 std::lock_guard<std::mutex> guard(m_state.lock);
263 m_state.socket.wait_handshake = false;
264 if (m_state.socket.cancel_handshake) {
265 m_state.socket.cancel_handshake = false;
266 state_status_check();
267 }
268 else if (ec.value()) {
269 ec_t ec;
270 connection_basic::socket_.next_layer().shutdown(
271 socket_t::shutdown_both,
272 ec
273 );
274 connection_basic::socket_.next_layer().close(ec);
275 m_state.socket.connected = false;
276 interrupt();
277 }
278 else {
279 m_state.ssl.handshaked = true;
280 start_write();
281 start_read();
282 }
283 };
284 const auto handshake = handshake_t::server;
285 static_cast<shared_state&>(
287 ).ssl_options().configure(connection_basic::socket_, handshake);
288 boost::asio::post(
289 m_strand,
290 [this, self, on_handshake]{
291 connection_basic::socket_.async_handshake(
292 handshake,
293 boost::asio::buffer(
294 m_state.data.read.buffer.data(),
295 m_state.ssl.forced ? 0 :
297 ),
298 boost::asio::bind_executor(m_strand, on_handshake)
299 );
300 }
301 );
302 }
303
304 template<typename T>
306 {
307 if (m_state.timers.throttle.in.wait_expire || m_state.socket.wait_read ||
308 m_state.socket.handle_read || m_state.socket.shutdown_read
309 ) {
310 return;
311 }
314 auto calc_duration = []{
317 );
318 return std::chrono::duration_cast<connection<T>::duration_t>(
319 std::chrono::duration<double, std::chrono::seconds::period>(
320 std::min(
322 ).get_sleep_time_after_tick(1),
323 1.0
324 )
325 )
326 );
327 };
328 const auto duration = calc_duration();
329 if (duration > duration_t{}) {
330 m_timers.throttle.in.expires_after(duration);
331 m_state.timers.throttle.in.wait_expire = true;
332 auto on_wait = [this, self](const ec_t &ec){
333 std::lock_guard<std::mutex> guard(m_state.lock);
334 m_state.timers.throttle.in.wait_expire = false;
335 if (m_state.timers.throttle.in.cancel_expire) {
336 m_state.timers.throttle.in.cancel_expire = false;
338 }
339 else if (ec.value())
340 interrupt();
341 };
342 m_timers.throttle.in.async_wait([this, self, on_wait](const ec_t &ec){
343 std::lock_guard<std::mutex> guard(m_state.lock);
344 const bool error_status = m_state.timers.throttle.in.cancel_expire || ec.value();
345 if (error_status)
346 boost::asio::post(m_strand, std::bind(on_wait, ec));
347 else {
348 m_state.timers.throttle.in.wait_expire = false;
349 start_read();
350 }
351 });
352 return;
353 }
354 }
355 m_state.socket.wait_read = true;
356 auto on_read = [this, self](const ec_t &ec, size_t bytes_transferred){
357 std::lock_guard<std::mutex> guard(m_state.lock);
358 m_state.socket.wait_read = false;
359 if (m_state.socket.cancel_read) {
360 m_state.socket.cancel_read = false;
361 state_status_check();
362 }
363 else if (ec.value())
364 terminate();
365 else {
367 m_state.stat.in.throttle.handle_trafic_exact(bytes_transferred);
368 const auto speed = m_state.stat.in.throttle.get_current_speed();
369 m_conn_context.m_current_speed_down = speed;
370 m_conn_context.m_max_speed_down = std::max(
371 m_conn_context.m_max_speed_down,
372 speed
373 );
374 if (speed_limit_is_enabled()) {
376 network_throttle_manager_t::m_lock_get_global_throttle_in
377 );
378 network_throttle_manager_t::get_global_throttle_in(
379 ).handle_trafic_exact(bytes_transferred);
380 }
382 m_conn_context.m_last_recv = time(NULL);
383 m_conn_context.m_recv_cnt += bytes_transferred;
384 start_timer(get_timeout_from_bytes_read(bytes_transferred), true);
385 }
386 finish_read(bytes_transferred);
387 }
388 };
389 if (!m_state.ssl.enabled)
390 connection_basic::socket_.next_layer().async_read_some(
391 boost::asio::buffer(
392 m_state.data.read.buffer.data(),
393 m_state.data.read.buffer.size()
394 ),
395 boost::asio::bind_executor(m_strand, on_read)
396 );
397 else
398 boost::asio::post(
399 m_strand,
400 [this, self, on_read]{
401 connection_basic::socket_.async_read_some(
402 boost::asio::buffer(
403 m_state.data.read.buffer.data(),
404 m_state.data.read.buffer.size()
405 ),
406 boost::asio::bind_executor(m_strand, on_read)
407 );
408 }
409 );
410 }
411
412 template<typename T>
413 void connection<T>::finish_read(size_t bytes_transferred)
414 {
415 // Post handle_recv to a separate `strand_`, distinct from `m_strand`
416 // which is listening for reads/writes. This avoids a circular dep.
417 // handle_recv can queue many writes, and `m_strand` will process those
418 // writes until the connection terminates without deadlocking waiting
419 // for handle_recv.
420 m_state.socket.handle_read = true;
422 boost::asio::post(
424 [this, self, bytes_transferred]{
425 bool success = m_handler.handle_recv(
426 reinterpret_cast<char *>(m_state.data.read.buffer.data()),
427 bytes_transferred
428 );
429 std::lock_guard<std::mutex> guard(m_state.lock);
430 const bool error_status = m_state.status == status_t::INTERRUPTED
431 || m_state.status == status_t::TERMINATING
432 || !success;
433 if (!error_status) {
434 m_state.socket.handle_read = false;
435 start_read();
436 return;
437 }
438 boost::asio::post(
439 m_strand,
440 [this, self, success]{
441 // expect error_status == true
442 std::lock_guard<std::mutex> guard(m_state.lock);
443 m_state.socket.handle_read = false;
444 if (m_state.status == status_t::INTERRUPTED)
446 else if (m_state.status == status_t::TERMINATING)
448 else if (!success) {
449 ec_t ec;
450 if (m_state.socket.wait_write) {
451 // Allow the already queued writes time to finish, but no more new reads
452 connection_basic::socket_.next_layer().shutdown(
453 socket_t::shutdown_receive,
454 ec
455 );
456 m_state.socket.shutdown_read = true;
457 }
458 if (!m_state.socket.wait_write || ec.value()) {
459 interrupt();
460 }
461 }
462 }
463 );
464 }
465 );
466 }
467
468 template<typename T>
470 {
471 if (m_state.timers.throttle.out.wait_expire || m_state.socket.wait_write ||
472 m_state.data.write.queue.empty() ||
473 (m_state.ssl.enabled && !m_state.ssl.handshaked)
474 ) {
475 return;
476 }
479 auto calc_duration = [this]{
482 );
483 return std::chrono::duration_cast<connection<T>::duration_t>(
484 std::chrono::duration<double, std::chrono::seconds::period>(
485 std::min(
487 ).get_sleep_time_after_tick(
488 m_state.data.write.queue.back().size()
489 ),
490 1.0
491 )
492 )
493 );
494 };
495 const auto duration = calc_duration();
496 if (duration > duration_t{}) {
497 m_timers.throttle.out.expires_after(duration);
498 m_state.timers.throttle.out.wait_expire = true;
499 auto on_wait = [this, self](const ec_t &ec){
500 std::lock_guard<std::mutex> guard(m_state.lock);
501 m_state.timers.throttle.out.wait_expire = false;
502 if (m_state.timers.throttle.out.cancel_expire) {
503 m_state.timers.throttle.out.cancel_expire = false;
504 state_status_check();
505 }
506 else if (ec.value())
507 interrupt();
508 };
509 m_timers.throttle.out.async_wait([this, self, on_wait](const ec_t &ec){
510 std::lock_guard<std::mutex> guard(m_state.lock);
511 const bool error_status = m_state.timers.throttle.out.cancel_expire || ec.value();
512 if (error_status)
513 boost::asio::post(m_strand, std::bind(on_wait, ec));
514 else {
515 m_state.timers.throttle.out.wait_expire = false;
516 start_write();
517 }
518 });
519 }
520 }
521
522 m_state.socket.wait_write = true;
// Completion handler for the asynchronous write of the entry at the back of
// the write queue. It is bound to m_strand where the write is issued below.
// ec: result of the write; bytes_transferred: number of bytes written.
523 auto on_write = [this, self](const ec_t &ec, size_t bytes_transferred){
524 std::lock_guard<std::mutex> guard(m_state.lock);
525 m_state.socket.wait_write = false;
// A cancel was requested while the write was pending: drop everything still
// queued and let state_status_check() continue according to the current status.
526 if (m_state.socket.cancel_write) {
527 m_state.socket.cancel_write = false;
528 m_state.data.write.queue.clear();
529 m_state.data.write.total_bytes = 0;
530 state_status_check();
531 }
// The write failed: discard the queue and interrupt the connection.
532 else if (ec.value()) {
533 m_state.data.write.queue.clear();
534 m_state.data.write.total_bytes = 0;
535 interrupt();
536 }
537 else {
// Successful write: update per-connection and global outgoing statistics.
538 {
539 m_state.stat.out.throttle.handle_trafic_exact(bytes_transferred);
540 const auto speed = m_state.stat.out.throttle.get_current_speed();
541 m_conn_context.m_current_speed_up = speed;
// NOTE(review): this is the upload path (m_current_speed_up is set just
// above), yet the maximum recorded here is m_max_speed_down, the same field
// the read handler updates. This looks like a copy-paste slip; presumably an
// upload maximum was intended. Confirm against the connection context type.
542 m_conn_context.m_max_speed_down = std::max(
543 m_conn_context.m_max_speed_down,
544 speed
545 );
546 if (speed_limit_is_enabled()) {
// NOTE(review): one source line is missing from this extract at this point
// (the opening of the statement that the next two lines close).
548 network_throttle_manager_t::m_lock_get_global_throttle_out
549 );
550 network_throttle_manager_t::get_global_throttle_out(
551 ).handle_trafic_exact(bytes_transferred);
552 }
553 connection_basic::logger_handle_net_write(bytes_transferred);
554 m_conn_context.m_last_send = time(NULL);
555 m_conn_context.m_send_cnt += bytes_transferred;
556
// Restart the general timer with the default timeout; `true` adds the new
// duration to the time still remaining on the timer.
557 start_timer(get_default_timeout(), true);
558 }
// The entry just written is the one at the back of the queue (new messages
// are pushed at the front by send()), so retire it and its byte count.
559 const std::size_t byte_count = m_state.data.write.queue.back().size();
560 assert(bytes_transferred == byte_count);
561 m_state.data.write.queue.pop_back();
// std::min keeps the unsigned total from wrapping below zero.
562 m_state.data.write.total_bytes -=
563 std::min(m_state.data.write.total_bytes, byte_count);
// Wake threads blocked in send() waiting for the queue to drain.
564 m_state.condition.notify_all();
565 if (m_state.data.write.queue.empty() && m_state.socket.shutdown_read) {
566 // All writes have been sent and reads shutdown already, connection can be closed
567 interrupt();
568 } else {
// Continue with the next queued entry, if any.
569 start_write();
570 }
571 }
572 };
573 if (!m_state.ssl.enabled)
574 boost::asio::async_write(
575 connection_basic::socket_.next_layer(),
576 boost::asio::buffer(
577 m_state.data.write.queue.back().data(),
578 m_state.data.write.queue.back().size()
579 ),
580 boost::asio::bind_executor(m_strand, on_write)
581 );
582 else
583 boost::asio::post(
584 m_strand,
585 [this, self, on_write]{
586 boost::asio::async_write(
587 connection_basic::socket_,
588 boost::asio::buffer(
589 m_state.data.write.queue.back().data(),
590 m_state.data.write.queue.back().size()
591 ),
592 boost::asio::bind_executor(m_strand, on_write)
593 );
594 }
595 );
596 }
597
598 template<typename T>
600 {
601 if (m_state.socket.wait_shutdown)
602 return;
604 m_state.socket.wait_shutdown = true;
605 auto on_shutdown = [this, self](const ec_t &ec){
606 std::lock_guard<std::mutex> guard(m_state.lock);
607 m_state.socket.wait_shutdown = false;
608 if (m_state.socket.cancel_shutdown) {
609 m_state.socket.cancel_shutdown = false;
610 switch (m_state.status)
611 {
613 interrupt();
614 break;
616 terminate();
617 break;
620 break;
621 default:
622 break;
623 }
624 }
625 else {
626 terminate();
627 }
628 };
629 boost::asio::post(
630 m_strand,
631 [this, self, on_shutdown]{
632 connection_basic::socket_.async_shutdown(
633 boost::asio::bind_executor(m_strand, on_shutdown)
634 );
635 }
636 );
638 }
639
640 template<typename T>
642 {
643 bool wait_socket = false;
644 if (m_state.socket.wait_handshake)
645 wait_socket = m_state.socket.cancel_handshake = true;
646 if (m_state.timers.throttle.in.wait_expire) {
647 m_state.timers.throttle.in.cancel_expire = true;
648 m_timers.throttle.in.cancel();
649 }
650 if (m_state.socket.wait_read)
651 wait_socket = m_state.socket.cancel_read = true;
652 if (m_state.timers.throttle.out.wait_expire) {
653 m_state.timers.throttle.out.cancel_expire = true;
654 m_timers.throttle.out.cancel();
655 }
656 if (m_state.socket.wait_write)
657 wait_socket = m_state.socket.cancel_write = true;
658 if (m_state.socket.wait_shutdown)
659 wait_socket = m_state.socket.cancel_shutdown = true;
660 if (wait_socket) {
661 ec_t ec;
662 connection_basic::socket_.next_layer().cancel(ec);
663 }
664 }
665
666 template<typename T>
668 {
669 if (m_state.protocol.released || m_state.protocol.wait_release)
670 return;
671 m_state.protocol.wait_release = true;
672 m_state.lock.unlock();
673 m_handler.release_protocol();
674 m_state.lock.lock();
675 m_state.protocol.wait_release = false;
676 m_state.protocol.released = true;
677 if (m_state.status == status_t::INTERRUPTED)
679 else if (m_state.status == status_t::TERMINATING)
681 }
682
683 template<typename T>
685 {
686 if (m_state.status != status_t::RUNNING)
687 return;
689 cancel_timer();
692 m_state.condition.notify_all();
694 }
695
696 template<typename T>
698 {
699 assert(m_state.status == status_t::INTERRUPTED);
700 if (m_state.timers.general.wait_expire)
701 return;
702 if (m_state.socket.wait_handshake)
703 return;
704 if (m_state.timers.throttle.in.wait_expire)
705 return;
706 if (m_state.socket.wait_read)
707 return;
708 if (m_state.socket.handle_read)
709 return;
710 if (m_state.timers.throttle.out.wait_expire)
711 return;
712 // \NOTE See on_terminating() comments
713 //if (m_state.socket.wait_write)
714 // return;
715 if (m_state.socket.wait_shutdown)
716 return;
717 if (m_state.protocol.wait_init)
718 return;
719 if (m_state.protocol.wait_callback)
720 return;
721 if (m_state.protocol.wait_release)
722 return;
723 if (m_state.socket.connected) {
724 if (!m_state.ssl.enabled) {
725 ec_t ec;
726 connection_basic::socket_.next_layer().shutdown(
727 socket_t::shutdown_both,
728 ec
729 );
730 connection_basic::socket_.next_layer().close(ec);
731 m_state.socket.connected = false;
732 m_state.status = status_t::WASTED;
733 }
734 else
736 }
737 else
738 m_state.status = status_t::WASTED;
739 }
740
741 template<typename T>
743 {
744 if (m_state.status != status_t::RUNNING &&
746 )
747 return;
749 cancel_timer();
752 m_state.condition.notify_all();
754 }
755
756 template<typename T>
758 {
759 assert(m_state.status == status_t::TERMINATING);
760 if (m_state.timers.general.wait_expire)
761 return;
762 if (m_state.socket.wait_handshake)
763 return;
764 if (m_state.timers.throttle.in.wait_expire)
765 return;
766 if (m_state.socket.wait_read)
767 return;
768 if (m_state.socket.handle_read)
769 return;
770 if (m_state.timers.throttle.out.wait_expire)
771 return;
772 // Writes cannot be canceled due to `async_write` being a "composed"
773 // handler. ASIO has new cancellation routines, not available in 1.66, to
774 // handle this situation. The problem is that if cancel is called after an
775 // intermediate handler is queued, the op will not check the cancel flag in
776 // our code, and will instead queue up another write.
777 //if (m_state.socket.wait_write)
778 // return;
779 if (m_state.socket.wait_shutdown)
780 return;
781 if (m_state.protocol.wait_init)
782 return;
783 if (m_state.protocol.wait_callback)
784 return;
785 if (m_state.protocol.wait_release)
786 return;
787 if (m_state.socket.connected) {
788 ec_t ec;
789 connection_basic::socket_.next_layer().shutdown(
790 socket_t::shutdown_both,
791 ec
792 );
793 connection_basic::socket_.next_layer().close(ec);
794 m_state.socket.connected = false;
795 }
796 m_state.status = status_t::WASTED;
797 }
798
799 template<typename T>
801 {
802 // synchronize with intermediate writes on `m_strand`
804 boost::asio::post(m_strand, [this, self] {
805 std::lock_guard<std::mutex> guard(m_state.lock);
806 terminate();
807 });
808 }
809
810 template<typename T>
812 {
813 std::lock_guard<std::mutex> guard(m_state.lock);
814 if (m_state.status != status_t::RUNNING || m_state.socket.wait_handshake)
815 return false;
816 if (std::numeric_limits<std::size_t>::max() - m_state.data.write.total_bytes < message.size())
817 return false;
818
819 // Wait for the write queue to fall below the max. If it doesn't after a
820 // randomized delay, drop the connection.
821 auto wait_consume = [this] {
822 auto random_delay = []{
823 using engine = std::mt19937;
824 std::random_device dev;
825 std::seed_seq::result_type rand[
826 engine::state_size // Use complete bit space
827 ]{};
828 std::generate_n(rand, engine::state_size, std::ref(dev));
829 std::seed_seq seed(rand, rand + engine::state_size);
830 engine rng(seed);
831 return std::chrono::milliseconds(
832 std::uniform_int_distribution<>(5000, 6000)(rng)
833 );
834 };
835
836 // The bytes check intentionally does not include incoming message size.
837 // This allows for a soft overflow; a single http response will never fail
838 // this check, but multiple responses could. Clients can avoid this case
839 // by reading the entire response before making another request. P2P
840 // should never hit the MAX_BYTES check (when using default values).
841 if (m_state.data.write.queue.size() <= ABSTRACT_SERVER_SEND_QUE_MAX_COUNT &&
842 m_state.data.write.total_bytes <= static_cast<shared_state&>(connection_basic::get_state()).response_soft_limit)
843 return true;
844 m_state.data.write.wait_consume = true;
845 bool success = m_state.condition.wait_for(
846 m_state.lock,
847 random_delay(),
848 [this]{
849 return (
850 m_state.status != status_t::RUNNING ||
851 (
852 m_state.data.write.queue.size() <=
854 m_state.data.write.total_bytes <=
856 )
857 );
858 }
859 );
860 m_state.data.write.wait_consume = false;
861 if (!success) {
863 return false;
864 }
865 else
866 return m_state.status == status_t::RUNNING;
867 };
868 auto wait_sender = [this] {
869 m_state.condition.wait(
870 m_state.lock,
871 [this] {
872 return (
873 m_state.status != status_t::RUNNING ||
874 !m_state.data.write.wait_consume
875 );
876 }
877 );
878 return m_state.status == status_t::RUNNING;
879 };
880 if (!wait_sender())
881 return false;
882 constexpr size_t CHUNK_SIZE = 32 * 1024;
884 message.size() <= 2 * CHUNK_SIZE
885 ) {
886 if (!wait_consume())
887 return false;
888 const std::size_t byte_count = message.size();
889 m_state.data.write.queue.emplace_front(std::move(message));
890 m_state.data.write.total_bytes += byte_count;
891 start_write();
892 }
893 else {
894 while (!message.empty()) {
895 if (!wait_consume())
896 return false;
897 m_state.data.write.queue.emplace_front(
898 message.take_slice(CHUNK_SIZE)
899 );
900 m_state.data.write.total_bytes += m_state.data.write.queue.front().size();
901 start_write();
902 }
903 }
904 m_state.condition.notify_all();
905 return true;
906 }
907
908 template<typename T>
910 bool is_income,
911 bool is_multithreaded,
912 boost::optional<network_address> real_remote
913 )
914 {
915 std::unique_lock<std::mutex> guard(m_state.lock);
916 if (m_state.status != status_t::TERMINATED)
917 return false;
918 if (!real_remote) {
919 ec_t ec;
920 auto endpoint = connection_basic::socket_.next_layer().remote_endpoint(
921 ec
922 );
923 if (ec.value())
924 return false;
925 real_remote = (
926 endpoint.address().is_v6() ?
928 ipv6_network_address{endpoint.address().to_v6(), endpoint.port()}
929 } :
932 uint32_t{
933 boost::asio::detail::socket_ops::host_to_network_long(
934 endpoint.address().to_v4().to_uint()
935 )
936 },
937 endpoint.port()
938 }
939 }
940 );
941 }
942 auto *filter = static_cast<shared_state&>(
944 ).pfilter;
945 if (filter && !filter->is_remote_host_allowed(*real_remote))
946 return false;
947
948 auto *limit = static_cast<shared_state&>(
950 ).plimit;
951 if (is_income && limit && limit->is_host_limit(*real_remote))
952 return false;
953
954 ec_t ec;
955 #if !defined(_WIN32) || !defined(__i686)
956 connection_basic::socket_.next_layer().set_option(
957 boost::asio::detail::socket_option::integer<IPPROTO_IP, IP_TOS>{
959 },
960 ec
961 );
962 if (ec.value())
963 return false;
964 #endif
965 connection_basic::socket_.next_layer().set_option(
966 boost::asio::ip::tcp::no_delay{false},
967 ec
968 );
969 if (ec.value())
970 return false;
971 connection_basic::m_is_multithreaded = is_multithreaded;
972 m_conn_context.set_details(
973 boost::uuids::random_generator()(),
974 *real_remote,
975 is_income,
977 );
978 m_host = real_remote->host_str();
979 try { host_count(1); } catch(...) { /* ignore */ }
980 m_local = real_remote->is_loopback() || real_remote->is_local();
981 m_state.ssl.enabled = (
983 );
984 m_state.ssl.forced = (
986 );
987 m_state.socket.connected = true;
988 m_state.status = status_t::RUNNING;
990 std::chrono::milliseconds(
992 )
993 );
994 m_state.protocol.wait_init = true;
995 guard.unlock();
996 m_handler.after_init_connection();
997 guard.lock();
998 m_state.protocol.wait_init = false;
999 m_state.protocol.initialized = true;
1000 if (m_state.status == status_t::INTERRUPTED)
1002 else if (m_state.status == status_t::TERMINATING)
1004 else if (!is_income || !m_state.ssl.enabled)
1005 start_read();
1006 else
1008 return true;
1009 }
1010
1011 template<typename T>
1013 io_context_t &io_context,
1014 std::shared_ptr<shared_state> shared_state,
1015 t_connection_type connection_type,
1016 ssl_support_t ssl_support,
1017 t_connection_context&& initial
1018 ):
1019 connection(
1020 io_context,
1021 socket_t{io_context},
1022 std::move(shared_state),
1023 connection_type,
1024 ssl_support,
1025 std::move(initial)
1026 )
1027 {
1028 }
1029
1030 template<typename T>
1032 io_context_t &io_context,
1033 socket_t &&socket,
1034 std::shared_ptr<shared_state> shared_state,
1035 t_connection_type connection_type,
1036 ssl_support_t ssl_support,
1037 t_connection_context&& initial
1038 ):
1039 connection_basic(io_context, std::move(socket), shared_state, ssl_support),
1041 m_connection_type(connection_type),
1042 m_io_context{io_context},
1043 m_conn_context(std::move(initial)),
1046 {
1047 }
1048
1049 template<typename T>
1051 {
1052 std::lock_guard<std::mutex> guard(m_state.lock);
1053 assert(m_state.status == status_t::TERMINATED ||
1054 m_state.status == status_t::WASTED ||
1055 m_io_context.stopped()
1056 );
1057 if (m_state.status != status_t::WASTED)
1058 return;
1059 try { host_count(-1); } catch (...) { /* ignore */ }
1060 }
1061
1062 template<typename T>
1064 bool is_income,
1065 bool is_multithreaded
1066 )
1067 {
1068 return start_internal(is_income, is_multithreaded, {});
1069 }
1070
1071 template<typename T>
1073 bool is_income,
1074 bool is_multithreaded,
1075 network_address real_remote
1076 )
1077 {
1078 return start_internal(is_income, is_multithreaded, real_remote);
1079 }
1080
// Emits one MDEBUG line describing this connection: its type, the local
// endpoint, the remote address stored in the connection context, and the
// remote endpoint reported by the socket.
// NOTE(review): the signature line is missing from this extract.
1081 template<typename T>
1083 {
1084 std::lock_guard<std::mutex> guard(m_state.lock);
1085 std::string address;
1086 std::string port;
1087 ec_t ec;
// Uses the error_code overload: a failure to obtain the remote endpoint is
// reported as "<not connected>" in the log line.
1088 auto endpoint = connection_basic::socket().remote_endpoint(ec);
1089 if (ec.value()) {
1090 address = "<not connected>";
1091 port = "<not connected>";
1092 }
1093 else {
1094 address = endpoint.address().to_string();
1095 port = std::to_string(endpoint.port());
1096 }
// NOTE(review): local_endpoint() is called below without an error_code
// argument, unlike remote_endpoint(ec) above. Presumably that overload
// reports failure by throwing, so a socket with no local endpoint could make
// this logging helper throw — confirm and consider the error_code overload.
1097 MDEBUG(
1098 " connection type " << std::to_string(m_connection_type) <<
1099 " " << connection_basic::socket().local_endpoint().address().to_string() <<
1100 ":" << connection_basic::socket().local_endpoint().port() <<
1101 " <--> " << m_conn_context.m_remote_address.str() <<
1102 " (via " << address << ":" << port << ")"
1103 );
1104 }
1105
1106 template<typename T>
1111
1112 template<typename T>
1114 {
1115 return close();
1116 }
1117
1118 template<typename T>
1120 {
1121 return send(std::move(message));
1122 }
1123
1124 template<typename T>
1126 {
1127 return true;
1128 }
1129
1130 template<typename T>
1132 {
1133 std::lock_guard<std::mutex> guard(m_state.lock);
1134 if (m_state.status != status_t::RUNNING)
1135 return false;
1137 return true;
1138 }
1139
1140 template<typename T>
1142 {
1144 if (!m_io_context.poll_one())
1146 }
1147 else {
1148 if (!m_io_context.run_one())
1149 return false;
1150 }
1151 return true;
1152 }
1153
1154 template<typename T>
1156 {
1157 std::lock_guard<std::mutex> guard(m_state.lock);
1158 if (m_state.status != status_t::RUNNING)
1159 return false;
1161 ++m_state.protocol.wait_callback;
1162 boost::asio::post(connection_basic::strand_, [this, self]{
1163 m_handler.handle_qued_callback();
1164 std::lock_guard<std::mutex> guard(m_state.lock);
1165 --m_state.protocol.wait_callback;
1166 if (m_state.status == status_t::INTERRUPTED)
1168 else if (m_state.status == status_t::TERMINATING)
1170 });
1171 return true;
1172 }
1173
1174 template<typename T>
1179
1180 template<typename T>
1182 {
1183 try {
1185 std::lock_guard<std::mutex> guard(m_state.lock);
1186 this->self = std::move(self);
1187 ++m_state.protocol.reference_counter;
1188 return true;
1189 }
1190 catch (boost::bad_weak_ptr &exception) {
1191 return false;
1192 }
1193 }
1194
1195 template<typename T>
1197 {
1199 std::lock_guard<std::mutex> guard(m_state.lock);
1200 if (!(--m_state.protocol.reference_counter))
1201 self = std::move(this->self);
1202 return true;
1203 }
1204
1205 template<typename T>
1207 {
1208 std::lock_guard<std::mutex> guard(m_state.lock);
1210 }
1211
1212 template<class t_protocol_handler>
1214 m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()),
1221 m_threads_count(0),
1222 m_thread_index(0),
1223 m_connection_type( connection_type ),
1226 {
1228 m_thread_name_prefix = "NET";
1229 }
1230
// Constructor variant that runs on a caller-supplied io_context instead of an
// internally owned one: io_context_ is bound directly to the reference passed
// in, so the caller's io_context must outlive this server.
// Otherwise it initialises the same members as the other constructor and sets
// the default thread-name prefix to "NET".
// NOTE(review): member initializers on lines 1235-1238 and 1242-1243, and line
// 1245, are missing from this listing.
1231 template<class t_protocol_handler>
1232 boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_context& extarnal_io_context, t_connection_type connection_type) :
1233 m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()),
1234 io_context_(extarnal_io_context),
1239 m_threads_count(0),
1240 m_thread_index(0),
1241 m_connection_type(connection_type),
1244 {
1246 m_thread_name_prefix = "NET";
1247 }
1248 //---------------------------------------------------------------------------------
1249 template<class t_protocol_handler>
1255 //---------------------------------------------------------------------------------
1256 template<class t_protocol_handler>
1263 //---------------------------------------------------------------------------------
// boosted_tcp_server::init_server(uint32_t port, const std::string& address,
//                                 uint32_t port_ipv6, const std::string& address_ipv6,
//                                 bool use_ipv6, bool require_ipv4, ssl_options_t ssl_options)
// (first signature line, 1265, is missing from this listing; the parameter
// list above is taken from the Doxygen index).
//
// Opens, binds and starts listening on the IPv4 acceptor and, when use_ipv6 is
// set, on the IPv6 acceptor, then arms the first async_accept on each.
//
// Returns true when at least one required listener was set up; returns false
// (after logging with MFATAL) when any exception reaches the function-level
// handlers, which includes:
//   * IPv4 setup failed and require_ipv4 is true;
//   * both IPv4 and IPv6 setup failed while use_ipv6 is true.
// NOTE(review): when IPv4 fails, require_ipv4 is false and use_ipv6 is false,
// the visible code still returns true with no listener bound.
// NOTE(review): TRY_ENTRY() presumably opens the try block that is closed by
// the catch handlers at the end of the function -- confirm the macro.
1264 template<class t_protocol_handler>
1266 uint32_t port_ipv6, const std::string& address_ipv6, bool use_ipv6, bool require_ipv4,
1267 ssl_options_t ssl_options)
1268 {
1269 TRY_ENTRY();
// Re-arm the server: clear any previous stop request and remember the requested configuration.
1270 m_stop_signal_sent = false;
1271 m_port = port;
1272 m_port_ipv6 = port_ipv6;
1274 m_address_ipv6 = address_ipv6;
1275 m_use_ipv6 = use_ipv6;
1276 m_require_ipv4 = require_ipv4;
1277
1278 if (ssl_options)
1279 m_state->configure_ssl(std::move(ssl_options));
1280
// Failure reasons are collected as strings so that the decision to abort can be made after both attempts.
1281 std::string ipv4_failed = "";
1282 std::string ipv6_failed = "";
1283
1284 boost::asio::ip::tcp::resolver resolver(io_context_);
1285
1286 try
1287 {
// This resolve() overload takes no error_code, so failures surface as exceptions and are recorded below.
1288 const auto results = resolver.resolve(
1289 address, boost::lexical_cast<std::string>(port), boost::asio::ip::tcp::resolver::canonical_name
1290 );
// Only the first resolved endpoint is used.
1291 acceptor_.open(results.begin()->endpoint().protocol());
1292#if !defined(_WIN32)
1293 acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
1294#endif
1295 acceptor_.bind(*results.begin());
1296 acceptor_.listen();
// Store the port actually bound; this differs from the request when port == 0.
1297 boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint();
1298 m_port = binded_endpoint.port();
1299 MDEBUG("start accept (IPv4)");
1301 acceptor_.async_accept(new_connection_->socket(),
1303 boost::asio::placeholders::error));
1304 }
1305 catch (const std::exception &e)
1306 {
1307 ipv4_failed = e.what();
1308 }
1309
1310 if (ipv4_failed != "")
1311 {
1312 MERROR("Failed to bind IPv4: " << ipv4_failed);
1313 if (require_ipv4)
1314 {
// Caught by the function-level handler below, which turns it into `return false`.
1315 throw std::runtime_error("Failed to bind IPv4 (set to required)");
1316 }
1317 }
1318
1319 if (use_ipv6)
1320 {
1321 try
1322 {
// NOTE(review): this falls back to the `port` argument, not to m_port; with port == 0 the IPv6
// listener therefore gets its own ephemeral port rather than the one chosen for IPv4.
1323 if (port_ipv6 == 0) port_ipv6 = port; // default arg means bind to same port as ipv4
1324
1325 const auto results = resolver.resolve(
1326 address_ipv6, boost::lexical_cast<std::string>(port_ipv6), boost::asio::ip::tcp::resolver::canonical_name
1327 );
1328
1329 acceptor_ipv6.open(results.begin()->endpoint().protocol());
1330#if !defined(_WIN32)
1331 acceptor_ipv6.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
1332#endif
// v6_only keeps this socket from also accepting IPv4-mapped connections.
1333 acceptor_ipv6.set_option(boost::asio::ip::v6_only(true));
1334 acceptor_ipv6.bind(*results.begin());
1335 acceptor_ipv6.listen();
1336 boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_ipv6.local_endpoint();
1337 m_port_ipv6 = binded_endpoint.port();
1338 MDEBUG("start accept (IPv6)");
1340 acceptor_ipv6.async_accept(new_connection_ipv6->socket(),
1342 boost::asio::placeholders::error));
1343 }
1344 catch (const std::exception &e)
1345 {
1346 ipv6_failed = e.what();
1347 }
1348 }
1349
// An IPv6 failure alone is tolerated; it is fatal only when IPv4 failed as well.
1350 if (use_ipv6 && ipv6_failed != "")
1351 {
1352 MERROR("Failed to bind IPv6: " << ipv6_failed);
1353 if (ipv4_failed != "")
1354 {
1355 throw std::runtime_error("Failed to bind IPv4 and IPv6");
1356 }
1357 }
1358
1359 return true;
1360 }
1361 catch (const std::exception &e)
1362 {
1363 MFATAL("Error starting server: " << e.what());
1364 return false;
1365 }
1366 catch (...)
1367 {
1368 MFATAL("Error starting server");
1369 return false;
1370 }
1371 }
1372 //-----------------------------------------------------------------------------
1373 template<class t_protocol_handler>
1374 bool boosted_tcp_server<t_protocol_handler>::init_server(const std::string port, const std::string& address,
1375 const std::string port_ipv6, const std::string address_ipv6, bool use_ipv6, bool require_ipv4,
1376 ssl_options_t ssl_options)
1377 {
1378 uint32_t p = 0;
1379 uint32_t p_ipv6 = 0;
1380
1381 if (port.size() && !string_tools::get_xtype_from_string(p, port)) {
1382 MERROR("Failed to convert port no = " << port);
1383 return false;
1384 }
1385
1386 if (port_ipv6.size() && !string_tools::get_xtype_from_string(p_ipv6, port_ipv6)) {
1387 MERROR("Failed to convert port no = " << port_ipv6);
1388 return false;
1389 }
1390 return this->init_server(p, address, p_ipv6, address_ipv6, use_ipv6, require_ipv4, std::move(ssl_options));
1391 }
1392 //---------------------------------------------------------------------------------
// boosted_tcp_server::worker_thread() (signature on line 1394 is missing; name
// taken from the Doxygen index: "Run the server's io_context loop").
// Names the calling thread "[<prefix><index>]" and then runs io_context_.run().
// A normal return from run() ends the worker with true. If a handler throws,
// the exception is logged and run() is entered again, unless a stop has been
// requested in the meantime.
// NOTE(review): TRY_ENTRY()/CATCH_ENTRY_L0 presumably wrap the body in a
// try/catch that returns false on an escaping exception -- confirm the macros.
1393 template<class t_protocol_handler>
1395 {
1396 TRY_ENTRY();
1397 const uint32_t local_thr_index = m_thread_index++; // atomically increment, getting value before increment
1398 std::string thread_name = std::string("[") + m_thread_name_prefix;
1399 thread_name += boost::to_string(local_thr_index) + "]";
1400 MLOG_SET_THREAD_NAME(thread_name);
1401 // _fact("Thread name: " << m_thread_name_prefix);
1402 while(!m_stop_signal_sent)
1403 {
1404 try
1405 {
1406 io_context_.run();
1407 return true;
1408 }
1409 catch(const std::exception& ex)
1410 {
1411 _erro("Exception at server worker thread, what=" << ex.what());
1412 }
1413 catch(...)
1414 {
1415 _erro("Exception at server worker thread, unknown execption");
1416 }
1417 }
1418 //_info("Worker thread finished");
1419 return true;
1420 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::worker_thread", false);
1421 }
1422 //---------------------------------------------------------------------------------
// boosted_tcp_server::set_threads_prefix(const std::string& prefix_name)
// (signature on line 1424 is missing; taken from the Doxygen index).
// Sets the prefix used when naming worker threads and looks the prefix up in
// server_type_map. Throws std::runtime_error when the prefix is not a key of
// that map.
// NOTE(review): m_thread_name_prefix is assigned before the lookup, so it
// stays changed even when the function throws.
// NOTE(review): in the visible code the looked-up connection type is only
// logged; m_connection_type is not updated here.
1423 template<class t_protocol_handler>
1425 {
1426 m_thread_name_prefix = prefix_name;
1427 auto it = server_type_map.find(m_thread_name_prefix);
1428 if (it==server_type_map.end()) throw std::runtime_error("Unknown prefix/server type:" + std::string(prefix_name));
1429 auto connection_type = it->second; // the value of type
1430 MINFO("Set server type to: " << connection_type << " from name: " << m_thread_name_prefix << ", prefix_name = " << prefix_name);
1431 }
1432 //---------------------------------------------------------------------------------
// boosted_tcp_server::set_connection_filter(i_connection_filter* pfilter)
// (signature on line 1434 is missing; taken from the Doxygen index).
// Stores the filter pointer in the shared state (m_state->pfilter). The
// pointer is stored as-is; no ownership transfer is visible here.
1433 template<class t_protocol_handler>
1435 {
1436 assert(m_state != nullptr); // always set in constructor
1437 m_state->pfilter = pfilter;
1438 }
1439 //---------------------------------------------------------------------------------
// boosted_tcp_server::set_connection_limit(i_connection_limit* plimit)
// (signature on line 1441 is missing; taken from the Doxygen index).
// Stores the limit pointer in the shared state (m_state->plimit). The pointer
// is stored as-is; no ownership transfer is visible here.
1440 template<class t_protocol_handler>
1442 {
1443 assert(m_state != nullptr); // always set in constructor
1444 m_state->plimit = plimit;
1445 }
1446 //---------------------------------------------------------------------------------
// boosted_tcp_server::set_response_soft_limit(std::size_t limit)
// (signature on line 1448 is missing; taken from the Doxygen index).
// Stores `limit` in the shared state (m_state->response_soft_limit).
1447 template<class t_protocol_handler>
1449 {
1450 assert(m_state != nullptr); // always set in constructor
1451 m_state->response_soft_limit = limit;
1452 }
1453 //---------------------------------------------------------------------------------
// Starts `threads_count` worker threads, each executing worker_thread().
//
// threads_count  number of worker threads to spawn per iteration.
// wait           true: block until all workers have exited, and restart the
//                service if they exited without a stop request;
//                false: return true right after spawning the threads.
// attrs          thread attributes passed to every boost::thread.
//
// Returns false when the restart attempt fails, true otherwise.
// NOTE(review): lines 1465 and 1473 (between the surrounding statements) and
// line 1495 (the condition of the restart check, presumably a re-init call)
// are missing from this listing; no lock around m_threads is visible here.
// NOTE(review): TRY_ENTRY()/CATCH_ENTRY_L0 presumably return false on an
// escaping exception -- confirm the macros.
1454 template<class t_protocol_handler>
1455 bool boosted_tcp_server<t_protocol_handler>::run_server(size_t threads_count, bool wait, const boost::thread::attributes& attrs)
1456 {
1457 TRY_ENTRY();
1458 m_threads_count = threads_count;
// Remember the caller's thread; is_thread_worker() compares against it.
1459 m_main_thread_id = boost::this_thread::get_id();
1460 MLOG_SET_THREAD_NAME("[SRV_MAIN]");
1461 while(!m_stop_signal_sent)
1462 {
1463
1464 // Create a pool of threads to run all of the io_contexts.
1466 for (std::size_t i = 0; i < threads_count; ++i)
1467 {
1468 boost::shared_ptr<boost::thread> thread(new boost::thread(
1469 attrs, boost::bind(&boosted_tcp_server<t_protocol_handler>::worker_thread, this)));
1470 _note("Run server thread name: " << m_thread_name_prefix);
1471 m_threads.push_back(thread);
1472 }
1474 // Wait for all threads in the pool to exit.
1475 if (wait)
1476 {
1477 _fact("JOINING all threads");
1478 for (std::size_t i = 0; i < m_threads.size(); ++i) {
1479 m_threads[i]->join();
1480 }
1481 _fact("JOINING all threads - almost");
1482 m_threads.clear();
1483 _fact("JOINING all threads - DONE");
1484
1485 }
1486 else {
// Non-blocking mode: the threads keep running and stay registered in m_threads.
1487 _dbg1("Reiniting OK.");
1488 return true;
1489 }
1490
// Workers exited although nobody asked them to stop: try to bring the service back up.
1491 if(wait && !m_stop_signal_sent)
1492 {
1493 //some problems with the listening socket ?..
1494 _dbg1("Net service stopped without stop request, restarting...");
1496 {
1497 _dbg1("Reiniting service failed, exit.");
1498 return false;
1499 }else
1500 {
1501 _dbg1("Reiniting OK.");
1502 }
1503 }
1504 }
1505 return true;
1506 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::run_server", false);
1507 }
1508 //---------------------------------------------------------------------------------
// boosted_tcp_server::is_thread_worker() (signature on line 1510 is missing;
// name taken from the Doxygen index).
// Returns true when the calling thread is one of the threads stored in
// m_threads, or when the server was started with exactly one worker thread
// and the caller is the thread that invoked run_server(). Returns false
// otherwise.
// NOTE(review): line 1513 is missing from this listing (possibly a lock on
// m_threads_lock -- confirm); no synchronisation is visible here.
1509 template<class t_protocol_handler>
1511 {
1512 TRY_ENTRY();
1514 BOOST_FOREACH(boost::shared_ptr<boost::thread>& thp, m_threads)
1515 {
1516 if(thp->get_id() == boost::this_thread::get_id())
1517 return true;
1518 }
1519 if(m_threads_count == 1 && boost::this_thread::get_id() == m_main_thread_id)
1520 return true;
1521 return false;
1522 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::is_thread_worker", false);
1523 }
1524 //---------------------------------------------------------------------------------
// boosted_tcp_server::timed_wait_server_stop(uint64_t wait_mseconds)
// (signature on line 1526 is missing; taken from the Doxygen index: "wait for
// service workers stop").
// Gives every joinable worker thread up to wait_mseconds milliseconds to
// finish; a thread that does not finish in time gets a boost::thread
// interruption request. The timeout applies per thread, so the total wait can
// be up to m_threads.size() * wait_mseconds. Returns true in the visible code.
1525 template<class t_protocol_handler>
1527 {
1528 TRY_ENTRY();
1529 boost::chrono::milliseconds ms(wait_mseconds);
1530 for (std::size_t i = 0; i < m_threads.size(); ++i)
1531 {
1532 if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms))
1533 {
1534 _dbg1("Interrupting thread " << m_threads[i]->native_handle());
1535 m_threads[i]->interrupt();
1536 }
1537 }
1538 return true;
1539 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::timed_wait_server_stop", false);
1540 }
1541 //---------------------------------------------------------------------------------
// boosted_tcp_server::send_stop_signal() (signature on line 1543 is missing;
// name taken from the Doxygen index: "Stop the server").
// Raises the stop flags, cancels and forgets every connection still tracked in
// connections_, and stops io_context_ so the worker threads' run() calls
// return.
// NOTE(review): `state` is declared on line 1546, which is missing from this
// listing.
// NOTE(review): connections_mutex is locked and unlocked manually; if
// c->cancel() throws, the unlock is skipped. Whether CATCH_ENTRY_L0 compensates
// is not visible here.
1542 template<class t_protocol_handler>
1544 {
1545 m_stop_signal_sent = true;
1547 state->stop_signal_sent = true;
1548 TRY_ENTRY();
1549 connections_mutex.lock();
1550 for (auto &c: connections_)
1551 {
1552 c->cancel();
1553 }
1554 connections_.clear();
1555 connections_mutex.unlock();
1556 io_context_.stop();
1557 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::send_stop_signal()", void());
1558 }
1559 //---------------------------------------------------------------------------------
1560 template<class t_protocol_handler>
1561 void boosted_tcp_server<t_protocol_handler>::handle_accept_ipv4(const boost::system::error_code& e)
1562 {
1563 this->handle_accept(e, false);
1564 }
1565 //---------------------------------------------------------------------------------
1566 template<class t_protocol_handler>
1567 void boosted_tcp_server<t_protocol_handler>::handle_accept_ipv6(const boost::system::error_code& e)
1568 {
1569 this->handle_accept(e, true);
1570 }
1571 //---------------------------------------------------------------------------------
// Common completion handler for async_accept on either acceptor.
//
// e     result of the accept operation.
// ipv6  selects which acceptor / pending-connection slot the completion
//       belongs to (false: acceptor_ / new_connection_, true: acceptor_ipv6 /
//       new_connection_ipv6).
//
// On success the accepted connection is taken out of the pending slot, a fresh
// pending connection is created and the next async_accept is armed *before*
// the accepted connection is started, so accepting continues even if start()
// fails. On error (or an exception before the next accept was armed) the
// pending connection is recreated and async_accept is re-armed.
// NOTE(review): lines 1584, 1592, 1614 and 1641 are missing from this listing:
// presumably the IPv6 assignment of accept_function_pointer, the condition
// guarding the RPC logging block, the condition choosing between the two
// start() calls, and a statement on the error path.
1572 template<class t_protocol_handler>
1573 void boosted_tcp_server<t_protocol_handler>::handle_accept(const boost::system::error_code& e, bool ipv6)
1574 {
1575 MDEBUG("handle_accept");
1576
// Defaults refer to the IPv4 side; they are switched below for IPv6.
1577 boost::asio::ip::tcp::acceptor* current_acceptor = &acceptor_;
1578 connection_ptr* current_new_connection = &new_connection_;
1579 auto accept_function_pointer = &boosted_tcp_server<t_protocol_handler>::handle_accept_ipv4;
1580 if (ipv6)
1581 {
1582 current_acceptor = &acceptor_ipv6;
1583 current_new_connection = &new_connection_ipv6;
1585 }
1586
// Tracks whether the next async_accept is already pending, so the error path does not arm a second one.
1587 bool accept_started = false;
1588 try
1589 {
1590 if (!e)
1591 {
1593 const char *ssl_message = "unknown";
1594 switch ((*current_new_connection)->get_ssl_support())
1595 {
1596 case epee::net_utils::ssl_support_t::e_ssl_support_disabled: ssl_message = "disabled"; break;
1597 case epee::net_utils::ssl_support_t::e_ssl_support_enabled: ssl_message = "enabled"; break;
1598 case epee::net_utils::ssl_support_t::e_ssl_support_autodetect: ssl_message = "autodetection"; break;
1599 }
1600 MDEBUG("New server for RPC connections, SSL " << ssl_message);
1601 (*current_new_connection)->setRpcStation(); // hopefully this is not needed actually
1602 }
// Take ownership of the accepted connection; the pending slot is empty until the reset() below.
1603 connection_ptr conn(std::move((*current_new_connection)));
1604 (*current_new_connection).reset(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, conn->get_ssl_support()));
1605 current_acceptor->async_accept((*current_new_connection)->socket(),
1606 boost::bind(accept_function_pointer, this,
1607 boost::asio::placeholders::error));
1608 accept_started = true;
1609
1610 boost::asio::socket_base::keep_alive opt(true);
1611 conn->socket().set_option(opt);
1612
1613 bool res;
1615 res = conn->start(true, 1 < m_threads_count);
1616 else
1617 res = conn->start(true, 1 < m_threads_count, default_remote);
1618 if (!res)
1619 {
1620 conn->cancel();
1621 return;
1622 }
1623 conn->save_dbg_log();
1624 return;
1625 }
1626 else
1627 {
1628 MERROR("Error in boosted_tcp_server<t_protocol_handler>::handle_accept: " << e);
1629 }
1630 }
// Note: this `e` shadows the error_code parameter inside the handler only.
1631 catch (const std::exception &e)
1632 {
1633 MERROR("Exception in boosted_tcp_server<t_protocol_handler>::handle_accept: " << e.what());
1634 if (accept_started)
1635 return;
1636 }
1637
1638 // error path, if e or exception
1639 assert(m_state != nullptr); // always set in constructor
1640 _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_state->sock_count);
// NOTE(review): if the exception was thrown after the pending connection was moved out above but
// before reset() completed, *current_new_connection may be empty when get_ssl_support() is called
// here -- verify.
1642 (*current_new_connection).reset(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, (*current_new_connection)->get_ssl_support()));
1643 current_acceptor->async_accept((*current_new_connection)->socket(),
1644 boost::bind(accept_function_pointer, this,
1645 boost::asio::placeholders::error));
1646 }
1647 //---------------------------------------------------------------------------------
// boosted_tcp_server::add_connection(t_connection_context& out,
//     boost::asio::ip::tcp::socket&& sock, network_address real_remote,
//     epee::net_utils::ssl_support_t ssl_support)
// (signature on line 1649 is missing; taken from the Doxygen index).
// Adopts an already-connected socket as a new connection of this server.
// The socket must belong to the same io_context as the server; otherwise a
// warning is logged and false is returned. On a successful start the
// connection context is copied into `out` and true is returned; false is
// returned when start() fails.
// NOTE(review): `conn` is created on line 1653, which is missing from this
// listing.
1648 template<class t_protocol_handler>
1650 {
1651 if(std::addressof(get_io_context()) == std::addressof(sock.get_executor().context()))
1652 {
1654 if(conn->start(false, 1 < m_threads_count, std::move(real_remote)))
1655 {
1656 conn->get_context(out);
1657 conn->save_dbg_log();
1658 return true;
1659 }
1660 }
1661 else
1662 {
1663 MWARNING(out << " was not added, socket/io_context mismatch");
1664 }
1665 return false;
1666 }
1667 //---------------------------------------------------------------------------------
// Performs one blocking connection attempt on sock_ and, depending on the
// connection's SSL setting, an SSL client handshake.
//
// new_connection_l  connection object owning sock_; used for the handshake.
// adr, port         used for log messages only in the visible code.
// sock_             socket to open / bind / connect.
// remote_endpoint   endpoint to connect to.
// bind_ip           local address to bind to; "0.0.0.0", "0" and "" mean
//                   "do not bind".
// conn_timeout      connect timeout in milliseconds.
// ssl_support       NOTE(review): a local of the same name is declared after
//                   the connect succeeds, initialised from
//                   new_connection_l->get_ssl_support(); from there on it
//                   hides this parameter.
//
// Returns CONNECT_SUCCESS, CONNECT_NO_SSL (handshake failed; sock_ has been
// shut down and closed) or CONNECT_FAILURE (sock_ is closed).
// NOTE(review): lines 1714, 1741 and 1747 (conditions of three `if` blocks)
// are missing from this listing.
1668 template<class t_protocol_handler>
1669 typename boosted_tcp_server<t_protocol_handler>::try_connect_result_t boosted_tcp_server<t_protocol_handler>::try_connect(connection_ptr new_connection_l, const std::string& adr, const std::string& port, boost::asio::ip::tcp::socket &sock_, const boost::asio::ip::tcp::endpoint &remote_endpoint, const std::string &bind_ip, uint32_t conn_timeout, epee::net_utils::ssl_support_t ssl_support)
1670 {
1671 TRY_ENTRY();
1672
1673 sock_.open(remote_endpoint.protocol());
1674 if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" )
1675 {
1676 boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::make_address(bind_ip), 0);
1677 boost::system::error_code ec;
1678 sock_.bind(local_endpoint, ec);
1679 if (ec)
1680 {
1681 MERROR("Error binding to " << bind_ip << ": " << ec.message());
1682 if (sock_.is_open())
1683 sock_.close();
1684 return CONNECT_FAILURE;
1685 }
1686 }
1687
1688 /*
1689 NOTICE: be careful to make sync connection from event handler: in case if all threads suddenly do sync connect, there will be no thread to dispatch events from io service.
1690 */
1691
// would_block is used as the "no result yet" marker for the asynchronous connect.
1692 boost::system::error_code ec = boost::asio::error::would_block;
1693
1694 //have another free thread(s), work in wait mode, without event handling
// State shared between this (waiting) thread and the completion handler run by an io_context thread.
1695 struct local_async_context
1696 {
1697 boost::system::error_code ec;
1698 boost::mutex connect_mut;
1699 boost::condition_variable cond;
1700 };
1701
1702 boost::shared_ptr<local_async_context> local_shared_context(new local_async_context());
1703 local_shared_context->ec = boost::asio::error::would_block;
// The mutex is held from here on; the handler can only publish its result while timed_wait() has released it.
1704 boost::unique_lock<boost::mutex> lock(local_shared_context->connect_mut);
1705 auto connect_callback = [](boost::system::error_code ec_, boost::shared_ptr<local_async_context> shared_context)
1706 {
1707 shared_context->connect_mut.lock(); shared_context->ec = ec_; shared_context->cond.notify_one(); shared_context->connect_mut.unlock();
1708 };
1709
1710 sock_.async_connect(remote_endpoint, std::bind<void>(connect_callback, std::placeholders::_1, local_shared_context));
1711 while(local_shared_context->ec == boost::asio::error::would_block)
1712 {
// r is false when the wait ended because the deadline passed.
1713 bool r = local_shared_context->cond.timed_wait(lock, boost::get_system_time() + boost::posix_time::milliseconds(conn_timeout));
1715 {
1716 if (sock_.is_open())
1717 sock_.close();
1718 return CONNECT_FAILURE;
1719 }
1720 if(local_shared_context->ec == boost::asio::error::would_block && !r)
1721 {
1722 //timeout
1723 sock_.close();
1724 _dbg3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")");
1725 return CONNECT_FAILURE;
1726 }
1727 }
1728 ec = local_shared_context->ec;
1729
1730 if (ec || !sock_.is_open())
1731 {
1732 _dbg3("Some problems at connect, message: " << ec.message());
1733 if (sock_.is_open())
1734 sock_.close();
1735 return CONNECT_FAILURE;
1736 }
1737
1738 _dbg3("Connected success to " << adr << ':' << port);
1739
1740 const ssl_support_t ssl_support = new_connection_l->get_ssl_support();
1742 {
1743 // Handshake
1744 MDEBUG("Handshaking SSL...");
1745 if (!new_connection_l->handshake(boost::asio::ssl::stream_base::client))
1746 {
1748 {
// Shut the TCP connection down so the caller can reconnect without SSL.
1749 boost::system::error_code ignored_ec;
1750 sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec);
1751 sock_.close();
1752 return CONNECT_NO_SSL;
1753 }
1754 MERROR("SSL handshake failed");
1755 if (sock_.is_open())
1756 sock_.close();
1757 return CONNECT_FAILURE;
1758 }
1759 }
1760
1761 return CONNECT_SUCCESS;
1762
1763 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::try_connect", CONNECT_FAILURE);
1764 }
1765 //---------------------------------------------------------------------------------
// Synchronously connects to adr:port and starts the resulting connection.
//
// adr, port      remote host and service to resolve.
// conn_timeout   connect timeout in milliseconds, passed to try_connect().
// conn_context   receives the connection context when the connection starts.
// bind_ip        local address to bind to for IPv4; for the IPv6 fallback
//                "0.0.0.0" is translated to "::" and any other value to ""
//                (no bind).
// ssl_support    with autodetect, a failed SSL handshake triggers one more
//                attempt with SSL disabled.
//
// Resolution is tried as IPv4 first; if that yields nothing and m_use_ipv6 is
// set, IPv6 is tried. Only the first resolved endpoint is used.
// Returns true when the connection was started, false otherwise.
// NOTE(review): lines 1771 and 1776 are missing from this listing; 1771
// presumably creates new_connection_l. The early `return false` paths below do
// not visibly remove new_connection_l from connections_ -- check whether the
// missing line 1776 installs a scope-exit cleanup.
// NOTE(review): both resolve() calls use the error_code overload, which
// presumably reports failure through resolve_error instead of throwing; if so
// the system_error handler below is effectively unreachable -- confirm against
// the Boost.Asio documentation.
1766 template<class t_protocol_handler>
1767 bool boosted_tcp_server<t_protocol_handler>::connect(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_connection_context& conn_context, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support)
1768 {
1769 TRY_ENTRY();
1770
// Track the connection while it is being established (send_stop_signal() cancels everything in connections_).
1772 connections_mutex.lock();
1773 connections_.insert(new_connection_l);
1774 MDEBUG("connections_ size now " << connections_.size());
1775 connections_mutex.unlock();
1777 boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
1778
1779 bool try_ipv6 = false;
1780
1781 boost::asio::ip::tcp::resolver resolver(io_context_);
1782 boost::asio::ip::tcp::resolver::results_type results{};
1783 boost::system::error_code resolve_error;
1784
1785 try
1786 {
1787 //resolving ipv4 address as ipv6 throws, catch here and move on
1788 results = resolver.resolve(
1789 boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
1790 );
1791 }
1792 catch (const boost::system::system_error& e)
1793 {
1794 if (!m_use_ipv6 || (resolve_error != boost::asio::error::host_not_found &&
1795 resolve_error != boost::asio::error::host_not_found_try_again))
1796 {
1797 throw;
1798 }
1799 try_ipv6 = true;
1800 }
1801 catch (...)
1802 {
1803 throw;
1804 }
1805
1806 std::string bind_ip_to_use;
1807
1808 if(results.empty())
1809 {
1810 if (!m_use_ipv6)
1811 {
1812 _erro("Failed to resolve " << adr);
1813 return false;
1814 }
1815 else
1816 {
1817 try_ipv6 = true;
1818 MINFO("Resolving address as IPv4 failed, trying IPv6");
1819 }
1820 }
1821 else
1822 {
1823 bind_ip_to_use = bind_ip;
1824 }
1825
1826 if (try_ipv6)
1827 {
1828 results = resolver.resolve(
1829 boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
1830 );
1831
1832 if(results.empty())
1833 {
1834 _erro("Failed to resolve " << adr);
1835 return false;
1836 }
1837 else
1838 {
// An IPv4 bind address cannot be used on an IPv6 socket: map the wildcard, drop anything else.
1839 if (bind_ip == "0.0.0.0")
1840 {
1841 bind_ip_to_use = "::";
1842 }
1843 else
1844 {
1845 bind_ip_to_use = "";
1846 }
1847
1848 }
1849
1850 }
1851
1852 const auto iterator = results.begin();
1853
1854 MDEBUG("Trying to connect to " << adr << ":" << port << ", bind_ip = " << bind_ip_to_use);
1855
1856 //boost::asio::ip::tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string(addr.c_str()), port);
1857 boost::asio::ip::tcp::endpoint remote_endpoint(*iterator);
1858
1859 auto try_connect_result = try_connect(new_connection_l, adr, port, sock_, remote_endpoint, bind_ip_to_use, conn_timeout, ssl_support);
1860 if (try_connect_result == CONNECT_FAILURE)
1861 return false;
1862 if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect && try_connect_result == CONNECT_NO_SSL)
1863 {
1864 // we connected, but could not connect with SSL, try without
1865 MERROR("SSL handshake failed on an autodetect connection, reconnecting without SSL");
1866 new_connection_l->disable_ssl();
1867 try_connect_result = try_connect(new_connection_l, adr, port, sock_, remote_endpoint, bind_ip_to_use, conn_timeout, epee::net_utils::ssl_support_t::e_ssl_support_disabled);
1868 if (try_connect_result != CONNECT_SUCCESS)
1869 return false;
1870 }
1871
1872 // start adds the connection to the config object's list, so we don't need to have it locally anymore
1873 connections_mutex.lock();
1874 connections_.erase(new_connection_l);
1875 connections_mutex.unlock();
1876 bool r = new_connection_l->start(false, 1 < m_threads_count);
1877 if (r)
1878 {
1879 new_connection_l->get_context(conn_context);
1880 }
1881 else
1882 {
1883 assert(m_state != nullptr); // always set in constructor
1884 _erro("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection, connections_count = " << m_state->sock_count);
1885 }
1886
1887 new_connection_l->save_dbg_log();
1888
1889 return r;
1890
1891 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect", false);
1892 }
1893 //---------------------------------------------------------------------------------
// Starts an asynchronous connection to adr:port and reports the outcome
// through `cb`.
//
// adr, port      remote host and service; resolved synchronously in this call.
// conn_timeout   connect timeout in milliseconds, enforced by a deadline timer
//                that closes the socket when it fires.
// cb             invoked as cb(context, error_code) from the async_connect
//                completion handler: with the connection's context and a clear
//                error code on success; with a value-initialised context and
//                operation_aborted if the timer had already fired, `fault` if
//                start() failed, or the connect error otherwise.
// bind_ip        local address to bind to; "0.0.0.0", "0" and "" mean
//                "do not bind".
// ssl_support    passed to the new connection object.
// initial        initial connection context, moved into the new connection.
//
// Returns true when the asynchronous connect was started; false when
// resolution or binding failed (cb is not called in that case).
// NOTE(review): line 1903 is missing from this listing; the early
// `return false` paths do not visibly remove new_connection_l from
// connections_ -- check whether the missing line installs a scope-exit cleanup.
// NOTE(review): unlike connect(), the empty-result check below tests
// `!try_ipv6` rather than `!m_use_ipv6`. try_ipv6 is only set in the
// system_error handler, and the error_code overload of resolve() presumably
// does not throw, so the IPv6 fallback looks unreachable here -- verify.
// NOTE(review): both lambdas capture by copy ([=]) and use members of this
// server, so the server must stay alive until the handlers have run.
1894 template<class t_protocol_handler> template<class t_callback>
1895 bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, const t_callback &cb, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support, t_connection_context&& initial)
1896 {
1897 TRY_ENTRY();
1898 connection_ptr new_connection_l(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, ssl_support, std::move(initial)) );
// Track the connection while it is being established (send_stop_signal() cancels everything in connections_).
1899 connections_mutex.lock();
1900 connections_.insert(new_connection_l);
1901 MDEBUG("connections_ size now " << connections_.size());
1902 connections_mutex.unlock();
1904 boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
1905
1906 bool try_ipv6 = false;
1907
1908 boost::asio::ip::tcp::resolver resolver(io_context_);
1909 boost::asio::ip::tcp::resolver::results_type results{};
1910 boost::system::error_code resolve_error;
1911
1912 try
1913 {
1914 //resolving ipv4 address as ipv6 throws, catch here and move on
1915 results = resolver.resolve(
1916 boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
1917 );
1918 }
1919 catch (const boost::system::system_error& e)
1920 {
1921 if (!m_use_ipv6 || (resolve_error != boost::asio::error::host_not_found &&
1922 resolve_error != boost::asio::error::host_not_found_try_again))
1923 {
1924 throw;
1925 }
1926 try_ipv6 = true;
1927 }
1928 catch (...)
1929 {
1930 throw;
1931 }
1932
1933 if(results.empty())
1934 {
1935 if (!try_ipv6)
1936 {
1937 _erro("Failed to resolve " << adr);
1938 return false;
1939 }
1940 else
1941 {
1942 MINFO("Resolving address as IPv4 failed, trying IPv6");
1943 }
1944 }
1945
1946 if (try_ipv6)
1947 {
1948 results = resolver.resolve(
1949 boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
1950 );
1951
1952 if(results.empty())
1953 {
1954 _erro("Failed to resolve " << adr);
1955 return false;
1956 }
1957 }
1958
// Only the first resolved endpoint is used.
1959 boost::asio::ip::tcp::endpoint remote_endpoint(*results.begin());
1960
1961 sock_.open(remote_endpoint.protocol());
1962 if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" )
1963 {
1964 boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::make_address(bind_ip.c_str()), 0);
1965 boost::system::error_code ec;
1966 sock_.bind(local_endpoint, ec);
1967 if (ec)
1968 {
1969 MERROR("Error binding to " << bind_ip << ": " << ec.message());
1970 if (sock_.is_open())
1971 sock_.close();
1972 return false;
1973 }
1974 }
1975
// The timer is held by shared_ptr so both handlers below keep it alive.
1976 boost::shared_ptr<boost::asio::deadline_timer> sh_deadline(new boost::asio::deadline_timer(io_context_));
1977 //start deadline
1978 sh_deadline->expires_from_now(boost::posix_time::milliseconds(conn_timeout));
1979 sh_deadline->async_wait([=](const boost::system::error_code& error)
1980 {
// operation_aborted means the timer was cancelled by a successful connect; anything else is a real timeout.
1981 if(error != boost::asio::error::operation_aborted)
1982 {
1983 _dbg3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")");
1984 new_connection_l->socket().close();
1985 }
1986 });
1987 //start async connect
1988 sock_.async_connect(remote_endpoint, [=](const boost::system::error_code& ec_)
1989 {
1990 t_connection_context conn_context = AUTO_VAL_INIT(conn_context);
1991 boost::system::error_code ignored_ec;
1992 boost::asio::ip::tcp::socket::endpoint_type lep = new_connection_l->socket().local_endpoint(ignored_ec);
1993 if(!ec_)
1994 {//success
// cancel() returns the number of waits it cancelled; zero means the timeout handler is already queued or has run.
1995 if(!sh_deadline->cancel())
1996 {
1997 cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation
1998 }else
1999 {
2000 _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port <<
2001 " from " << lep.address().to_string() << ':' << lep.port());
2002
2003 // start adds the connection to the config object's list, so we don't need to have it locally anymore
2004 connections_mutex.lock();
2005 connections_.erase(new_connection_l);
2006 connections_mutex.unlock();
2007 bool r = new_connection_l->start(false, 1 < m_threads_count);
2008 if (r)
2009 {
2010 new_connection_l->get_context(conn_context);
2011 cb(conn_context, ec_);
2012 }
2013 else
2014 {
2015 _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port);
2016 cb(conn_context, boost::asio::error::fault);
2017 }
2018 }
2019 }else
2020 {
2021 _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port <<
2022 " from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value());
2023 cb(conn_context, ec_);
2024 }
2025 });
2026 return true;
2027 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false);
2028 }
2029
2030} // namespace
2031} // namespace
#define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT
Definition abstract_tcp_server2.h:67
#define NEW_CONNECTION_TIMEOUT_LOCAL
Definition abstract_tcp_server2.inl:59
#define NEW_CONNECTION_TIMEOUT_REMOTE
Definition abstract_tcp_server2.inl:60
#define TIMEOUT_EXTRA_MS_PER_BYTE
Definition abstract_tcp_server2.inl:63
#define AGGRESSIVE_TIMEOUT_THRESHOLD
Definition abstract_tcp_server2.inl:58
#define DEFAULT_TIMEOUT_MS_LOCAL
Definition abstract_tcp_server2.inl:61
#define DEFAULT_TIMEOUT_MS_REMOTE
Definition abstract_tcp_server2.inl:62
static void close()
Definition blockchain_blackball.cpp:279
Definition byte_slice.h:69
boost::asio::ip::tcp::acceptor acceptor_ipv6
Definition abstract_tcp_server2.h:522
boost::shared_ptr< connection< t_protocol_handler > > connection_ptr
Definition abstract_tcp_server2.h:356
uint32_t m_port
Definition abstract_tcp_server2.h:526
size_t m_threads_count
Definition abstract_tcp_server2.h:533
void set_connection_limit(i_connection_limit *plimit)
Definition abstract_tcp_server2.inl:1441
void handle_accept_ipv6(const boost::system::error_code &e)
Definition abstract_tcp_server2.inl:1567
bool m_require_ipv4
Definition abstract_tcp_server2.h:531
std::map< std::string, t_connection_type > server_type_map
Definition abstract_tcp_server2.h:365
std::vector< boost::shared_ptr< boost::thread > > m_threads
Definition abstract_tcp_server2.h:534
boost::asio::ip::tcp::acceptor acceptor_
Acceptor used to listen for incoming connections.
Definition abstract_tcp_server2.h:521
try_connect_result_t
Definition abstract_tcp_server2.h:349
@ CONNECT_FAILURE
Definition abstract_tcp_server2.h:351
@ CONNECT_NO_SSL
Definition abstract_tcp_server2.h:352
@ CONNECT_SUCCESS
Definition abstract_tcp_server2.h:350
std::atomic< bool > m_stop_signal_sent
Definition abstract_tcp_server2.h:525
~boosted_tcp_server()
Definition abstract_tcp_server2.inl:1250
bool connect(const std::string &adr, const std::string &port, uint32_t conn_timeot, t_connection_context &cn, const std::string &bind_ip="0.0.0.0", epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
Definition abstract_tcp_server2.inl:1767
bool timed_wait_server_stop(uint64_t wait_mseconds)
wait for service workers stop
Definition abstract_tcp_server2.inl:1526
connection_ptr new_connection_
The next connection to be accepted.
Definition abstract_tcp_server2.h:542
boost::asio::io_context & get_io_context()
Definition abstract_tcp_server2.h:437
t_protocol_handler::connection_context t_connection_context
Definition abstract_tcp_server2.h:357
boosted_tcp_server(t_connection_type connection_type)
Definition abstract_tcp_server2.inl:1213
try_connect_result_t try_connect(connection_ptr new_connection_l, const std::string &adr, const std::string &port, boost::asio::ip::tcp::socket &sock_, const boost::asio::ip::tcp::endpoint &remote_endpoint, const std::string &bind_ip, uint32_t conn_timeout, epee::net_utils::ssl_support_t ssl_support)
Definition abstract_tcp_server2.inl:1669
boost::mutex connections_mutex
Definition abstract_tcp_server2.h:546
void handle_accept_ipv4(const boost::system::error_code &e)
Handle completion of an asynchronous accept operation.
Definition abstract_tcp_server2.inl:1561
connection_ptr new_connection_ipv6
Definition abstract_tcp_server2.h:543
bool worker_thread()
Run the server's io_context loop.
Definition abstract_tcp_server2.inl:1394
void create_server_type_map()
Definition abstract_tcp_server2.inl:1257
bool add_connection(t_connection_context &out, boost::asio::ip::tcp::socket &&sock, network_address real_remote, epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
Definition abstract_tcp_server2.inl:1649
std::string m_address_ipv6
Definition abstract_tcp_server2.h:529
void send_stop_signal()
Stop the server.
Definition abstract_tcp_server2.inl:1543
critical_section m_threads_lock
Definition abstract_tcp_server2.h:536
void set_connection_filter(i_connection_filter *pfilter)
Definition abstract_tcp_server2.inl:1434
t_connection_type m_connection_type
Definition abstract_tcp_server2.h:539
std::string m_address
Definition abstract_tcp_server2.h:528
std::string m_thread_name_prefix
Definition abstract_tcp_server2.h:532
epee::net_utils::network_address default_remote
Definition abstract_tcp_server2.h:523
bool run_server(size_t threads_count, bool wait=true, const boost::thread::attributes &attrs=boost::thread::attributes())
Run the server's io_context loop.
Definition abstract_tcp_server2.inl:1455
void handle_accept(const boost::system::error_code &e, bool ipv6=false)
Definition abstract_tcp_server2.inl:1573
bool connect_async(const std::string &adr, const std::string &port, uint32_t conn_timeot, const t_callback &cb, const std::string &bind_ip="0.0.0.0", epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect, t_connection_context &&initial=t_connection_context{})
void set_threads_prefix(const std::string &prefix_name)
Definition abstract_tcp_server2.inl:1424
bool init_server(uint32_t port, const std::string &address="0.0.0.0", uint32_t port_ipv6=0, const std::string &address_ipv6="::", bool use_ipv6=false, bool require_ipv4=true, ssl_options_t ssl_options=ssl_support_t::e_ssl_support_autodetect)
Definition abstract_tcp_server2.inl:1265
boost::asio::io_context & io_context_
Definition abstract_tcp_server2.h:518
std::unique_ptr< worker > m_io_context_local_instance
Definition abstract_tcp_server2.h:517
uint32_t m_port_ipv6
Definition abstract_tcp_server2.h:527
void set_response_soft_limit(std::size_t limit)
Definition abstract_tcp_server2.inl:1448
bool m_use_ipv6
Definition abstract_tcp_server2.h:530
boost::thread::id m_main_thread_id
Definition abstract_tcp_server2.h:535
std::set< connection_ptr > connections_
Definition abstract_tcp_server2.h:547
bool is_thread_worker()
Definition abstract_tcp_server2.inl:1510
const std::shared_ptr< typename connection< epee::net_utils::http::http_custom_handler< epee::net_utils::connection_context_base > >::shared_state > m_state
Definition abstract_tcp_server2.h:505
std::atomic< uint32_t > m_thread_index
Definition abstract_tcp_server2.h:537
std::atomic< long > sock_count
Definition connection_basic.hpp:67
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > socket_
Socket for the connection.
Definition connection_basic.hpp:117
static int get_tos_flag()
Definition connection_basic.cpp:226
boost::asio::io_context::strand strand_
Strand to ensure the connection's handlers are not called concurrently.
Definition connection_basic.hpp:115
void logger_handle_net_read(size_t size)
Definition connection_basic.cpp:270
boost::asio::ip::tcp::socket & socket()
Definition connection_basic.hpp:130
ssl_support_t m_ssl_support
Definition connection_basic.hpp:118
connection_basic_shared_state & get_state() noexcept
Definition connection_basic.hpp:128
volatile bool m_is_multithreaded
Definition connection_basic.hpp:113
connection_basic(boost::asio::io_context &context, boost::asio::ip::tcp::socket &&sock, std::shared_ptr< connection_basic_shared_state > state, ssl_support_t ssl_support)
Definition connection_basic.cpp:124
Represents a single connection from a client.
Definition abstract_tcp_server2.h:100
void setRpcStation()
Definition abstract_tcp_server2.inl:1206
virtual bool call_run_once_service_io()
Definition abstract_tcp_server2.inl:1141
boost::asio::ip::tcp::socket socket_t
Definition abstract_tcp_server2.h:114
io_context_t & m_io_context
Definition abstract_tcp_server2.h:262
void terminate()
Definition abstract_tcp_server2.inl:742
virtual bool send_done()
Definition abstract_tcp_server2.inl:1125
void start_timer(duration_t duration, bool add={})
Definition abstract_tcp_server2.inl:144
t_protocol_handler::connection_context t_connection_context
Definition abstract_tcp_server2.h:102
void cancel_handler()
Definition abstract_tcp_server2.inl:667
boost::asio::io_context io_context_t
Definition abstract_tcp_server2.h:112
void async_wait_timer()
Definition abstract_tcp_server2.inl:168
bool m_local
Definition abstract_tcp_server2.h:268
void start_write()
Definition abstract_tcp_server2.inl:469
t_protocol_handler m_handler
Definition abstract_tcp_server2.h:271
void start_read()
Definition abstract_tcp_server2.inl:305
void interrupt()
Definition abstract_tcp_server2.inl:684
connection(io_context_t &io_context, std::shared_ptr< shared_state > state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support, t_connection_context &&initial=t_connection_context{})
Construct a connection with the given io_context.
Definition abstract_tcp_server2.inl:1012
void on_terminating()
Definition abstract_tcp_server2.inl:757
virtual bool do_send(byte_slice message)
(see do_send from i_service_endpoint)
Definition abstract_tcp_server2.inl:1119
bool send(epee::byte_slice message)
Definition abstract_tcp_server2.inl:811
virtual bool release()
Definition abstract_tcp_server2.inl:1196
void finish_read(size_t bytes_transferred)
Definition abstract_tcp_server2.inl:413
virtual bool request_callback()
Definition abstract_tcp_server2.inl:1155
bool speed_limit_is_enabled() const
Tells us whether we should be sleeping here (e.g. do not sleep on RPC connections).
Definition abstract_tcp_server2.inl:1107
boost::shared_ptr< connection_t > connection_ptr
Definition abstract_tcp_server2.h:105
void on_interrupted()
Definition abstract_tcp_server2.inl:697
std::string m_host
Definition abstract_tcp_server2.h:269
void start_handshake()
Definition abstract_tcp_server2.inl:209
virtual io_context_t & get_io_context()
Definition abstract_tcp_server2.inl:1175
timers_t m_timers
Definition abstract_tcp_server2.h:266
bool start_internal(bool is_income, bool is_multithreaded, boost::optional< network_address > real_remote)
Definition abstract_tcp_server2.inl:909
@ RUNNING
Definition abstract_tcp_server2.h:154
@ INTERRUPTED
Definition abstract_tcp_server2.h:155
@ TERMINATING
Definition abstract_tcp_server2.h:156
@ WASTED
Definition abstract_tcp_server2.h:157
@ TERMINATED
Definition abstract_tcp_server2.h:153
void terminate_async()
Definition abstract_tcp_server2.inl:800
duration_t get_timeout_from_bytes_read(size_t bytes) const
Definition abstract_tcp_server2.inl:115
virtual ~connection() noexcept(false)
Definition abstract_tcp_server2.inl:1050
void cancel_timer()
Definition abstract_tcp_server2.inl:199
void state_status_check()
Definition abstract_tcp_server2.inl:125
state_t m_state
Definition abstract_tcp_server2.h:270
strand_t m_strand
Definition abstract_tcp_server2.h:265
void start_shutdown()
Definition abstract_tcp_server2.inl:599
epee::net_utils::ssl_support_t ssl_support_t
Definition abstract_tcp_server2.h:106
void cancel_socket()
Definition abstract_tcp_server2.inl:641
t_connection_context m_conn_context
Definition abstract_tcp_server2.h:264
void save_dbg_log()
Definition abstract_tcp_server2.inl:1082
virtual bool add_ref()
Definition abstract_tcp_server2.inl:1181
unsigned int host_count(int delta=0)
Definition abstract_tcp_server2.inl:81
bool start(bool is_income, bool is_multithreaded)
Start the first asynchronous operation for the connection.
Definition abstract_tcp_server2.inl:1063
timer_t::duration duration_t
Definition abstract_tcp_server2.h:108
bool cancel()
Definition abstract_tcp_server2.inl:1113
t_connection_type m_connection_type
Definition abstract_tcp_server2.h:263
connection_ptr self
Definition abstract_tcp_server2.h:267
virtual bool close()
Definition abstract_tcp_server2.inl:1131
duration_t get_default_timeout()
Definition abstract_tcp_server2.inl:98
boost::system::error_code ec_t
Definition abstract_tcp_server2.h:109
Definition net_utils_base.h:69
Definition net_utils_base.h:172
Definition net_utils_base.h:225
static boost::mutex m_lock_get_global_throttle_in
Definition network_throttle.hpp:107
static i_network_throttle & get_global_throttle_in()
Singleton; for friend class; the caller MUST use proper locks, such as m_lock_get_global_throttle_in.
Definition network_throttle.cpp:76
static i_network_throttle & get_global_throttle_out()
Singleton; for friend class; the caller MUST use proper locks, here m_lock_get_global_throttle_out.
Definition network_throttle.cpp:89
static boost::mutex m_lock_get_global_throttle_out
Definition network_throttle.hpp:109
Definition net_ssl.h:77
const uint8_t seed[32]
Definition code-generator.cpp:37
bool success
Definition cold-transaction.cpp:57
std::unique_ptr< test_connection > conn(new test_connection(io_service, m_handler_config))
#define false
const char * res
Definition hmac_keccak.cpp:42
static int dev
Definition ipfrdr.c:126
uint32_t address
Definition getifaddr.c:269
#define AUTO_VAL_INIT(v)
Definition misc_language.h:36
boost::shared_ptr< call_befor_die_base > auto_scope_leave_caller
Definition misc_language.h:80
bool sleep_no_w(long ms)
Definition misc_language.cpp:35
auto_scope_leave_caller create_scope_leave_handler(t_scope_leave_handler f)
Definition misc_language.h:97
T & check_and_get(std::shared_ptr< T > &ptr)
Definition abstract_tcp_server2.inl:71
@ ipv6
Definition enums.h:44
@ invalid
Definition enums.h:42
std::string to_string(t_connection_type type)
Definition connection_basic.cpp:70
bool is_ssl(const unsigned char *data, size_t len)
Definition net_ssl.cpp:424
constexpr size_t get_ssl_magic_size()
Definition net_ssl.h:150
t_connection_type
Definition connection_basic.hpp:93
@ e_connection_type_NET
Definition connection_basic.hpp:94
@ e_connection_type_RPC
Definition connection_basic.hpp:95
@ e_connection_type_P2P
Definition connection_basic.hpp:96
ssl_support_t
Definition net_ssl.h:49
@ e_ssl_support_disabled
Definition net_ssl.h:50
@ e_ssl_support_autodetect
Definition net_ssl.h:52
@ e_ssl_support_enabled
Definition net_ssl.h:51
PUSH_WARNINGS bool get_xtype_from_string(OUT XType &val, const std::string &str_id)
Definition string_tools_lexical.h:45
TODO: (mj-xmr) This will be reduced in another PR.
Definition byte_slice.h:40
int time
Definition gen_wide_data.py:40
Definition speed.py:1
Definition enums.h:68
if(!cryptonote::get_account_address_from_str_or_url(info, cryptonote::TESTNET, "9uVsvEryzpN8WH2t1WWhFFCG5tS8cBNdmJYNRuckLENFimfauV5pZKeS1P2CbxGkSDTUPHXWwiYE5ZGSXDAGbaZgDxobqDN"))
Definition signature.cpp:53
unsigned int uint32_t
Definition stdint.h:126
unsigned __int64 uint64_t
Definition stdint.h:136
The io_context used to perform asynchronous operations.
Definition abstract_tcp_server2.h:509
Definition abstract_tcp_server2.h:274
std::size_t response_soft_limit
Definition abstract_tcp_server2.h:286
Definition abstract_tcp_server2.h:76
Definition abstract_tcp_server2.h:83
Definition blake256.h:36
#define CRITICAL_REGION_LOCAL(x)
Definition syncobj.h:153
#define CRITICAL_REGION_END()
Definition syncobj.h:158
#define CRITICAL_REGION_BEGIN(x)
Definition syncobj.h:154
#define T(x)