libquentier  0.8.0
The library for rich desktop clients of Evernote service
Future.h
1 /*
2  * Copyright 2021-2022 Dmitry Ivanov
3  *
4  * This file is part of libquentier
5  *
6  * libquentier is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU Lesser General Public License as published by
8  * the Free Software Foundation, version 3 of the License.
9  *
10  * libquentier is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public License
16  * along with libquentier. If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #pragma once
20 
21 #include <quentier/utility/Linkage.h>
22 
23 #include <QAbstractEventDispatcher>
24 #include <QFuture>
25 #include <QFutureWatcher>
26 #include <QMutex>
27 #include <QMutexLocker>
28 #include <QObject>
29 #include <QPointer>
30 
31 #include <quentier/threading/QtFutureContinuations.h>
32 
33 #if QT_VERSION >= QT_VERSION_CHECK(6, 0, 0)
34 #include <QPromise>
35 #include <exception>
36 #else
37 #include <quentier/threading/Qt5Promise.h>
38 #endif
39 
40 #include <algorithm>
41 #include <cmath>
42 #include <memory>
43 #include <type_traits>
44 #include <utility>
45 
46 namespace quentier::threading {
47 
51 template <class T>
52 [[nodiscard]] std::enable_if_t<
53  std::negation_v<std::is_same<std::decay_t<T>, void>>,
54  QFuture<std::decay_t<T>>>
55  makeReadyFuture(T t)
56 {
57  QPromise<std::decay_t<T>> promise;
58  QFuture<std::decay_t<T>> future = promise.future();
59 
60  promise.start();
61  promise.addResult(std::move(t));
62  promise.finish();
63 
64  return future;
65 }
66 
67 [[nodiscard]] QFuture<void> QUENTIER_EXPORT makeReadyFuture();
68 
73 template <class T, class E>
74 [[nodiscard]] std::enable_if_t<std::is_base_of_v<QException, E>, QFuture<T>>
75  makeExceptionalFuture(const E & e)
76 {
77  QPromise<std::decay_t<T>> promise;
78  QFuture<std::decay_t<T>> future = promise.future();
79 
80  promise.start();
81  promise.setException(e);
82  promise.finish();
83 
84  return future;
85 }
86 
87 #if QT_VERSION >= QT_VERSION_CHECK(6, 0, 0)
88 
92 template <class T>
93 [[nodiscard]] QFuture<T> makeExceptionalFuture(std::exception_ptr e)
94 {
95  QPromise<std::decay_t<T>> promise;
96  QFuture<std::decay_t<T>> future = promise.future();
97 
98  promise.start();
99  promise.setException(std::move(e));
100  promise.finish();
101 
102  return future;
103 }
104 #endif // QT_VERSION
105 
110 template <class T, class U>
111 void bindCancellation(const QFuture<T> & from, QFuture<U> to)
112 {
113  auto watcher = std::make_unique<QFutureWatcher<T>>();
114  auto * rawWatcher = watcher.get();
115 
116  QObject::connect(
117  rawWatcher, &QFutureWatcher<T>::canceled, rawWatcher,
118  [rawWatcher, to]() mutable {
119  to.cancel();
120  rawWatcher->deleteLater();
121  });
122 
123  QObject::connect(
124  rawWatcher, &QFutureWatcher<T>::finished, rawWatcher,
125  [rawWatcher] { rawWatcher->deleteLater(); });
126 
127  watcher->setFuture(from);
128  Q_UNUSED(watcher.release());
129 }
130 
138 [[nodiscard]] QFuture<void> QUENTIER_EXPORT
139  whenAll(QList<QFuture<void>> futures);
140 
150 template <class T>
151 [[nodiscard]] std::enable_if_t<
152  !std::is_void_v<std::decay_t<T>>, QFuture<QList<std::decay_t<T>>>>
153  whenAll(QList<QFuture<std::decay_t<T>>> futures)
154 {
155  if (Q_UNLIKELY(futures.isEmpty())) {
156  return makeReadyFuture<QList<std::decay_t<T>>>({});
157  }
158 
159  auto promise = std::make_shared<QPromise<QList<std::decay_t<T>>>>();
160  auto future = promise->future();
161 
162  for (auto & f: futures) {
163  threading::bindCancellation(future, f);
164  }
165 
166  const auto totalItemCount = futures.size();
167  promise->setProgressRange(0, static_cast<int>(totalItemCount));
168  promise->setProgressValue(0);
169 
170  promise->start();
171 
172  auto resultIndexedList =
173  std::make_shared<QList<std::pair<int, std::decay_t<T>>>>();
174 
175  auto processedItemsCount = std::make_shared<int>(0);
176  auto exceptionFlag = std::make_shared<bool>(false);
177  auto mutex = std::make_shared<QMutex>();
178 
179  for (int i = 0; i < futures.size(); ++i) {
180  auto & f = futures[i];
181  auto thenFuture = then(
182  std::move(f),
183  [promise, processedItemsCount, totalItemCount, exceptionFlag, mutex,
184  resultIndexedList, i](std::decay_t<T> result) {
185  if (promise->isCanceled()) {
186  return;
187  }
188 
189  int count = 0;
190  {
191  const QMutexLocker locker{mutex.get()};
192 
193  if (*exceptionFlag) {
194  return;
195  }
196 
197  ++(*processedItemsCount);
198  count = *processedItemsCount;
199  promise->setProgressValue(count);
200 
201  resultIndexedList->append(
202  std::make_pair(i, std::move(result)));
203  }
204 
205  if (count == totalItemCount) {
206  std::sort(
207  resultIndexedList->begin(), resultIndexedList->end(),
208  [](const auto & lhs, const auto & rhs) {
209  return lhs.first < rhs.first;
210  });
211 
212  auto resultList =
213  std::make_shared<QList<std::decay_t<T>>>();
214  resultList->reserve(resultIndexedList->size());
215  for (auto & [i, v]: *resultIndexedList) {
216  resultList->append(std::move(v));
217  }
218 
219  promise->addResult(*resultList);
220  promise->finish();
221  }
222  });
223 
224  onFailed(
225  std::move(thenFuture),
226  [promise, mutex, exceptionFlag](const QException & e) {
227  if (promise->isCanceled()) {
228  return;
229  }
230 
231  {
232  const QMutexLocker locker{mutex.get()};
233 
234  if (*exceptionFlag) {
235  return;
236  }
237 
238  *exceptionFlag = true;
239  }
240 
241  promise->setException(e);
242  promise->finish();
243  });
244  }
245 
246  return future;
247 }
248 
254 template <class T, class U>
255 void mapFutureProgress(
256  const QFuture<T> & future, const std::shared_ptr<QPromise<U>> & promise)
257 {
258  const auto futureProgressMinimum = future.progressMinimum();
259  const auto futureProgressRange =
260  future.progressMaximum() - futureProgressMinimum;
261 
262  Q_ASSERT(futureProgressRange >= 0);
263 
264  const auto promiseFuture = promise->future();
265  const auto promiseProgressMinimum = promiseFuture.progressMinimum();
266  const auto promiseProgressMaximum = promiseFuture.progressMaximum();
267 
268  const auto promiseProgressRange =
269  promiseProgressMaximum - promiseProgressMinimum;
270 
271  Q_ASSERT(promiseProgressRange >= 0);
272 
273  auto futureWatcher = std::make_unique<QFutureWatcher<T>>();
274 
275  QObject::connect(
276  futureWatcher.get(), &QFutureWatcher<T>::progressValueChanged,
277  futureWatcher.get(),
278  [promise, futureProgressMinimum, futureProgressRange,
279  promiseProgressRange, promiseProgressMinimum,
280  promiseProgressMaximum](int progressValue) {
281  if (Q_UNLIKELY(futureProgressRange == 0)) {
282  promise->setProgressValue(0);
283  return;
284  }
285 
286  const auto progressPart =
287  static_cast<double>(progressValue - futureProgressMinimum) /
288  static_cast<double>(futureProgressRange);
289 
290  const auto mappedProgressValue = static_cast<int>(
291  std::round(progressPart * promiseProgressRange));
292 
293  promise->setProgressValue(std::clamp(
294  promiseProgressMinimum + mappedProgressValue,
295  promiseProgressMinimum, promiseProgressMaximum));
296  });
297 
298  QObject::connect(
299  futureWatcher.get(), &QFutureWatcher<T>::finished, futureWatcher.get(),
300  [futureWatcherWeak = QPointer<QFutureWatcher<T>>(futureWatcher.get())] {
301  if (!futureWatcherWeak.isNull()) {
302  futureWatcherWeak->deleteLater();
303  }
304  });
305 
306  QObject::connect(
307  futureWatcher.get(), &QFutureWatcher<T>::canceled, futureWatcher.get(),
308  [futureWatcherWeak = QPointer<QFutureWatcher<T>>(futureWatcher.get())] {
309  if (!futureWatcherWeak.isNull()) {
310  futureWatcherWeak->deleteLater();
311  }
312  });
313 
314  futureWatcher->setFuture(future);
315  Q_UNUSED(futureWatcher.release());
316 }
317 
318 } // namespace quentier::threading
Definition: Qt5Promise.h:27
Definition: threading/Factory.h:24