54class OPENDHT_PUBLIC DhtRunner {
// proxy_* and push_* string settings; each one is value-initialized to an empty string.
// NOTE(review): the header of the enclosing struct is not part of this chunk
// (presumably DhtRunner::Config) - confirm before relying on these notes.
62 std::string proxy_server {};
63 std::string push_node_id {};
64 std::string push_token {};
65 std::string push_topic {};
66 std::string push_platform {};
67 std::string proxy_user_agent {};
// Both peer flags are false by default.
68 bool peer_discovery {
false};
69 bool peer_publish {
false};
// Certificate and identity members; neither has an in-class initializer.
// presumably used to authenticate the proxy connection - TODO confirm.
70 std::shared_ptr<dht::crypto::Certificate> server_ca;
71 dht::crypto::Identity client_identity;
// Runtime dependencies and callbacks; every member written with `{}` starts empty or null.
// NOTE(review): the header of the enclosing struct is not part of this chunk
// (presumably DhtRunner::Context) - confirm.
76 std::shared_ptr<Logger> logger {};
// Uniquely-owned datagram socket; no initializer, so the unique_ptr starts null.
77 std::unique_ptr<net::DatagramSocket> sock;
78 std::shared_ptr<PeerDiscovery> peerDiscovery {};
// Callback-typed members, all default-initialized.
79 StatusCallback statusChangedCallback {};
80 CertificateStoreQuery certificateStore {};
81 IdentityAnnouncedCb identityAnnouncedCb {};
82 PublicAddressChangedCb publicAddressChangedCb {};
// Uniquely-owned 64-bit Mersenne Twister engine, null by default.
83 std::unique_ptr<std::mt19937_64> rng {};
// get() taking a GetCallbackSimple and a DoneCallback: wraps `cb` with
// bindGetCb() and forwards every argument to another get() overload.
// NOTE(review): the closing brace of this inline body is missing from this chunk.
// NOTE(review): this overload and the next one both default `donecb`, so a call
// that omits it may be ambiguous - confirm.
90 void get(InfoHash
id, GetCallbackSimple cb, DoneCallback donecb={},
Value::Filter f = {}, Where w = {}) {
91 get(
id, bindGetCb(cb), donecb, f, w);
// Same adaptation for a DoneCallbackSimple: `cb` is wrapped with bindGetCb(),
// `donecb` is passed on unchanged.
// NOTE(review): the closing brace of this inline body is missing from this chunk.
94 void get(InfoHash
id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
95 get(
id, bindGetCb(cb), donecb, f, w);
// Declaration only; the definition is outside this chunk. Unlike the overloads
// around it, `dcb` has no default value here.
98 void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
// get() taking a GetCallback and a DoneCallbackSimple: wraps `donecb` with
// bindDoneCb() and forwards to another get() overload.
// NOTE(review): the closing brace of this inline body is missing from this chunk.
100 void get(InfoHash
id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
101 get(
id, cb, bindDoneCb(donecb), f, w);
// Declaration only: get() addressed by a string key instead of an InfoHash.
// presumably the key is hashed into an InfoHash by the definition - TODO confirm.
103 void get(
const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = {}, Where w = {});
// Typed get() delivering whole batches: the lambda converts the received values
// with unpackVector<T>() and returns whatever `cb` returns.
// NOTE(review): the template header and the rest of this body (including how
// `dcb` is passed on) are missing from this chunk.
106 void get(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
108 get(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
109 return cb(unpackVector<T>(vals));
// Typed get() delivering one value at a time: walks the received values, unpacks
// each one with Value::unpack<T>() and hands it to `cb`. A std::exception is caught.
// NOTE(review): the template header, the try line, the branch taken when `cb`
// returns false and the catch body are missing from this chunk - presumably the
// loop stops on false and skips values that fail to unpack; confirm.
115 void get(InfoHash hash, std::function<
bool(T&&)> cb, DoneCallbackSimple dcb={})
117 get(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
118 for (
const auto& v : vals) {
120 if (not cb(Value::unpack<T>(*v)))
122 }
catch (
const std::exception&) {
// Future-returning get(): appends every received batch to a shared vector, moves
// that vector into a promise and returns the matching future.
// `p` and `values` are shared_ptrs, so the by-copy ([=]) lambda shares them with this scope.
// NOTE(review): the lines between the value callback and set_value() are missing
// from this chunk - presumably set_value() runs inside the done callback; confirm.
132 std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = {}, Where w = {}) {
133 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
134 auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
135 get(key, [=](
const std::vector<std::shared_ptr<dht::Value>>& vlist) {
136 values->insert(values->end(), vlist.begin(), vlist.end());
139 p->set_value(std::move(*values));
142 return p->get_future();
// Typed future-returning get(): same scheme, but each unpacked T is moved into
// the shared vector through the per-value get<T>() overload.
// NOTE(review): the template header and several body lines are missing from this chunk.
146 std::future<std::vector<T>> get(InfoHash key) {
147 auto p = std::make_shared<std::promise<std::vector<T>>>();
148 auto values = std::make_shared<std::vector<T>>();
149 get<T>(key, [=](T&& v) {
150 values->emplace_back(std::move(v));
153 p->set_value(std::move(*values));
155 return p->get_future();
// Declaration only: query() with a DoneCallback; defined outside this chunk.
158 void query(
const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
// Convenience overload: wraps `done_cb` with bindDoneCb() and forwards to the
// DoneCallback overload above.
// NOTE(review): both overloads default `done_cb`, so a call that omits it may be
// ambiguous - confirm. The closing brace of this body is missing from this chunk.
159 void query(
const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
160 query(hash, cb, bindDoneCb(done_cb), q);
// Declaration only: listen() with a ValueCallback; returns a future holding a size_t.
// presumably that size_t is the token accepted by cancelListen() - TODO confirm.
163 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = {}, Where w = {});
// Adapts a GetCallback to a callback that also receives an `expired` flag and
// forwards to another listen() overload.
// NOTE(review): the lambda body is missing from this chunk, so how `expired` is
// handled cannot be told from here.
// NOTE(review): `f` and `w` are by-value parameters, so std::forward with a
// non-reference type argument behaves like std::move here.
165 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
166 return listen(key, [cb=std::move(cb)](
const std::vector<Sp<Value>>& vals,
bool expired){
170 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
// Declaration only: listen() addressed by a string key.
172 std::future<size_t> listen(
const std::string& key, GetCallback vcb, Value::Filter f = {}, Where w = {});
// Wraps the simple callback with bindGetCb() and forwards to another listen() overload.
// NOTE(review): the closing brace of this body is missing from this chunk.
173 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {}) {
174 return listen(key, bindGetCb(cb), f, w);
// Typed listen() delivering whole batches: the lambda converts the received
// values with unpackVector<T>() and returns whatever `cb` returns.
// NOTE(review): the template header and the end of this body are missing from this chunk.
178 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb)
180 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
181 return cb(unpackVector<T>(vals));
// Same as above, but the `expired` flag received by the lambda is passed to `cb` as well.
// NOTE(review): the template header and the end of this body are missing from this chunk.
186 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&,
bool)> cb)
188 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals,
bool expired) {
189 return cb(unpackVector<T>(vals), expired);
// Typed listen() delivering one value at a time: walks the received values,
// unpacks each with Value::unpack<T>() and hands it to `cb`. A std::exception is
// caught. The underlying listen() is given getFilterSet<T>(f) and `w`.
// NOTE(review): the try line, the branch taken when `cb` returns false, the catch
// body and the closing lines are missing from this chunk - presumably the loop
// stops on false and skips values that fail to unpack; confirm.
194 template <
typename T>
195 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&)> cb, Value::Filter f = {}, Where w = {})
197 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
198 for (
const auto& v : vals) {
200 if (not cb(Value::unpack<T>(*v)))
202 }
catch (
const std::exception&) {
208 getFilterSet<T>(f), w);
// Typed per-value listen() whose callback also receives the `expired` flag:
// each value is unpacked with Value::unpack<T>() and passed to `cb` together with
// `expired`. A std::exception is caught. The underlying listen() is given
// getFilterSet<T>(f) and `w`.
// NOTE(review): the try line, the branch taken when `cb` returns false, the catch
// body and the closing lines are missing from this chunk - confirm against the
// full file.
210 template <
typename T>
211 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&,
bool)> cb, Value::Filter f = {}, Where w = {})
213 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals,
bool expired) {
214 for (
const auto& v : vals) {
216 if (not cb(Value::unpack<T>(*v), expired))
218 }
catch (
const std::exception&) {
224 getFilterSet<T>(f), w);
// Declarations only: cancelListen() takes either a size_t token or a
// shared_future that yields one.
// presumably the token is the value produced by a listen() future - TODO confirm.
227 void cancelListen(InfoHash h,
size_t token);
228 void cancelListen(InfoHash h, std::shared_future<size_t> token);
// Declaration only: put() for a shared Value. `created` defaults to
// time_point::max() and `permanent` to false.
230 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(),
bool permanent =
false);
// DoneCallbackSimple variant: wraps `cb` with bindDoneCb() and forwards to the
// overload above. `cb` has no default in this overload.
// NOTE(review): the closing brace of this body is missing from this chunk.
231 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(),
bool permanent =
false) {
232 put(hash, value, bindDoneCb(cb), created, permanent);
// Declaration only: put() taking the Value by rvalue reference.
235 void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(),
bool permanent =
false);
// DoneCallbackSimple variant for an rvalue Value: wraps `cb` with bindDoneCb().
// `value` is declared Value&&, so std::forward<Value>(value) is equivalent to
// std::move(value).
// NOTE(review): the closing brace of this body is missing from this chunk.
236 void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(),
bool permanent =
false) {
237 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
// Declaration only: put() addressed by a string key.
239 void put(
const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(),
bool permanent =
false);
// Declarations only: cancelPut() identifies the value either by its Value::Id or
// by a shared pointer to the Value. Definitions are outside this chunk.
241 void cancelPut(
const InfoHash& h, Value::Id
id);
242 void cancelPut(
const InfoHash& h,
const std::shared_ptr<Value>& value);
// Declaration only: putSigned() for a shared Value; `permanent` defaults to false.
244 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={},
bool permanent =
false);
// DoneCallbackSimple variant: wraps `cb` with bindDoneCb() and forwards to the
// overload above. `cb` has no default in this overload.
// NOTE(review): the closing brace of this body is missing from this chunk.
245 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false) {
246 putSigned(hash, value, bindDoneCb(cb), permanent);
// Declaration only: putSigned() taking the Value by rvalue reference.
249 void putSigned(InfoHash hash, Value&& value, DoneCallback cb={},
bool permanent =
false);
// DoneCallbackSimple variant for an rvalue Value. `value` is declared Value&&,
// so std::forward<Value>(value) is equivalent to std::move(value).
// NOTE(review): the closing brace of this body is missing from this chunk.
250 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb,
bool permanent =
false) {
251 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb), permanent);
// Declaration only: putSigned() addressed by a string key.
253 void putSigned(
const std::string& key, Value&& value, DoneCallbackSimple cb={},
bool permanent =
false);
// Declaration only: putEncrypted() with the recipient given as an InfoHash `to`;
// `permanent` defaults to false.
255 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={},
bool permanent =
false);
// DoneCallbackSimple variant: wraps `cb` with bindDoneCb() and forwards to the
// overload above. `cb` has no default in this overload.
// NOTE(review): the closing brace of this body is missing from this chunk.
256 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false) {
257 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
// Declaration only: same operation taking the Value by rvalue reference.
260 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={},
bool permanent =
false);
// DoneCallbackSimple variant for an rvalue Value. `value` is declared Value&&,
// so std::forward<Value>(value) is equivalent to std::move(value).
// NOTE(review): the closing brace of this body is missing from this chunk.
261 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb,
bool permanent =
false) {
262 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
// Declaration only: putEncrypted() addressed by a string key. Unlike the other
// string-key overloads visible in this chunk, this one takes a DoneCallback.
264 void putEncrypted(
const std::string& key, InfoHash to, Value&& value, DoneCallback cb={},
bool permanent =
false);
// Declaration only: putEncrypted() with the recipient given as a shared
// crypto::PublicKey; `permanent` defaults to false.
266 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallback cb={},
bool permanent =
false);
// DoneCallbackSimple variant: wraps `cb` with bindDoneCb() and forwards to the
// overload above. `cb` has no default in this overload.
// NOTE(review): the closing brace of this body is missing from this chunk.
267 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false) {
268 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
// Declaration only: same operation taking the Value by rvalue reference.
271 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallback cb={},
bool permanent =
false);
// DoneCallbackSimple variant for an rvalue Value. `value` is declared Value&&,
// so std::forward<Value>(value) is equivalent to std::move(value).
// NOTE(review): the closing brace of this body is missing from this chunk.
272 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallbackSimple cb,
bool permanent =
false) {
273 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
// Declarations only: bootstrap() accepts a list of socket addresses or a single
// address, each with an optional DoneCallbackSimple.
281 void bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple cb={});
282 void bootstrap(
SockAddr addr, DoneCallbackSimple cb={});
// String-based variants: separate host and service strings, or a single combined string.
// NOTE(review): the accepted format of `hostService` is not visible here
// (presumably host:port) - confirm against the definition.
296 void bootstrap(
const std::string& host,
const std::string& service);
297 void bootstrap(
const std::string& hostService);
// Declarations only; all definitions are outside this chunk.
317 void dumpTables()
const;
// Returns a shared pointer to a crypto::PublicKey.
323 std::shared_ptr<crypto::PublicKey> getPublicKey()
const;
// Returns a pair of size_t values.
// presumably (total size, number of values) - TODO confirm the order.
342 std::pair<size_t, size_t> getStoreSize()
const;
// NOTE(review): declared as returning void, so this getter cannot hand a value
// back to the caller. This looks like a wrong return type (the matching setter
// takes a size_t) - confirm against the definition before changing it.
344 void getStorageLimit()
const;
// `limit` defaults to DEFAULT_STORAGE_LIMIT.
345 void setStorageLimit(
size_t limit = DEFAULT_STORAGE_LIMIT);
// Export functions returning vectors of NodeExport and ValuesExport.
347 std::vector<NodeExport> exportNodes()
const;
349 std::vector<ValuesExport> exportValues()
const;
// Declaration only: setLogger() taking a shared Logger. The default argument is
// an empty (null) shared pointer.
351 void setLogger(
const Sp<Logger>& logger = {});
// Convenience overload: copies `logger` into a new shared Logger via
// std::make_shared and forwards to the overload above.
// NOTE(review): the closing brace of this body is missing from this chunk.
352 void setLogger(
const Logger& logger) {
353 setLogger(std::make_shared<Logger>(logger));
// Declarations only; definitions are outside this chunk.
361 void registerType(
const ValueType& type);
// Takes a vector of ValuesExport, the same element type exportValues() returns.
363 void importValues(
const std::vector<ValuesExport>& values);
// Returns true whenever `running` holds any state other than State::Idle.
// NOTE(review): the closing brace of this body is missing from this chunk.
365 bool isRunning()
const {
366 return running != State::Idle;
// Declarations only; definitions are outside this chunk.
// Node statistics for address family `af`, returned as a NodeStats.
369 NodeStats getNodesStats(sa_family_t af)
const;
// Variant that returns an unsigned and takes four unsigned out-pointers.
// NOTE(review): whether null pointers are accepted is not visible here - confirm.
370 unsigned getNodesStats(sa_family_t af,
unsigned *good_return,
unsigned *dubious_return,
unsigned *cached_return,
unsigned *incoming_return)
const;
// Node info, either returned directly or delivered to a callback as a shared NodeInfo.
371 NodeInfo getNodeInfo()
const;
372 void getNodeInfo(std::function<
void(std::shared_ptr<NodeInfo>)>);
// Message statistics as a vector of unsigned; `in` defaults to false.
// presumably `in` selects incoming rather than outgoing messages - TODO confirm.
374 std::vector<unsigned> getNodeMessageStats(
bool in =
false)
const;
// Declarations only; definitions are outside this chunk.
// Log getters returning std::string, for all storage or for a single InfoHash.
375 std::string getStorageLog()
const;
376 std::string getStorageLog(
const InfoHash&)
const;
// `af` has no default here, unlike the search-log getters below.
377 std::string getRoutingTablesLog(sa_family_t af)
const;
// For the remaining getters `af` defaults to AF_UNSPEC.
378 std::string getSearchesLog(sa_family_t af = AF_UNSPEC)
const;
379 std::string getSearchLog(
const InfoHash&, sa_family_t af = AF_UNSPEC)
const;
// Public address as SockAddr values or as strings.
380 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC)
const;
381 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC)
const;
// Callback variant: the addresses are handed to the callback as an rvalue vector.
// This overload is not const, unlike the value-returning one above.
382 void getPublicAddress(std::function<
void(std::vector<SockAddr>&&)>, sa_family_t af = AF_UNSPEC);
// Declarations only; definitions are outside this chunk.
// The result is delivered to a callback as a shared crypto::Certificate.
// NOTE(review): whether the callback can receive a null pointer when nothing is
// found is not visible here - confirm.
386 void findCertificate(InfoHash hash, std::function<
void(
const std::shared_ptr<crypto::Certificate>&)>);
387 void registerCertificate(
const std::shared_ptr<crypto::Certificate>& cert);
// Takes the query method by rvalue reference.
388 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
// Convenience run(): stores `network`, `identity` and `threaded` into a `config` object.
// Defaults: port = dht::net::DHT_DEFAULT_PORT, default-constructed identity,
// threaded = true, network = 0.
// NOTE(review): the lines that declare `config` and that start the node
// (presumably a call to run(port, config)) are missing from this chunk; no
// visible line uses `port`.
396 void run(in_port_t port = dht::net::DHT_DEFAULT_PORT,
const crypto::Identity& identity = {},
bool threaded =
true, NetId network = 0) {
398 config.dht_config.node_config.
network = network;
399 config.dht_config.id = identity;
400 config.threaded = threaded;
// Declaration only: run() on a port with a caller-supplied Config (taken by
// non-const reference) and an optional Context.
403 void run(in_port_t port, Config& config, Context&& context = {});
// Declaration only: run() with IPv4 and IPv6 address strings plus a service string.
// NOTE(review): whether null pointers are accepted for ip4 / ip6 is not visible here - confirm.
408 void run(
const char* ip4,
const char* ip6,
const char* service,
Config& config,
Context&& context = {});
// Declaration only: run() from a const Config; `context` has no default here.
410 void run(
const Config& config, Context&& context);
// Adds `cb` to the statusCbs list by moving it in; earlier callbacks are kept.
// NOTE(review): the line between the signature and the emplace_back, and the
// closing brace, are missing from this chunk, so any locking done here cannot
// be told from the visible lines.
412 void setOnStatusChanged(StatusCallback&& cb) {
414 statusCbs.emplace_back(std::move(cb));
423 std::lock_guard<std::mutex> lck(dht_mtx);
// Declaration only: shutdown() with an optional ShutdownCallback; `stop` defaults to false.
// presumably `stop` also stops the runner once shutdown completes - TODO confirm.
430 void shutdown(ShutdownCallback cb = {},
bool stop =
false);
// Returns a copy of the peerDiscovery_ shared pointer.
// NOTE(review): the `;` after the closing brace is a redundant empty declaration.
439 std::shared_ptr<PeerDiscovery> getPeerDiscovery()
const {
return peerDiscovery_; };
// Declaration only: setProxyServer() takes the proxy string and an optional
// push node id that defaults to an empty string.
441 void setProxyServer(
const std::string& proxy,
const std::string& pushNodeId =
"");
// Declaration only; the effect of `forward` is defined outside this chunk.
472 void forwardAllMessages(
bool forward);
484 return std::max(status4, status6);
// Declarations only; definitions are outside this chunk.
487 bool checkShutdown();
// Each overload takes a done-callback by rvalue reference and returns a callback
// of the same type.
// presumably the returned wrapper updates ongoing_ops around the call - TODO confirm.
489 DoneCallback bindOpDoneCallback(DoneCallback&& cb);
490 DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb);
// Data members. NOTE(review): several original lines between these members are
// missing from this chunk, so the list may be incomplete.
// Uniquely-owned SecureDht instance; no initializer, so it starts null.
493 std::unique_ptr<SecureDht> dht_;
// Atomic flag, false by default.
496 std::atomic_bool use_proxy {
false};
500 IdentityAnnouncedCb identityAnnouncedCb_;
// Declared mutable, so it can be locked from const member functions.
507 mutable std::mutex dht_mtx {};
508 std::thread dht_thread {};
509 std::condition_variable cv {};
// Packet lists `rcv` and `rcv_free` share one type (decltype(rcv)).
// presumably both are guarded by sock_mtx - TODO confirm.
510 std::mutex sock_mtx {};
511 net::PacketList rcv {};
512 decltype(rcv) rcv_free {};
// Two queues of operations that take a SecureDht reference.
// presumably pending_ops_prio is drained before pending_ops - TODO confirm.
514 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
515 std::queue<std::function<void(SecureDht&)>> pending_ops {};
516 std::mutex storage_mtx {};
// Atomic run state, initially State::Idle; isRunning() compares against this value.
518 std::atomic<State> running {State::Idle};
// Atomic counter, initially 0.
519 std::atomic_size_t ongoing_ops {0};
520 std::vector<ShutdownCallback> shutdownCallbacks_;
// Two NodeStatus members, both initially Disconnected.
// presumably one each for IPv4 and IPv6 - TODO confirm.
522 NodeStatus status4 {NodeStatus::Disconnected},
523 status6 {NodeStatus::Disconnected};
// Callbacks appended by setOnStatusChanged().
525 std::vector<StatusCallback> statusCbs {};
// Returned by getPeerDiscovery().
528 std::shared_ptr<PeerDiscovery> peerDiscovery_;
534 std::shared_ptr<dht::Logger> logger_;