Bitcoin Core  31.0.0
P2P Digital Currency
threadpool.h
Go to the documentation of this file.
1 // Copyright (c) The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or https://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_UTIL_THREADPOOL_H
6 #define BITCOIN_UTIL_THREADPOOL_H
7 
8 #include <sync.h>
9 #include <tinyformat.h>
10 #include <util/expected.h>
11 #include <util/check.h>
12 #include <util/thread.h>
13 
14 #include <algorithm>
15 #include <condition_variable>
16 #include <functional>
17 #include <future>
18 #include <queue>
19 #include <thread>
20 #include <type_traits>
21 #include <utility>
22 #include <vector>
23 
47 {
48 private:
49  std::string m_name;
51  std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
52  std::condition_variable m_cv;
53  // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
54  // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
55  // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
56  bool m_interrupt GUARDED_BY(m_mutex){false};
57  std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
58 
60  {
61  WAIT_LOCK(m_mutex, wait_lock);
62  for (;;) {
63  std::packaged_task<void()> task;
64  {
65  // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
66  if (!m_interrupt && m_work_queue.empty()) {
67  // Block until the pool is interrupted or a task is available.
68  m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
69  }
70 
71  // If stopped and no work left, exit worker
72  if (m_interrupt && m_work_queue.empty()) {
73  return;
74  }
75 
76  task = std::move(m_work_queue.front());
77  m_work_queue.pop();
78  }
79 
80  {
81  // Execute the task without the lock
82  REVERSE_LOCK(wait_lock, m_mutex);
83  task();
84  }
85  }
86  }
87 
88 public:
89  explicit ThreadPool(const std::string& name) : m_name(name) {}
90 
92  {
93  Stop(); // In case it hasn't been stopped.
94  }
95 
104  void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
105  {
106  assert(num_workers > 0);
107  LOCK(m_mutex);
108  if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping");
109  if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
110 
111  // Create workers
112  m_workers.reserve(num_workers);
113  for (int i = 0; i < num_workers; i++) {
114  m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
115  }
116  }
117 
128  {
129  // Notify workers and join them
130  std::vector<std::thread> threads_to_join;
131  {
132  LOCK(m_mutex);
133  // Ensure Stop() is not called from a worker thread while workers are still registered,
134  // otherwise a self-join deadlock would occur.
135  auto id = std::this_thread::get_id();
136  for (const auto& worker : m_workers) assert(worker.get_id() != id);
137  // Early shutdown to return right away on any concurrent Submit() call
138  m_interrupt = true;
139  threads_to_join.swap(m_workers);
140  }
141  m_cv.notify_all();
142  // Help draining queue
143  while (ProcessTask()) {}
144  // Free resources
145  for (auto& worker : threads_to_join) worker.join();
146 
147  // Since we currently wait for tasks completion, sanity-check empty queue
148  LOCK(m_mutex);
149  Assume(m_work_queue.empty());
150  // Re-allow Start() now that all workers have exited
151  m_interrupt = false;
152  }
153 
154  enum class SubmitError {
155  Inactive,
156  Interrupted,
157  };
158 
173  template <class F>
175  {
176  std::packaged_task<std::invoke_result_t<F>()> task{std::forward<F>(fn)};
177  auto future{task.get_future()};
178  {
179  LOCK(m_mutex);
180  if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
181  if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
182 
183  m_work_queue.emplace(std::move(task));
184  }
185  m_cv.notify_one();
186  return {std::move(future)};
187  }
188 
194  {
195  std::packaged_task<void()> task;
196  {
197  LOCK(m_mutex);
198  if (m_work_queue.empty()) return false;
199 
200  // Pop the task
201  task = std::move(m_work_queue.front());
202  m_work_queue.pop();
203  }
204  task();
205  return true;
206  }
207 
219  {
220  WITH_LOCK(m_mutex, m_interrupt = true);
221  m_cv.notify_all();
222  }
223 
225  {
226  return WITH_LOCK(m_mutex, return m_work_queue.size());
227  }
228 
230  {
231  return WITH_LOCK(m_mutex, return m_workers.size());
232  }
233 };
234 
235 constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
236  switch (err) {
238  return "No active workers";
240  return "Interrupted";
241  }
242  Assume(false); // Unreachable
243  return "Unknown error";
244 }
245 
246 #endif // BITCOIN_UTIL_THREADPOOL_H
assert(!tx.IsCoinBase())
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:229
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).
Definition: tinyformat.h:1172
std::queue< std::packaged_task< void()> > m_work_queue GUARDED_BY(m_mutex)
#define REVERSE_LOCK(g, cs)
Definition: sync.h:244
void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:59
Fixed-size thread pool for running arbitrary tasks concurrently.
Definition: threadpool.h:46
The util::Unexpected class represents an unexpected value stored in util::Expected.
Definition: expected.h:21
#define LOCK(cs)
Definition: sync.h:258
const char * name
Definition: rest.cpp:48
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
Definition: threadpool.h:235
ThreadPool(const std::string &name)
Definition: threadpool.h:89
bool m_interrupt GUARDED_BY(m_mutex)
Definition: threadpool.h:56
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
Definition: threadpool.h:104
#define WAIT_LOCK(cs, name)
Definition: sync.h:264
Mutex m_mutex
Definition: threadpool.h:50
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:289
#define Assume(val)
Assume is the identity function.
Definition: check.h:125
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
Definition: threadpool.h:218
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:51
The util::Expected class provides a standard way for low-level functions to return either error value...
Definition: expected.h:44
util::Expected< std::future< std::invoke_result_t< F > >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
Definition: threadpool.h:174
std::string m_name
Definition: threadpool.h:49
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
Definition: threadpool.h:127
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
Definition: threadpool.h:193
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:224
std::condition_variable m_cv
Definition: threadpool.h:52
void TraceThread(std::string_view thread_name, std::function< void()> thread_func)
A wrapper for do-something-once thread functions.
Definition: thread.cpp:16