Bitcoin Core  31.0.0
P2P Digital Currency
threadpool_tests.cpp
Go to the documentation of this file.
1 // Copyright (c) The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <common/system.h>
6 #include <logging.h>
7 #include <random.h>
8 #include <test/util/common.h>
9 #include <util/string.h>
10 #include <util/threadpool.h>
11 #include <util/time.h>
12 
13 #include <boost/test/unit_test.hpp>
14 #include <latch>
15 #include <semaphore>
16 
17 // General test values
19 constexpr char POOL_NAME[] = "test";
20 constexpr auto WAIT_TIMEOUT = 120s;
21 
25  LogInfo("thread pool workers count: %d", NUM_WORKERS_DEFAULT);
26  }
27 };
28 
29 // Test Cases Overview
30 // 0) Submit task to a non-started pool.
31 // 1) Submit tasks and verify completion.
32 // 2) Maintain all threads busy except one.
33 // 3) Wait for work to finish.
34 // 4) Wait for result object.
35 // 5) The task throws an exception, catch must be done in the consumer side.
36 // 6) Busy workers, help them by processing tasks externally.
37 // 7) Recursive submission of tasks.
38 // 8) Submit task when all threads are busy, stop pool and verify task gets executed.
39 // 9) Congestion test; create more workers than available cores.
40 // 10) Ensure Interrupt() prevents further submissions.
41 // 11) Start() must not cause a deadlock when called during Stop().
42 // 12) Ensure queued tasks complete after Interrupt().
43 // 13) Ensure the Stop() calling thread helps drain the queue.
45 
46 #define WAIT_FOR(futures) \
47  do { \
48  for (const auto& f : futures) { \
49  BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
50  } \
51  } while (0)
52 
53 // Helper to unwrap a valid pool submission
54 template <typename F>
55 [[nodiscard]] auto Submit(ThreadPool& pool, F&& fn)
56 {
57  return std::move(*Assert(pool.Submit(std::forward<F>(fn))));
58 }
59 
60 // Block a number of worker threads by submitting tasks that wait on `release_sem`.
61 // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
62 std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
63 {
64  assert(threadPool.WorkersCount() >= num_of_threads_to_block);
65  std::latch ready{static_cast<std::ptrdiff_t>(num_of_threads_to_block)};
66  std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
67  for (auto& f : blocking_tasks) f = Submit(threadPool, [&] {
68  ready.count_down();
69  release_sem.acquire();
70  });
71  ready.wait();
72  return blocking_tasks;
73 }
74 
75 // Test 0, submit task to a non-started, interrupted, or stopped pool
76 BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
77 {
78  ThreadPool threadPool(POOL_NAME);
79  const auto fn_empty = [&] {};
80 
81  // Never started: Inactive
82  auto res = threadPool.Submit(fn_empty);
83  BOOST_CHECK(!res);
84  BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
85 
86  // Interrupted (workers still alive): Interrupted, and Start() must be rejected too
87  std::counting_semaphore<> blocker(0);
88  threadPool.Start(NUM_WORKERS_DEFAULT);
89  const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
90  threadPool.Interrupt();
91  res = threadPool.Submit(fn_empty);
92  BOOST_CHECK(!res);
93  BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
94  BOOST_CHECK_EXCEPTION(threadPool.Start(NUM_WORKERS_DEFAULT), std::runtime_error, HasReason("Thread pool has been interrupted or is stopping"));
95  blocker.release(NUM_WORKERS_DEFAULT);
96  WAIT_FOR(blocking_tasks);
97 
98  // Interrupted then stopped: Inactive
99  threadPool.Stop();
100  res = threadPool.Submit(fn_empty);
101  BOOST_CHECK(!res);
102  BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
103 
104  // Started then stopped: Inactive
105  threadPool.Start(NUM_WORKERS_DEFAULT);
106  threadPool.Stop();
107  res = threadPool.Submit(fn_empty);
108  BOOST_CHECK(!res);
109  BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
110 }
111 
112 // Test 1, submit tasks and verify completion
113 BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
114 {
115  int num_tasks = 50;
116 
117  ThreadPool threadPool(POOL_NAME);
118  threadPool.Start(NUM_WORKERS_DEFAULT);
119  std::atomic<int> counter = 0;
120 
121  // Store futures to ensure completion before checking counter.
122  std::vector<std::future<void>> futures;
123  futures.reserve(num_tasks);
124  for (int i = 1; i <= num_tasks; i++) {
125  futures.emplace_back(Submit(threadPool, [&counter, i]() {
126  counter.fetch_add(i, std::memory_order_relaxed);
127  }));
128  }
129 
130  // Wait for all tasks to finish
131  WAIT_FOR(futures);
132  int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
133  BOOST_CHECK_EQUAL(counter.load(), expected_value);
134  BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
135 }
136 
137 // Test 2, maintain all threads busy except one
138 BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
139 {
140  ThreadPool threadPool(POOL_NAME);
141  threadPool.Start(NUM_WORKERS_DEFAULT);
142  std::counting_semaphore<> blocker(0);
143  const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT - 1);
144 
145  // Now execute tasks on the single available worker
146  // and check that all the tasks are executed.
147  int num_tasks = 15;
148  int counter = 0;
149 
150  // Store futures to wait on
151  std::vector<std::future<void>> futures(num_tasks);
152  for (auto& f : futures) f = Submit(threadPool, [&counter]{ counter++; });
153 
154  WAIT_FOR(futures);
155  BOOST_CHECK_EQUAL(counter, num_tasks);
156 
157  blocker.release(NUM_WORKERS_DEFAULT - 1);
158  WAIT_FOR(blocking_tasks);
159  threadPool.Stop();
160  BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
161 }
162 
163 // Test 3, wait for work to finish
164 BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
165 {
166  ThreadPool threadPool(POOL_NAME);
167  threadPool.Start(NUM_WORKERS_DEFAULT);
168  std::atomic<bool> flag = false;
169  std::future<void> future = Submit(threadPool, [&flag]() {
170  UninterruptibleSleep(200ms);
171  flag.store(true, std::memory_order_release);
172  });
173  BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
174  BOOST_CHECK(flag.load(std::memory_order_acquire));
175 }
176 
177 // Test 4, obtain result object
178 BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
179 {
180  ThreadPool threadPool(POOL_NAME);
181  threadPool.Start(NUM_WORKERS_DEFAULT);
182  std::future<bool> future_bool = Submit(threadPool, []() { return true; });
183  BOOST_CHECK(future_bool.get());
184 
185  std::future<std::string> future_str = Submit(threadPool, []() { return std::string("true"); });
186  std::string result = future_str.get();
187  BOOST_CHECK_EQUAL(result, "true");
188 }
189 
190 // Test 5, throw exception and catch it on the consumer side
191 BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
192 {
193  ThreadPool threadPool(POOL_NAME);
194  threadPool.Start(NUM_WORKERS_DEFAULT);
195 
196  const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
197 
198  const int num_tasks = 5;
199  std::vector<std::future<void>> futures;
200  futures.reserve(num_tasks);
201  for (int i = 0; i < num_tasks; i++) {
202  futures.emplace_back(Submit(threadPool, [&make_err, i] { throw std::runtime_error(make_err(i)); }));
203  }
204 
205  for (int i = 0; i < num_tasks; i++) {
206  BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
207  }
208 }
209 
210 // Test 6, all workers are busy, help them by processing tasks from outside
211 BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
212 {
213  ThreadPool threadPool(POOL_NAME);
214  threadPool.Start(NUM_WORKERS_DEFAULT);
215 
216  std::counting_semaphore<> blocker(0);
217  const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
218 
219  // Now submit tasks and check that none of them are executed.
220  int num_tasks = 20;
221  std::atomic<int> counter = 0;
222  for (int i = 0; i < num_tasks; i++) {
223  (void)Submit(threadPool, [&counter]() {
224  counter.fetch_add(1, std::memory_order_relaxed);
225  });
226  }
227  UninterruptibleSleep(100ms);
228  BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
229 
230  // Now process manually
231  for (int i = 0; i < num_tasks; i++) {
232  threadPool.ProcessTask();
233  }
234  BOOST_CHECK_EQUAL(counter.load(), num_tasks);
235  BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
236  blocker.release(NUM_WORKERS_DEFAULT);
237  threadPool.Stop();
238  WAIT_FOR(blocking_tasks);
239 }
240 
241 // Test 7, submit tasks from other tasks
242 BOOST_AUTO_TEST_CASE(recursive_task_submission)
243 {
244  ThreadPool threadPool(POOL_NAME);
245  threadPool.Start(NUM_WORKERS_DEFAULT);
246 
247  std::promise<void> signal;
248  (void)Submit(threadPool, [&]() {
249  (void)Submit(threadPool, [&]() {
250  signal.set_value();
251  });
252  });
253 
254  signal.get_future().wait();
255  threadPool.Stop();
256 }
257 
258 // Test 8, submit task when all threads are busy and then stop the pool
259 BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
260 {
261  ThreadPool threadPool(POOL_NAME);
262  threadPool.Start(NUM_WORKERS_DEFAULT);
263 
264  std::counting_semaphore<> blocker(0);
265  const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
266 
267  // Submit an extra task that should execute once a worker is free
268  std::future<bool> future = Submit(threadPool, []() { return true; });
269 
270  // At this point, all workers are blocked, and the extra task is queued
271  BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
272 
273  // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
274  std::thread thread_unblocker([&blocker]() {
275  UninterruptibleSleep(300ms);
276  blocker.release(NUM_WORKERS_DEFAULT);
277  });
278 
279  // Stop the pool while the workers are still blocked
280  threadPool.Stop();
281 
282  // Expect the submitted task to complete
283  BOOST_CHECK(future.get());
284  thread_unblocker.join();
285 
286  // Obviously all the previously blocking tasks should be completed at this point too
287  WAIT_FOR(blocking_tasks);
288 
289  // Pool should be stopped and no workers remaining
290  BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
291 }
292 
293 // Test 9, more workers than available cores (congestion test)
294 BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
295 {
296  ThreadPool threadPool(POOL_NAME);
297  threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2×
298 
299  int num_tasks = 200;
300  std::atomic<int> counter{0};
301 
302  std::vector<std::future<void>> futures;
303  futures.reserve(num_tasks);
304  for (int i = 0; i < num_tasks; i++) {
305  futures.emplace_back(Submit(threadPool, [&counter] {
306  counter.fetch_add(1, std::memory_order_relaxed);
307  }));
308  }
309 
310  WAIT_FOR(futures);
311  BOOST_CHECK_EQUAL(counter.load(), num_tasks);
312 }
313 
314 // Test 10, Interrupt() prevents further submissions
315 BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
316 {
317  // 1) Interrupt from main thread
318  ThreadPool threadPool(POOL_NAME);
319  threadPool.Start(NUM_WORKERS_DEFAULT);
320  threadPool.Interrupt();
321 
322  auto res = threadPool.Submit([]{});
323  BOOST_CHECK(!res);
324  BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
325 
326  // Reset pool
327  threadPool.Stop();
328 
329  // 2) Interrupt() from a worker thread
330  // One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks.
331  threadPool.Start(/*num_workers=*/3);
332  std::atomic<int> counter{0};
333  std::counting_semaphore<> blocker(0);
334  const auto blocking_tasks = BlockWorkers(threadPool, blocker, 1);
335  Submit(threadPool, [&threadPool, &counter]{
336  threadPool.Interrupt();
337  counter.fetch_add(1, std::memory_order_relaxed);
338  }).get();
339  blocker.release(1); // unblock worker
340 
341  BOOST_CHECK_EQUAL(counter.load(), 1);
342  threadPool.Stop();
343  WAIT_FOR(blocking_tasks);
344  BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
345 }
346 
347 // Test 11, Start() must not cause a deadlock when called during Stop()
348 BOOST_AUTO_TEST_CASE(start_mid_stop_does_not_deadlock)
349 {
350  ThreadPool threadPool(POOL_NAME);
351  threadPool.Start(NUM_WORKERS_DEFAULT);
352 
353  // Keep all workers busy so Stop() gets stuck waiting for them to finish during join()
354  std::counting_semaphore<> workers_blocker(0);
355  const auto blocking_tasks = BlockWorkers(threadPool, workers_blocker, NUM_WORKERS_DEFAULT);
356 
357  std::thread stopper_thread([&threadPool] { threadPool.Stop(); });
358 
359  // Stop() takes ownership of the workers before joining them, so WorkersCount()
360  // hits 0 the moment Stop() is waiting for them to join. That is our signal
361  // to call Start() right into the middle of the joining phase.
362  while (threadPool.WorkersCount() != 0) {
363  std::this_thread::yield(); // let the OS breathe so it can switch context
364  }
365  // Now we know for sure the stopper thread is hanging while workers are still alive.
366  // Restart the pool and resume workers so the stopper thread can proceed.
367  // This will throw an exception only if the pool handles Start-Stop race properly,
368  // otherwise it will proceed and hang the stopper_thread.
369  try {
370  threadPool.Start(NUM_WORKERS_DEFAULT);
371  } catch (std::exception& e) {
372  BOOST_CHECK_EQUAL(e.what(), "Thread pool has been interrupted or is stopping");
373  }
374  workers_blocker.release(NUM_WORKERS_DEFAULT);
375  WAIT_FOR(blocking_tasks);
376 
377  // If Stop() is stuck, joining the stopper thread will deadlock
378  stopper_thread.join();
379 }
380 
381 // Test 12, queued tasks complete after Interrupt()
382 BOOST_AUTO_TEST_CASE(queued_tasks_complete_after_interrupt)
383 {
384  ThreadPool threadPool(POOL_NAME);
385  threadPool.Start(NUM_WORKERS_DEFAULT);
386 
387  std::counting_semaphore<> blocker(0);
388  const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
389 
390  // Queue tasks while all workers are busy, then interrupt
391  std::atomic<int> counter{0};
392  const int num_tasks = 10;
393  std::vector<std::future<void>> futures;
394  futures.reserve(num_tasks);
395  for (int i = 0; i < num_tasks; i++) {
396  futures.emplace_back(Submit(threadPool, [&counter]{ counter.fetch_add(1, std::memory_order_relaxed); }));
397  }
398  threadPool.Interrupt();
399 
400  // Queued tasks must still complete despite the interrupt
401  blocker.release(NUM_WORKERS_DEFAULT);
402  WAIT_FOR(futures);
403  BOOST_CHECK_EQUAL(counter.load(), num_tasks);
404 
405  threadPool.Stop();
406  WAIT_FOR(blocking_tasks);
407 }
408 
409 // Test 13, ensure the Stop() calling thread helps drain the queue
410 BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue)
411 {
412  ThreadPool threadPool(POOL_NAME);
413  threadPool.Start(NUM_WORKERS_DEFAULT);
414 
415  std::counting_semaphore<> blocker(0);
416  const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
417 
418  auto main_thread_id = std::this_thread::get_id();
419  std::atomic<int> main_thread_tasks{0};
420  const size_t num_tasks = 20;
421  for (size_t i = 0; i < num_tasks; i++) {
422  (void)Submit(threadPool, [&main_thread_tasks, main_thread_id]() {
423  if (std::this_thread::get_id() == main_thread_id)
424  main_thread_tasks.fetch_add(1, std::memory_order_relaxed);
425  });
426  }
427  BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
428 
429  // Delay release so Stop() drains all tasks from the calling thread
430  std::thread unblocker([&blocker, &threadPool]() {
431  while (threadPool.WorkQueueSize() > 0) {
432  std::this_thread::yield();
433  }
434  blocker.release(NUM_WORKERS_DEFAULT);
435  });
436 
437  threadPool.Stop();
438  unblocker.join();
439 
440  // Check the main thread processed all tasks
441  BOOST_CHECK_EQUAL(main_thread_tasks.load(), num_tasks);
442  WAIT_FOR(blocking_tasks);
443 }
444 
std::vector< std::future< void > > BlockWorkers(ThreadPool &threadPool, std::counting_semaphore<> &release_sem, size_t num_of_threads_to_block)
constexpr auto WAIT_TIMEOUT
auto Submit(ThreadPool &pool, F &&fn)
assert(!tx.IsCoinBase())
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:229
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1172
void UninterruptibleSleep(const std::chrono::microseconds &n)
Definition: time.cpp:24
#define WAIT_FOR(futures)
Fixed-size thread pool for running arbitrary tasks concurrently.
Definition: threadpool.h:46
BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
constexpr char POOL_NAME[]
BOOST_FIXTURE_TEST_SUITE(cuckoocache_tests, BasicTestingSetup)
Test Suite for CuckooCache.
#define LogInfo(...)
Definition: log.h:95
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
Definition: threadpool.h:235
Fast randomness source.
Definition: random.h:385
BOOST_AUTO_TEST_SUITE_END()
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
Definition: threadpool.h:104
BOOST_CHECK_EXCEPTION predicates to check the specific validation error.
Definition: common.h:17
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
Definition: threadpool.h:218
util::Expected< std::future< std::invoke_result_t< F > >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
Definition: threadpool.h:174
auto result
Definition: common-types.h:74
#define BOOST_CHECK_EQUAL(v1, v2)
Definition: object.cpp:17
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
Definition: threadpool.h:127
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
Definition: threadpool.h:193
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Definition: random.h:254
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:224
int NUM_WORKERS_DEFAULT
int GetNumCores()
Return the number of cores available on the current system.
Definition: system.cpp:109
#define Assert(val)
Identity function.
Definition: check.h:113
#define BOOST_CHECK(expr)
Definition: object.cpp:16