Bitcoin Core  31.0.0
P2P Digital Currency
threadpool.cpp
Go to the documentation of this file.
1 // Copyright (c) The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <logging.h>
6 #include <util/threadpool.h>
7 
9 #include <test/fuzz/fuzz.h>
10 
11 #include <atomic>
12 #include <future>
13 #include <queue>
14 
/** Exception type thrown on purpose by ThrowTask. GetFuture() counts it as an expected failure. */
struct ExpectedException : std::runtime_error {
    explicit ExpectedException(const std::string& msg) : std::runtime_error(msg) {}
};
18 
19 struct ThrowTask {
20  void operator()() const { throw ExpectedException("fail"); }
21 };
22 
/** Task that adds one to a caller-provided atomic counter every time it is invoked. */
struct CounterTask {
    //! Held by reference: the counter must outlive every copy of this task.
    std::atomic_uint32_t& m_counter;
    explicit CounterTask(std::atomic_uint32_t& counter) : m_counter{counter} {}
    //! Atomic increment with relaxed ordering.
    void operator()() const { m_counter.fetch_add(1, std::memory_order_relaxed); }
};
28 
29 // Waits for a future to complete. Increments 'fail_counter' if the expected exception is thrown.
30 static void GetFuture(std::future<void>& future, uint32_t& fail_counter)
31 {
32  try {
33  future.get();
34  } catch (const ExpectedException&) {
35  fail_counter++;
36  } catch (...) {
37  assert(false && "Unexpected exception type");
38  }
39 }
40 
// Global thread pool for fuzzing. Persisting it across iterations prevents
// the excessive thread creation/destruction overhead that can lead to
// instability in the fuzzing environment.
// This is also how we use it in the app's lifecycle.
// Global to verify we always have the same number of threads.
size_t g_num_workers = 3;
49 
51 {
53  if (g_pool.WorkersCount() == g_num_workers) return;
55 }
56 
57 static void setup_threadpool_test()
58 {
59  // Disable logging entirely. It seems to cause memory leaks.
61 }
62 
64 {
65  // Because LibAFL calls fork() after calling the init setup function,
66  // the child processes end up having one thread active and no workers.
67  // To work around this limitation, start thread pool inside the first runner.
69 
70  FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size());
71 
72  const uint32_t num_tasks = fuzzed_data_provider.ConsumeIntegralInRange<uint32_t>(0, 1024);
74  assert(g_pool.WorkQueueSize() == 0);
75 
76  // Counters
77  std::atomic_uint32_t task_counter{0};
78  uint32_t fail_counter{0};
79  uint32_t expected_task_counter{0};
80  uint32_t expected_fail_tasks{0};
81 
82  std::queue<std::future<void>> futures;
83  for (uint32_t i = 0; i < num_tasks; ++i) {
84  const bool will_throw = fuzzed_data_provider.ConsumeBool();
85  const bool wait_immediately = fuzzed_data_provider.ConsumeBool();
86 
87  std::future<void> fut;
88  if (will_throw) {
89  expected_fail_tasks++;
90  fut = *Assert(g_pool.Submit(ThrowTask{}));
91  } else {
92  expected_task_counter++;
93  fut = *Assert(g_pool.Submit(CounterTask{task_counter}));
94  }
95 
96  // If caller wants to wait immediately, consume the future here (safe).
97  if (wait_immediately) {
98  // Waits for this task to complete immediately; prior queued tasks may also complete
99  // as they were queued earlier.
100  GetFuture(fut, fail_counter);
101  } else {
102  // Store task for a posterior check
103  futures.emplace(std::move(fut));
104  }
105  }
106 
107  // Drain remaining futures
108  while (!futures.empty()) {
109  auto fut = std::move(futures.front());
110  futures.pop();
111  GetFuture(fut, fail_counter);
112  }
113 
114  assert(g_pool.WorkQueueSize() == 0);
115  assert(task_counter.load() == expected_task_counter);
116  assert(fail_counter == expected_fail_tasks);
117 }
void DisableLogging() EXCLUSIVE_LOCKS_REQUIRED(!m_cs)
Disable logging This offers a slight speedup and slightly smaller memory usage compared to leaving th...
Definition: logging.cpp:111
BCLog::Logger & LogInstance()
Definition: logging.cpp:26
assert(!tx.IsCoinBase())
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:229
Mutex g_pool_mutex
Definition: threadpool.cpp:46
Definition: common.h:29
std::atomic_uint32_t & m_counter
Definition: threadpool.cpp:24
FUZZ_TARGET(threadpool,.init=setup_threadpool_test) EXCLUSIVE_LOCKS_REQUIRED(!g_pool_mutex)
Definition: threadpool.cpp:63
static void StartPoolIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(!g_pool_mutex)
Definition: threadpool.cpp:50
Fixed-size thread pool for running arbitrary tasks concurrently.
Definition: threadpool.h:46
#define LOCK(cs)
Definition: sync.h:258
size_t g_num_workers
Definition: threadpool.cpp:48
ThreadPool g_pool
Definition: threadpool.cpp:45
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
Definition: threadpool.h:104
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:51
util::Expected< std::future< std::invoke_result_t< F > >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
Definition: threadpool.h:174
FuzzedDataProvider & fuzzed_data_provider
Definition: fees.cpp:38
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:224
ExpectedException(const std::string &msg)
Definition: threadpool.cpp:16
CounterTask(std::atomic_uint32_t &counter)
Definition: threadpool.cpp:25
T ConsumeIntegralInRange(T min, T max)
static void setup_threadpool_test()
Definition: threadpool.cpp:57
void operator()() const
Definition: threadpool.cpp:26
#define Assert(val)
Identity function.
Definition: check.h:113
void operator()() const
Definition: threadpool.cpp:20
static void GetFuture(std::future< void > &future, uint32_t &fail_counter)
Definition: threadpool.cpp:30