My Project 3.4.0
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dhtrunner.h
1/*
2 * Copyright (C) 2014-2023 Savoir-faire Linux Inc.
3 * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5 * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "def.h"
24#include "infohash.h"
25#include "value.h"
26#include "callbacks.h"
27#include "sockaddr.h"
28#include "logger.h"
29#include "network_utils.h"
30#include "node_export.h"
31
32#include <thread>
33#include <mutex>
34#include <atomic>
35#include <condition_variable>
36#include <future>
37#include <exception>
38#include <queue>
39#include <chrono>
40
41namespace dht {
42
43struct Node;
44class SecureDht;
45class PeerDiscovery;
46struct SecureDhtConfig;
47
54class OPENDHT_PUBLIC DhtRunner {
55
56public:
57 using StatusCallback = std::function<void(NodeStatus, NodeStatus)>;
58
59 struct Config {
60 SecureDhtConfig dht_config {};
61 bool threaded {true};
62 std::string proxy_server {};
63 std::string push_node_id {};
64 std::string push_token {};
65 std::string push_topic {};
66 std::string push_platform {};
67 std::string proxy_user_agent {};
68 bool peer_discovery {false};
69 bool peer_publish {false};
70 std::shared_ptr<dht::crypto::Certificate> server_ca;
71 dht::crypto::Identity client_identity;
72 SockAddr bind4 {}, bind6 {};
73 };
74
/**
 * Optional collaborators handed to run() through its `Context&&` parameter.
 * Members written with `{}` are value-initialised; `sock` is a
 * default-constructed (empty) unique_ptr.
 */
75 struct Context {
76 std::shared_ptr<Logger> logger {};
77 std::unique_ptr<net::DatagramSocket> sock;
78 std::shared_ptr<PeerDiscovery> peerDiscovery {};
79 StatusCallback statusChangedCallback {};
80 CertificateStoreQuery certificateStore {};
81 IdentityAnnouncedCb identityAnnouncedCb {};
82 PublicAddressChangedCb publicAddressChangedCb {};
83 std::unique_ptr<std::mt19937_64> rng {};
// User-provided (not defaulted) empty constructor.
// NOTE(review): possibly required so that `Context&& context = {}` default
// arguments compile — confirm before replacing it with `= default`.
84 Context() {}
85 };
86
/** Default constructor; declared here and defined outside this header. */
87 DhtRunner();
/** Virtual destructor, so a DhtRunner can be destroyed through a base-class pointer of a derived type. */
88 virtual ~DhtRunner();
89
90 void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter f = {}, Where w = {}) {
91 get(id, bindGetCb(cb), donecb, f, w);
92 }
93
94 void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
95 get(id, bindGetCb(cb), donecb, f, w);
96 }
97
98 void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
99
100 void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
101 get(id, cb, bindDoneCb(donecb), f, w);
102 }
103 void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = {}, Where w = {});
104
105 template <class T>
106 void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
107 {
108 get(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
109 return cb(unpackVector<T>(vals));
110 },
111 dcb,
112 getFilterSet<T>());
113 }
114 template <class T>
115 void get(InfoHash hash, std::function<bool(T&&)> cb, DoneCallbackSimple dcb={})
116 {
117 get(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
118 for (const auto& v : vals) {
119 try {
120 if (not cb(Value::unpack<T>(*v)))
121 return false;
122 } catch (const std::exception&) {
123 continue;
124 }
125 }
126 return true;
127 },
128 dcb,
129 getFilterSet<T>());
130 }
131
132 std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = {}, Where w = {}) {
133 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
134 auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
135 get(key, [=](const std::vector<std::shared_ptr<dht::Value>>& vlist) {
136 values->insert(values->end(), vlist.begin(), vlist.end());
137 return true;
138 }, [=](bool) {
139 p->set_value(std::move(*values));
140 },
141 f, w);
142 return p->get_future();
143 }
144
145 template <class T>
146 std::future<std::vector<T>> get(InfoHash key) {
147 auto p = std::make_shared<std::promise<std::vector<T>>>();
148 auto values = std::make_shared<std::vector<T>>();
149 get<T>(key, [=](T&& v) {
150 values->emplace_back(std::move(v));
151 return true;
152 }, [=](bool) {
153 p->set_value(std::move(*values));
154 });
155 return p->get_future();
156 }
157
158 void query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
159 void query(const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
160 query(hash, cb, bindDoneCb(done_cb), q);
161 }
162
163 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = {}, Where w = {});
164
165 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
166 return listen(key, [cb=std::move(cb)](const std::vector<Sp<Value>>& vals, bool expired){
167 if (not expired)
168 return cb(vals);
169 return true;
170 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
171 }
172 std::future<size_t> listen(const std::string& key, GetCallback vcb, Value::Filter f = {}, Where w = {});
173 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {}) {
174 return listen(key, bindGetCb(cb), f, w);
175 }
176
177 template <class T>
178 std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&)> cb)
179 {
180 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
181 return cb(unpackVector<T>(vals));
182 },
183 getFilterSet<T>());
184 }
185 template <class T>
186 std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&, bool)> cb)
187 {
188 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
189 return cb(unpackVector<T>(vals), expired);
190 },
191 getFilterSet<T>());
192 }
193
194 template <typename T>
195 std::future<size_t> listen(InfoHash hash, std::function<bool(T&&)> cb, Value::Filter f = {}, Where w = {})
196 {
197 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
198 for (const auto& v : vals) {
199 try {
200 if (not cb(Value::unpack<T>(*v)))
201 return false;
202 } catch (const std::exception&) {
203 continue;
204 }
205 }
206 return true;
207 },
208 getFilterSet<T>(f), w);
209 }
210 template <typename T>
211 std::future<size_t> listen(InfoHash hash, std::function<bool(T&&, bool)> cb, Value::Filter f = {}, Where w = {})
212 {
213 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
214 for (const auto& v : vals) {
215 try {
216 if (not cb(Value::unpack<T>(*v), expired))
217 return false;
218 } catch (const std::exception&) {
219 continue;
220 }
221 }
222 return true;
223 },
224 getFilterSet<T>(f), w);
225 }
226
/**
 * Cancels a listener on key `h`. `token` has the same type as the value
 * carried by the future that listen() returns (or a shared_future of it).
 * Both overloads are defined outside this header.
 */
227 void cancelListen(InfoHash h, size_t token);
228 void cancelListen(InfoHash h, std::shared_future<size_t> token);
229
230 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
231 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
232 put(hash, value, bindDoneCb(cb), created, permanent);
233 }
234
235 void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
236 void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
237 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
238 }
239 void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(), bool permanent = false);
240
/**
 * Cancels a put on key `h`, identified either by value id or by the value
 * object itself. Both overloads are defined outside this header.
 * NOTE(review): presumably targets puts made with `permanent = true` — confirm.
 */
241 void cancelPut(const InfoHash& h, Value::Id id);
242 void cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value);
243
244 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
245 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
246 putSigned(hash, value, bindDoneCb(cb), permanent);
247 }
248
249 void putSigned(InfoHash hash, Value&& value, DoneCallback cb={}, bool permanent = false);
250 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
251 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb), permanent);
252 }
253 void putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb={}, bool permanent = false);
254
255 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
256 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
257 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
258 }
259
260 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={}, bool permanent = false);
261 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
262 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
263 }
264 void putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb={}, bool permanent = false);
265
266 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
267 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
268 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
269 }
270
271 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallback cb={}, bool permanent = false);
272 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
273 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
274 }
275
276
/**
 * bootstrap() overloads; all are declared here and defined outside this
 * header. They differ only in how the bootstrap nodes are described:
 * socket addresses (one or several, with an optional completion callback),
 * exported nodes, host/service strings, or a node id plus address.
 */
281 void bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple cb={});
282 void bootstrap(SockAddr addr, DoneCallbackSimple cb={});
283
288 void bootstrap(std::vector<NodeExport> nodes);
289
296 void bootstrap(const std::string& host, const std::string& service);
// NOTE(review): presumably a combined "host:service" string — confirm in the definition.
297 void bootstrap(const std::string& hostService);
298
303 void bootstrap(const InfoHash& id, const SockAddr& address);
304
309
316
// Read-only accessors and storage controls; all defined outside this header.
317 void dumpTables() const;
318
322 InfoHash getId() const;
323 std::shared_ptr<crypto::PublicKey> getPublicKey() const;
324
328 InfoHash getNodeId() const;
329
// `f` selects the address family and defaults to AF_INET.
334 SockAddr getBound(sa_family_t f = AF_INET) const;
335
340 in_port_t getBoundPort(sa_family_t f = AF_INET) const;
341
342 std::pair<size_t, size_t> getStoreSize() const;
343
// NOTE(review): this getter is declared as returning void, so it cannot
// report the limit to the caller — confirm against the definition.
344 void getStorageLimit() const;
345 void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT);
346
347 std::vector<NodeExport> exportNodes() const;
348
349 std::vector<ValuesExport> exportValues() const;
350
351 void setLogger(const Sp<Logger>& logger = {});
352 void setLogger(const Logger& logger) {
353 setLogger(std::make_shared<Logger>(logger));
354 }
355
// Declared here, defined outside this header.
// `f` defaults to a default-constructed InfoHash.
359 void setLogFilter(const InfoHash& f = {});
360
361 void registerType(const ValueType& type);
362
// Takes the same element type that exportValues() returns.
363 void importValues(const std::vector<ValuesExport>& values);
364
365 bool isRunning() const {
366 return running != State::Idle;
367 }
368
// Statistics, diagnostics and public-address queries; all defined outside
// this header. Overloads taking a std::function deliver their result
// through that callback instead of a return value.
369 NodeStats getNodesStats(sa_family_t af) const;
// Pointer-based variant: results are written through the *_return pointers.
// NOTE(review): whether null pointers are accepted is not visible here — confirm.
370 unsigned getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const;
371 NodeInfo getNodeInfo() const;
372 void getNodeInfo(std::function<void(std::shared_ptr<NodeInfo>)>);
373
374 std::vector<unsigned> getNodeMessageStats(bool in = false) const;
375 std::string getStorageLog() const;
376 std::string getStorageLog(const InfoHash&) const;
377 std::string getRoutingTablesLog(sa_family_t af) const;
378 std::string getSearchesLog(sa_family_t af = AF_UNSPEC) const;
379 std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const;
380 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC) const;
381 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC) const;
382 void getPublicAddress(std::function<void(std::vector<SockAddr>&&)>, sa_family_t af = AF_UNSPEC);
383
384 // securedht methods (certificate lookup and registration; defined outside this header)
385
// The certificate is delivered through the callback, not a return value.
386 void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>&)>);
387 void registerCertificate(const std::shared_ptr<crypto::Certificate>& cert);
// Takes the query by rvalue reference, so callers pass a temporary or std::move it.
388 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
389
396 void run(in_port_t port = dht::net::DHT_DEFAULT_PORT, const crypto::Identity& identity = {}, bool threaded = true, NetId network = 0) {
397 Config config;
398 config.dht_config.node_config.network = network;
399 config.dht_config.id = identity;
400 config.threaded = threaded;
401 run(port, config);
402 }
// Config-based run() overloads; all defined outside this header.
// Note that the first two take `Config&` (non-const) while the last takes
// `const Config&` and has no default for `context`.
403 void run(in_port_t port, Config& config, Context&& context = {});
404
// NOTE(review): presumably ip4/ip6 are the local addresses to bind and
// `service` the port or service name — confirm in the definition.
408 void run(const char* ip4, const char* ip6, const char* service, Config& config, Context&& context = {});
409
410 void run(const Config& config, Context&& context);
411
412 void setOnStatusChanged(StatusCallback&& cb) {
413 if (cb)
414 statusCbs.emplace_back(std::move(cb));
415 }
416
422 time_point loop() {
423 std::lock_guard<std::mutex> lck(dht_mtx);
424 return loop_();
425 }
426
// Lifecycle termination; both defined outside this header.
// `cb` defaults to an empty callback and `stop` to false.
430 void shutdown(ShutdownCallback cb = {}, bool stop = false);
431
// NOTE(review): presumably stops the runner and waits for its thread — confirm.
437 void join();
438
439 std::shared_ptr<PeerDiscovery> getPeerDiscovery() const { return peerDiscovery_; };
440
// Proxy configuration; defined outside this header.
// `pushNodeId` defaults to an empty string.
441 void setProxyServer(const std::string& proxy, const std::string& pushNodeId = "");
442
447 void enableProxy(bool proxify);
448
449 /* Push notification methods */
450
454 void setPushNotificationToken(const std::string& token);
455
459 void setPushNotificationTopic(const std::string& topic);
460
464 void setPushNotificationPlatform(const std::string& platform);
465
// Takes the notification payload as a string-to-string map; the result is delivered through the returned future.
469 std::future<PushNotificationResult> pushNotificationReceived(const std::map<std::string, std::string>& data);
470
471 /* Proxy server methods */
472 void forwardAllMessages(bool forward);
473
474private:
// Lifecycle state of the runner, stored in the atomic `running` member.
// isRunning() reports true for every state except Idle.
475 enum class State {
476 Idle,
477 Running,
478 Stopping
479 };
480
// Unlocked loop iteration; the public loop() calls it while holding dht_mtx.
481 time_point loop_();
482
483 NodeStatus getStatus() const {
484 return std::max(status4, status6);
485 }
486
// Private helpers defined outside this header.
// NOTE(review): presumably these track `ongoing_ops` and fire the shutdown
// callbacks once pending operations finish — confirm in the definitions.
487 bool checkShutdown();
488 void opEnded();
489 DoneCallback bindOpDoneCallback(DoneCallback&& cb);
490 DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb);
491
// Owned DHT instance.
493 std::unique_ptr<SecureDht> dht_;
494
// Starts false.
496 std::atomic_bool use_proxy {false};
497
499 Config config_;
500 IdentityAnnouncedCb identityAnnouncedCb_;
501
505 void resetDht();
506
// dht_mtx is held by loop() around loop_(); it is `mutable` so const members can lock it.
507 mutable std::mutex dht_mtx {};
508 std::thread dht_thread {};
509 std::condition_variable cv {};
510 std::mutex sock_mtx {};
// NOTE(review): presumably received packets and a pool of reusable packet buffers, guarded by sock_mtx — confirm.
511 net::PacketList rcv {};
512 decltype(rcv) rcv_free {};
513
// Queued operations, each taking the SecureDht to act on; "prio" is a separate queue.
// NOTE(review): presumably drained by the loop, priority queue first — confirm.
514 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
515 std::queue<std::function<void(SecureDht&)>> pending_ops {};
516 std::mutex storage_mtx {};
517
// Lifecycle state read by isRunning(); starts Idle.
518 std::atomic<State> running {State::Idle};
519 std::atomic_size_t ongoing_ops {0};
520 std::vector<ShutdownCallback> shutdownCallbacks_;
521
// Per-family statuses combined by getStatus(); both start Disconnected.
522 NodeStatus status4 {NodeStatus::Disconnected},
523 status6 {NodeStatus::Disconnected};
524
// Callbacks appended by setOnStatusChanged().
525 std::vector<StatusCallback> statusCbs {};
526
// Returned by getPeerDiscovery().
528 std::shared_ptr<PeerDiscovery> peerDiscovery_;
529
534 std::shared_ptr<dht::Logger> logger_;
535};
536
537}
InfoHash getId() const
in_port_t getBoundPort(sa_family_t f=AF_INET) const
void clearBootstrap()
void shutdown(ShutdownCallback cb={}, bool stop=false)
void bootstrap(const std::string &host, const std::string &service)
void setPushNotificationToken(const std::string &token)
void connectivityChanged()
time_point loop()
Definition dhtrunner.h:422
InfoHash getNodeId() const
void run(in_port_t port=dht::net::DHT_DEFAULT_PORT, const crypto::Identity &identity={}, bool threaded=true, NetId network=0)
Definition dhtrunner.h:396
void bootstrap(std::vector< SockAddr > nodes, DoneCallbackSimple cb={})
void setLogFilter(const InfoHash &f={})
SockAddr getBound(sa_family_t f=AF_INET) const
void run(const char *ip4, const char *ip6, const char *service, Config &config, Context &&context={})
void enableProxy(bool proxify)
void setPushNotificationPlatform(const std::string &platform)
void bootstrap(const InfoHash &id, const SockAddr &address)
std::future< PushNotificationResult > pushNotificationReceived(const std::map< std::string, std::string > &data)
void setPushNotificationTopic(const std::string &topic)
void bootstrap(std::vector< NodeExport > nodes)
NodeStatus
Definition callbacks.h:42
NetId network
Definition callbacks.h:114