Electroneum
Loading...
Searching...
No Matches
threadpool.cpp
Go to the documentation of this file.
1// Copyright (c) 2017-2019, The Monero Project
2//
3// All rights reserved.
4//
5// Redistribution and use in source and binary forms, with or without modification, are
6// permitted provided that the following conditions are met:
7//
8// 1. Redistributions of source code must retain the above copyright notice, this list of
9// conditions and the following disclaimer.
10//
11// 2. Redistributions in binary form must reproduce the above copyright notice, this list
12// of conditions and the following disclaimer in the documentation and/or other
13// materials provided with the distribution.
14//
15// 3. Neither the name of the copyright holder nor the names of its contributors may be
16// used to endorse or promote products derived from this software without specific
17// prior written permission.
18//
19// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
20// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
21// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
22// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
27// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28#include "misc_log_ex.h"
29#include "common/threadpool.h"
30
31#include "cryptonote_config.h"
32#include "common/util.h"
33
34static __thread int depth = 0;
35static __thread bool is_leaf = false;
36
37namespace tools
38{
39threadpool::threadpool(unsigned int max_threads) : running(true), active(0) {
40 boost::thread::attributes attrs;
41 attrs.set_stack_size(THREAD_STACK_SIZE);
42 max = max_threads ? max_threads : tools::get_max_concurrency();
43 size_t i = max ? max - 1 : 0;
44 while(i--) {
45 threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this, false)));
46 }
47}
48
// NOTE(review): the signature line that opens this body is missing from this
// listing. Judging by the body (it stops the pool and joins `threads`) this is
// presumably threadpool::~threadpool() - confirm against the original file.
//
// Shuts the pool down: clears `running`, wakes every thread blocked on
// `has_work`, then joins all worker threads. Nothing is allowed to escape:
// every step that may throw is wrapped in a catch-all.
50 try
51 {
// Preferred path: flip the flag and notify while holding the pool mutex.
52 const boost::unique_lock<boost::mutex> lock(mutex);
53 running = false;
54 has_work.notify_all();
55 }
56 catch (...)
57 {
58 // if taking the lock throws, we just do it without a lock and hope,
59 // since the alternative is terminate
60 running = false;
61 has_work.notify_all();
62 }
// Wait for every worker to leave run(); a failed join is deliberately ignored.
63 for (size_t i = 0; i<threads.size(); i++) {
64 try { threads[i].join(); }
65 catch (...) { /* ignore */ }
66 }
67}
68
69void threadpool::submit(waiter *obj, std::function<void()> f, bool leaf) {
70 CHECK_AND_ASSERT_THROW_MES(!is_leaf, "A leaf routine is using a thread pool");
71 boost::unique_lock<boost::mutex> lock(mutex);
72 if (!leaf && ((active == max && !queue.empty()) || depth > 0)) {
73 // if all available threads are already running
74 // and there's work waiting, just run in current thread
75 lock.unlock();
76 ++depth;
77 is_leaf = leaf;
78 f();
79 --depth;
80 is_leaf = false;
81 } else {
82 if (obj)
83 obj->inc();
84 if (leaf)
85 queue.push_front({obj, f, leaf});
86 else
87 queue.push_back({obj, f, leaf});
88 has_work.notify_one();
89 }
90}
91
92unsigned int threadpool::get_max_concurrency() const {
93 return max;
94}
95
// NOTE(review): the signature line for this body is missing from this listing.
// The log message below refers to the "waiter dtor", so this is presumably
// threadpool::waiter::~waiter() - confirm against the original file.
//
// Logs an error if jobs are still outstanding (`num` is non-zero), then blocks
// in wait(NULL) until the counter reaches zero.
97{
98 try
99 {
// `num` is read under the waiter's own mutex, purely for the diagnostic.
100 boost::unique_lock<boost::mutex> lock(mt);
101 if (num)
102 MERROR("wait should have been called before waiter dtor - waiting now");
103 }
104 catch (...) { /* ignore */ }
105 try
106 {
// A null pool makes wait() skip the queue-draining step and only block on `cv`.
107 wait(NULL);
108 }
// NOTE(review): only std::exception is swallowed here, unlike the catch-all
// above; anything else thrown by wait() would leave this body.
109 catch (const std::exception &e)
110 {
111 /* ignored */
112 }
113}
114
115void threadpool::waiter::wait(threadpool *tpool) {
116 if (tpool)
117 tpool->run(true);
118 boost::unique_lock<boost::mutex> lock(mt);
119 while(num)
120 cv.wait(lock);
121}
122
// NOTE(review): the signature line for this body is missing from this listing.
// submit() calls obj->inc() on a waiter when it queues a job, so this is
// presumably void threadpool::waiter::inc() - confirm against the original file.
//
// Increments the outstanding-job counter `num` while holding the waiter mutex.
124 const boost::unique_lock<boost::mutex> lock(mt);
125 num++;
126}
127
// NOTE(review): the signature line for this body is missing from this listing.
// run() calls e.wo->dec() after a job has executed, so this is presumably
// void threadpool::waiter::dec() - confirm against the original file.
//
// Decrements the outstanding-job counter `num` while holding the waiter mutex.
129 const boost::unique_lock<boost::mutex> lock(mt);
130 num--;
// Last outstanding job is done: release every thread blocked in wait().
131 if (!num)
132 cv.notify_all();
133}
134
135void threadpool::run(bool flush) {
136 boost::unique_lock<boost::mutex> lock(mutex);
137 while (running) {
138 entry e;
139 while(queue.empty() && running)
140 {
141 if (flush)
142 return;
143 has_work.wait(lock);
144 }
145 if (!running) break;
146
147 active++;
148 e = queue.front();
149 queue.pop_front();
150 lock.unlock();
151 ++depth;
152 is_leaf = e.leaf;
153 e.f();
154 --depth;
155 is_leaf = false;
156
157 if (e.wo)
158 e.wo->dec();
159 lock.lock();
160 active--;
161 }
162}
163}
void wait(threadpool *tpool)
void submit(waiter *waiter, std::function< void()> f, bool leaf=false)
unsigned int get_max_concurrency() const
#define THREAD_STACK_SIZE
#define MERROR(x)
Definition misc_log_ex.h:73
#define CHECK_AND_ASSERT_THROW_MES(expr, message)
Various Tools.
Definition tools.cpp:31
unsigned get_max_concurrency()
Definition util.cpp:868
#define true