Bitcoin Core  29.1.0
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2022 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_CHECKQUEUE_H
6 #define BITCOIN_CHECKQUEUE_H
7 
8 #include <logging.h>
9 #include <sync.h>
10 #include <tinyformat.h>
11 #include <util/threadnames.h>
12 
13 #include <algorithm>
14 #include <iterator>
15 #include <optional>
16 #include <vector>
17 
32 template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
34 {
35 private:
38 
40  std::condition_variable m_worker_cv;
41 
43  std::condition_variable m_master_cv;
44 
47  std::vector<T> queue GUARDED_BY(m_mutex);
48 
50  int nIdle GUARDED_BY(m_mutex){0};
51 
53  int nTotal GUARDED_BY(m_mutex){0};
54 
56  std::optional<R> m_result GUARDED_BY(m_mutex);
57 
63  unsigned int nTodo GUARDED_BY(m_mutex){0};
64 
66  const unsigned int nBatchSize;
67 
68  std::vector<std::thread> m_worker_threads;
69  bool m_request_stop GUARDED_BY(m_mutex){false};
70 
72  std::optional<R> Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
73  {
74  std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
75  std::vector<T> vChecks;
76  vChecks.reserve(nBatchSize);
77  unsigned int nNow = 0;
78  std::optional<R> local_result;
79  bool do_work;
80  do {
81  {
82  WAIT_LOCK(m_mutex, lock);
83  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
84  if (nNow) {
85  if (local_result.has_value() && !m_result.has_value()) {
86  std::swap(local_result, m_result);
87  }
88  nTodo -= nNow;
89  if (nTodo == 0 && !fMaster) {
90  // We processed the last element; inform the master it can exit and return the result
91  m_master_cv.notify_one();
92  }
93  } else {
94  // first iteration
95  nTotal++;
96  }
97  // logically, the do loop starts here
98  while (queue.empty() && !m_request_stop) {
99  if (fMaster && nTodo == 0) {
100  nTotal--;
101  std::optional<R> to_return = std::move(m_result);
102  // reset the status for new work later
103  m_result = std::nullopt;
104  // return the current status
105  return to_return;
106  }
107  nIdle++;
108  cond.wait(lock); // wait
109  nIdle--;
110  }
111  if (m_request_stop) {
112  // return value does not matter, because m_request_stop is only set in the destructor.
113  return std::nullopt;
114  }
115 
116  // Decide how many work units to process now.
117  // * Do not try to do everything at once, but aim for increasingly smaller batches so
118  // all workers finish approximately simultaneously.
119  // * Try to account for idle jobs which will instantly start helping.
120  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
121  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
122  auto start_it = queue.end() - nNow;
123  vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
124  queue.erase(start_it, queue.end());
125  // Check whether we need to do work at all
126  do_work = !m_result.has_value();
127  }
128  // execute work
129  if (do_work) {
130  for (T& check : vChecks) {
131  local_result = check();
132  if (local_result.has_value()) break;
133  }
134  }
135  vChecks.clear();
136  } while (true);
137  }
138 
139 public:
142 
144  explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
145  : nBatchSize(batch_size)
146  {
147  LogInfo("Script verification uses %d additional threads", worker_threads_num);
148  m_worker_threads.reserve(worker_threads_num);
149  for (int n = 0; n < worker_threads_num; ++n) {
150  m_worker_threads.emplace_back([this, n]() {
151  util::ThreadRename(strprintf("scriptch.%i", n));
152  Loop(false /* worker thread */);
153  });
154  }
155  }
156 
157  // Since this class manages its own resources, which is a thread
158  // pool `m_worker_threads`, copy and move operations are not appropriate.
159  CCheckQueue(const CCheckQueue&) = delete;
160  CCheckQueue& operator=(const CCheckQueue&) = delete;
161  CCheckQueue(CCheckQueue&&) = delete;
162  CCheckQueue& operator=(CCheckQueue&&) = delete;
163 
167  {
168  return Loop(true /* master thread */);
169  }
170 
172  void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
173  {
174  if (vChecks.empty()) {
175  return;
176  }
177 
178  {
179  LOCK(m_mutex);
180  queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
181  nTodo += vChecks.size();
182  }
183 
184  if (vChecks.size() == 1) {
185  m_worker_cv.notify_one();
186  } else {
187  m_worker_cv.notify_all();
188  }
189  }
190 
192  {
193  WITH_LOCK(m_mutex, m_request_stop = true);
194  m_worker_cv.notify_all();
195  for (std::thread& t : m_worker_threads) {
196  t.join();
197  }
198  }
199 
200  bool HasThreads() const { return !m_worker_threads.empty(); }
201 };
202 
207 template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
209 {
210 private:
212  bool fDone;
213 
214 public:
215  CCheckQueueControl() = delete;
216  CCheckQueueControl(const CCheckQueueControl&) = delete;
218  explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
219  {
220  // passed queue is supposed to be unused, or nullptr
221  if (pqueue != nullptr) {
222  ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
223  }
224  }
225 
226  std::optional<R> Complete()
227  {
228  if (pqueue == nullptr) return std::nullopt;
229  auto ret = pqueue->Complete();
230  fDone = true;
231  return ret;
232  }
233 
234  void Add(std::vector<T>&& vChecks)
235  {
236  if (pqueue != nullptr) {
237  pqueue->Add(std::move(vChecks));
238  }
239  }
240 
242  {
243  if (!fDone)
244  Complete();
245  if (pqueue != nullptr) {
246  LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
247  }
248  }
249 };
250 
251 #endif // BITCOIN_CHECKQUEUE_H
int ret
std::optional< R > Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Join the execution until completion.
Definition: checkqueue.h:166
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1172
bool HasThreads() const
Definition: checkqueue.h:200
CCheckQueueControl()=delete
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:50
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before co...
Definition: checkqueue.h:208
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:40
void ThreadRename(const std::string &)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name...
Definition: threadnames.cpp:57
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:218
void Add(std::vector< T > &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Add a batch of checks to the queue.
Definition: checkqueue.h:172
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:270
#define LOCK(cs)
Definition: sync.h:257
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:43
#define WAIT_LOCK(cs, name)
Definition: sync.h:262
Queue for verifications that have to be performed.
Definition: checkqueue.h:33
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:141
void Add(std::vector< T > &&vChecks)
Definition: checkqueue.h:234
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:301
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:53
#define LogInfo(...)
Definition: logging.h:356
std::optional< R > Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:72
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:37
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:264
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:49
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:68
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:63
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:69
CCheckQueue(unsigned int batch_size, int worker_threads_num)
Create a new check queue.
Definition: checkqueue.h:144
CCheckQueue< T, R > *const pqueue
Definition: checkqueue.h:211
CCheckQueue & operator=(const CCheckQueue &)=delete
std::optional< R > Complete()
Definition: checkqueue.h:226