Electroneum
Loading...
Searching...
No Matches
clt.cpp
Go to the documentation of this file.
1// Copyrights(c) 2017-2021, The Electroneum Project
2// Copyrights(c) 2014-2019, The Monero Project
3//
4// All rights reserved.
5//
6// Redistribution and use in source and binary forms, with or without modification, are
7// permitted provided that the following conditions are met:
8//
9// 1. Redistributions of source code must retain the above copyright notice, this list of
10// conditions and the following disclaimer.
11//
12// 2. Redistributions in binary form must reproduce the above copyright notice, this list
13// of conditions and the following disclaimer in the documentation and/or other
14// materials provided with the distribution.
15//
16// 3. Neither the name of the copyright holder nor the names of its contributors may be
17// used to endorse or promote products derived from this software without specific
18// prior written permission.
19//
20// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
21// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
22// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
23// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
27// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29//
30// Parts of this file are originally copyright (c) 2012-2013 The Cryptonote developers
31
32#include <atomic>
33#include <chrono>
34#include <functional>
35#include <numeric>
36#include <boost/thread/thread.hpp>
37#include <vector>
38
39#include "gtest/gtest.h"
40
41#include "include_base_utils.h"
42#include "misc_language.h"
43#include "misc_log_ex.h"
45#include "common/util.h"
46
47#include "net_load_tests.h"
48
49using namespace net_load_tests;
50
51namespace
52{
53 const size_t CONNECTION_COUNT = 100000;
54 const size_t CONNECTION_TIMEOUT = 10000;
55 const size_t DEFAULT_OPERATION_TIMEOUT = 30000;
56 const size_t RESERVED_CONN_CNT = 1;
57
58 template<typename t_predicate>
59 bool busy_wait_for(size_t timeout_ms, const t_predicate& predicate, size_t sleep_ms = 10)
60 {
61 for (size_t i = 0; i < timeout_ms / sleep_ms; ++i)
62 {
63 if (predicate())
64 return true;
65 //std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
66 epee::misc_utils::sleep_no_w(static_cast<long>(sleep_ms));
67 }
68 return false;
69 }
70
71 class t_connection_opener_1
72 {
73 public:
74 t_connection_opener_1(test_tcp_server& tcp_server, size_t open_request_target)
75 : m_tcp_server(tcp_server)
76 , m_open_request_target(open_request_target)
77 , m_next_id(0)
78 , m_error_count(0)
79 , m_connections(open_request_target)
80 {
81 for (auto& conn_id : m_connections)
82 conn_id = boost::uuids::nil_uuid();
83 }
84
85 bool open()
86 {
87 size_t id = m_next_id.fetch_add(1, std::memory_order_relaxed);
88 if (m_open_request_target <= id)
89 return false;
90
91 bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
92 if (!ec)
93 {
94 m_connections[id] = context.m_connection_id;
95 }
96 else
97 {
98 m_error_count.fetch_add(1, std::memory_order_relaxed);
99 }
100 });
101
102 if (!r)
103 {
104 m_error_count.fetch_add(1, std::memory_order_relaxed);
105 }
106
107 return true;
108 }
109
110 bool close(size_t id)
111 {
112 if (!m_connections[id].is_nil())
113 {
114 m_tcp_server.get_config_object().close(m_connections[id]);
115 return true;
116 }
117 else
118 {
119 return false;
120 }
121 }
122
123 size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }
124
125 private:
126 test_tcp_server& m_tcp_server;
127 size_t m_open_request_target;
128 std::atomic<size_t> m_next_id;
129 std::atomic<size_t> m_error_count;
130 std::vector<boost::uuids::uuid> m_connections;
131 };
132
133 class t_connection_opener_2
134 {
135 public:
136 t_connection_opener_2(test_tcp_server& tcp_server, size_t open_request_target, size_t max_opened_connection_count)
137 : m_tcp_server(tcp_server)
138 , m_open_request_target(open_request_target)
139 , m_open_request_count(0)
140 , m_error_count(0)
141 , m_open_close_test_helper(tcp_server, open_request_target, max_opened_connection_count)
142 {
143 }
144
145 bool open_and_close()
146 {
147 size_t req_count = m_open_request_count.fetch_add(1, std::memory_order_relaxed);
148 if (m_open_request_target <= req_count)
149 return false;
150
151 bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
152 if (!ec)
153 {
154 m_open_close_test_helper.handle_new_connection(context.m_connection_id);
155 }
156 else
157 {
158 m_error_count.fetch_add(1, std::memory_order_relaxed);
159 }
160 });
161
162 if (!r)
163 {
164 m_error_count.fetch_add(1, std::memory_order_relaxed);
165 }
166
167 return true;
168 }
169
170 void close_remaining_connections()
171 {
172 m_open_close_test_helper.close_remaining_connections();
173 }
174
175 size_t opened_connection_count() const { return m_open_close_test_helper.opened_connection_count(); }
176 size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }
177
178 private:
179 test_tcp_server& m_tcp_server;
180 size_t m_open_request_target;
181 std::atomic<size_t> m_open_request_count;
182 std::atomic<size_t> m_error_count;
183 open_close_test_helper m_open_close_test_helper;
184 };
185
186 class net_load_test_clt : public ::testing::Test
187 {
188 public:
189 net_load_test_clt()
190 : m_tcp_server(epee::net_utils::e_connection_type_RPC) // RPC disables network limit for unit tests
191 {
192 }
193 protected:
194 virtual void SetUp()
195 {
196 m_thread_count = (std::max)(min_thread_count, boost::thread::hardware_concurrency() / 2);
197
198 m_tcp_server.get_config_object().set_handler(&m_commands_handler);
199 m_tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
200
201 ASSERT_TRUE(m_tcp_server.init_server(clt_port, "127.0.0.1"));
202 ASSERT_TRUE(m_tcp_server.run_server(m_thread_count, false));
203
204 // Connect to server
205 std::atomic<int> conn_status(0);
206 m_cmd_conn_id = boost::uuids::nil_uuid();
207 ASSERT_TRUE(m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
208 if (!ec)
209 {
210 m_cmd_conn_id = context.m_connection_id;
211 }
212 else
213 {
214 LOG_ERROR("Connection error: " << ec.message());
215 }
216 conn_status.store(1, std::memory_order_seq_cst);
217 }));
218
219 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "connect_async timed out";
220 ASSERT_EQ(1, conn_status.load(std::memory_order_seq_cst));
221 ASSERT_FALSE(m_cmd_conn_id.is_nil());
222
223 conn_status.store(0, std::memory_order_seq_cst);
224 CMD_RESET_STATISTICS::request req;
226 m_tcp_server.get_config_object(), [&](int code, const CMD_RESET_STATISTICS::response& rsp, const test_connection_context&) {
227 conn_status.store(code, std::memory_order_seq_cst);
228 }));
229
230 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "reset statistics timed out";
231 ASSERT_LT(0, conn_status.load(std::memory_order_seq_cst));
232 }
233
234 virtual void TearDown()
235 {
236 m_tcp_server.send_stop_signal();
237 ASSERT_TRUE(m_tcp_server.timed_wait_server_stop(DEFAULT_OPERATION_TIMEOUT));
238 }
239
240 static void TearDownTestCase()
241 {
242 // Stop server
243 test_levin_commands_handler *commands_handler_ptr = new test_levin_commands_handler();
244 test_levin_commands_handler &commands_handler = *commands_handler_ptr;
246 tcp_server.get_config_object().set_handler(commands_handler_ptr, [](epee::levin::levin_commands_handler<test_connection_context> *handler)->void { delete handler; });
247 tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
248
249 if (!tcp_server.init_server(clt_port, "127.0.0.1")) return;
250 if (!tcp_server.run_server(2, false)) return;
251
252 // Connect to server and invoke shutdown command
253 std::atomic<int> conn_status(0);
254 boost::uuids::uuid cmd_conn_id = boost::uuids::nil_uuid();
255 tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
256 cmd_conn_id = context.m_connection_id;
257 conn_status.store(!ec ? 1 : -1, std::memory_order_seq_cst);
258 });
259
260 if (!busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) return;
261 if (1 != conn_status.load(std::memory_order_seq_cst)) return;
262
264
265 busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != commands_handler.close_connection_counter(); });
266 }
267
268 template<typename Func>
269 static auto call_func(size_t /*thread_index*/, const Func& func, int) -> decltype(func())
270 {
271 func();
272 }
273
274 template<typename Func>
275 static auto call_func(size_t thread_index, const Func& func, long) -> decltype(func(thread_index))
276 {
277 func(thread_index);
278 }
279
280 template<typename Func>
281 void parallel_exec(const Func& func)
282 {
283 unit_test::call_counter properly_finished_threads;
284 std::vector<boost::thread> threads(m_thread_count);
285 for (size_t i = 0; i < threads.size(); ++i)
286 {
287 threads[i] = boost::thread([&, i] {
288 call_func(i, func, 0);
289 properly_finished_threads.inc();
290 });
291 }
292
293 for (auto& th : threads)
294 th.join();
295
296 ASSERT_EQ(properly_finished_threads.get(), m_thread_count);
297 }
298
299 void get_server_statistics(CMD_GET_STATISTICS::response& statistics)
300 {
301 std::atomic<int> req_status(0);
304 m_tcp_server.get_config_object(), [&](int code, const CMD_GET_STATISTICS::response& rsp, const test_connection_context&) {
305 if (0 < code)
306 {
307 statistics = rsp;
308 }
309 else
310 {
311 LOG_ERROR("Get server statistics error: " << code);
312 }
313 req_status.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
314 }));
315
316 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != req_status.load(std::memory_order_seq_cst); })) << "get_server_statistics timed out";
317 ASSERT_EQ(1, req_status.load(std::memory_order_seq_cst));
318 }
319
320 template <typename t_predicate>
321 bool busy_wait_for_server_statistics(CMD_GET_STATISTICS::response& statistics, const t_predicate& predicate)
322 {
323 for (size_t i = 0; i < 30; ++i)
324 {
325 get_server_statistics(statistics);
326 if (predicate(statistics))
327 {
328 return true;
329 }
330
331 //std::this_thread::sleep_for(std::chrono::seconds(1));
333 }
334
335 return false;
336 }
337
338 void ask_for_data_requests(size_t request_size = 0)
339 {
341 req.request_size = request_size;
342 epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object());
343 }
344
345 protected:
346 test_tcp_server m_tcp_server;
347 test_levin_commands_handler m_commands_handler;
348 size_t m_thread_count;
349 boost::uuids::uuid m_cmd_conn_id;
350 };
351}
352
353TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_client)
354{
355 // Open connections
356 t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
357 parallel_exec([&] {
358 while (connection_opener.open());
359 });
360
361 // Wait for all open requests to complete
362 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
363 LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
364 " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
365
366 // Check
367 ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
368 ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
369 ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
370
371 // Close connections
372 parallel_exec([&](size_t thread_idx) {
373 for (size_t i = thread_idx; i < CONNECTION_COUNT; i += m_thread_count)
374 {
375 connection_opener.close(i);
376 }
377 });
378
379 // Wait for all opened connections to close
380 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
381 LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
382 " / " << m_commands_handler.close_connection_counter());
383
384 // Check all connections are closed
385 ASSERT_EQ(m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT, m_commands_handler.close_connection_counter());
386 ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
387
388 // Wait for server to handle all open and close requests
390 busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
391 LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
392
393 // Check server status
394 // It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
395 ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
396 ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
397
398 // Request data from server, it causes to close rest connections
399 ask_for_data_requests();
400
401 // Wait for server to close rest connections
402 busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
403 LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
404
405 // Check server status. All connections should be closed
406 ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
407 ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
408}
409
410TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_server)
411{
412 // Open connections
413 t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
414 parallel_exec([&] {
415 while (connection_opener.open());
416 });
417
418 // Wait for all open requests to complete
419 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
420 LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
421 " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
422
423 // Check
424 ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
425 ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
426 ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
427
428 // Wait for server accepts all connections
430 int last_new_connection_counter = -1;
431 busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
432 if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
433 else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
434 });
435
436 // Close connections
438 ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
439
440 // Wait for all opened connections to close
441 busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); });
442 LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
443 " / " << m_commands_handler.close_connection_counter());
444
445 // It's OK, if server didn't close all connections, because it could accept not all our connections
446 ASSERT_LE(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
447 ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
448
449 // Wait for server to handle all open and close requests
450 busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
451 LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
452
453 // Check server status
454 ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
455 ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
456
457 // Close rest connections
458 m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
459 if (ctx.m_connection_id != m_cmd_conn_id)
460 {
463 m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
464 if (code <= 0)
465 {
466 LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
467 }
468 });
469 if (!r)
470 LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
471 }
472 return true;
473 });
474
475 // Wait for all opened connections to close
476 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
477 LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
478 " / " << m_commands_handler.close_connection_counter());
479
480 // Check
481 ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
482 ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
483}
484
485TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_client)
486{
487 static const size_t MAX_OPENED_CONN_COUNT = 100;
488
489 // Open/close connections
490 t_connection_opener_2 connection_opener(m_tcp_server, CONNECTION_COUNT, MAX_OPENED_CONN_COUNT);
491 parallel_exec([&] {
492 while (connection_opener.open_and_close());
493 });
494
495 // Wait for all open requests to complete
496 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
497 LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
498 " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
499
500 // Check
501 ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
502 ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
503
504 // Wait for all close requests to complete
505 EXPECT_TRUE(busy_wait_for(4 * DEFAULT_OPERATION_TIMEOUT, [&](){ return connection_opener.opened_connection_count() <= MAX_OPENED_CONN_COUNT; }));
506 LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());
507
508 // Check
509 ASSERT_EQ(MAX_OPENED_CONN_COUNT, connection_opener.opened_connection_count());
510
511 connection_opener.close_remaining_connections();
512
513 // Wait for all close requests to complete
514 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; }));
515 LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());
516
517 ASSERT_EQ(m_commands_handler.new_connection_counter(), m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT);
518 ASSERT_EQ(0, connection_opener.opened_connection_count());
519 ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
520
521 // Wait for server to handle all open and close requests
523 busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
524 LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
525
526 // Check server status
527 // It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
528 ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
529 ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
530
531 // Request data from server, it causes to close rest connections
532 ask_for_data_requests();
533
534 // Wait for server to close rest connections
535 busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
536 LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
537
538 // Check server status. All connections should be closed
539 ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
540 ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
541}
542
543TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_server)
544{
545 static const size_t MAX_OPENED_CONN_COUNT = 100;
546
547 // Init test
548 std::atomic<int> test_state(0);
550 req_start.open_request_target = CONNECTION_COUNT;
551 req_start.max_opened_conn_count = MAX_OPENED_CONN_COUNT;
553 m_tcp_server.get_config_object(), [&](int code, const CMD_START_OPEN_CLOSE_TEST::response&, const test_connection_context&) {
554 test_state.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
555 }));
556
557 // Wait for server response
558 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 1 == test_state.load(std::memory_order_seq_cst); }));
559 ASSERT_EQ(1, test_state.load(std::memory_order_seq_cst));
560
561 // Open connections
562 t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
563 parallel_exec([&] {
564 while (connection_opener.open());
565 });
566
567 // Wait for all open requests to complete
568 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
569 LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
570 " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
571 LOG_PRINT_L0("actual number of opened connections: " << m_tcp_server.get_config_object().get_connections_count());
572
573 ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
574 ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
575
576 // Wait for server accepts all connections
578 int last_new_connection_counter = -1;
579 busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
580 if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
581 else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
582 });
583
584 // Ask server to close rest connections
586 ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
587
588 // Wait for almost all connections to be closed by server
589 busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; });
590
591 // It's OK, if there are opened connections, because server could accept not all our connections
592 ASSERT_LE(m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT, m_commands_handler.new_connection_counter());
593 ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
594
595 // Wait for server to handle all open and close requests
596 busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
597 LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
598
599 // Check server status
600 ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
601 ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
602
603 // Close rest connections
604 m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
605 if (ctx.m_connection_id != m_cmd_conn_id)
606 {
609 m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
610 if (code <= 0)
611 {
612 LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
613 }
614 });
615 if (!r)
616 LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
617 }
618 return true;
619 });
620
621 // Wait for all opened connections to close
622 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
623 LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
624 " / " << m_commands_handler.close_connection_counter());
625
626 // Check
627 ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
628 ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
629}
630
631int main(int argc, char** argv)
632{
633 TRY_ENTRY();
636 //set up logging options
637 mlog_configure(mlog_get_default_log_path("net_load_tests_clt.log"), true);
638
639 ::testing::InitGoogleTest(&argc, argv);
640 return RUN_ALL_TESTS();
641 CATCH_ENTRY_L0("main", 1);
642}
int main()
void set_handler(levin_commands_handler< t_connection_context > *handler, void(*destroy)(levin_commands_handler< t_connection_context > *)=NULL)
bool init_server(uint32_t port, const std::string address="0.0.0.0", ssl_options_t ssl_options=ssl_support_t::e_ssl_support_autodetect)
bool connect_async(const std::string &adr, const std::string &port, uint32_t conn_timeot, const t_callback &cb, const std::string &bind_ip="0.0.0.0", epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
t_protocol_handler::config_type & get_config_object()
bool run_server(size_t threads_count, bool wait=true, const boost::thread::attributes &attrs=boost::thread::attributes())
Run the server's io_service loop.
size_t get() volatile const
#define TEST_F(test_fixture, test_name)
Definition gtest.h:2216
#define ASSERT_GT(val1, val2)
Definition gtest.h:1976
#define ASSERT_EQ(val1, val2)
Definition gtest.h:1956
#define ASSERT_LE(val1, val2)
Definition gtest.h:1964
#define ASSERT_FALSE(condition)
Definition gtest.h:1868
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition gtest.h:2232
#define EXPECT_TRUE(condition)
Definition gtest.h:1859
#define ASSERT_TRUE(condition)
Definition gtest.h:1865
#define ASSERT_LT(val1, val2)
Definition gtest.h:1968
void mlog_configure(const std::string &filename_base, bool console, const std::size_t max_log_file_size=MAX_LOG_FILE_SIZE, const std::size_t max_log_files=MAX_LOG_FILES)
Definition mlog.cpp:148
std::string mlog_get_default_log_path(const char *default_filename)
Definition mlog.cpp:72
#define CATCH_ENTRY_L0(lacation, return_val)
#define TRY_ENTRY()
#define LOG_PRINT_L0(x)
Definition misc_log_ex.h:99
bool get_set_enable_assert(bool set=false, bool v=false)
bool sleep_no_w(long ms)
bool async_invoke_remote_command2(boost::uuids::uuid conn_id, int command, const t_arg &out_struct, t_transport &transport, const callback_t &cb, size_t inv_timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool notify_remote_command2(int command, const t_arg &out_struct, t_transport &transport)
std::unique_ptr< void, terminate > context
Unique ZMQ context handle, calls zmq_term on destruction.
Definition zmq.h:98
const std::string clt_port("36230")
const std::string srv_port("36231")
const unsigned int min_thread_count
epee::net_utils::boosted_tcp_server< test_levin_protocol_handler > test_tcp_server
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
bool on_startup()
Definition util.cpp:778