90 const std::shared_ptr<log::Logger>& logger = {});
92 virtual ~DhtProxyServer();
94 DhtProxyServer(
const DhtProxyServer& other) =
delete;
95 DhtProxyServer(DhtProxyServer&& other) =
delete;
96 DhtProxyServer& operator=(
const DhtProxyServer& other) =
delete;
97 DhtProxyServer& operator=(DhtProxyServer&& other) =
delete;
99 asio::io_context& io_context()
const;
101 using clock = std::chrono::steady_clock;
102 using time_point = clock::time_point;
105 uint64_t highPriorityCount {0};
106 uint64_t normalPriorityCount {0};
108 void increment(
bool highPriority) {
112 normalPriorityCount++;
115 Json::Value toJson()
const {
117 val[
"highPriorityCount"] =
static_cast<Json::UInt64
>(highPriorityCount);
118 val[
"normalPriorityCount"] =
static_cast<Json::UInt64
>(normalPriorityCount);
122 std::string toString()
const {
123 return fmt::format(
"{} high priority, {} normal priority", highPriorityCount, normalPriorityCount);
152 std::string toString()
const {
153 auto ret = fmt::format(
"Listens: {}, Puts: {}, PushListeners: {}\n"
154 "Push requests in the last {}: [Android: {}], [iOS: {}], [Unified: {}]\n"
155 "Requests: {} per second.",
156 listenCount, putCount, pushListenersCount,
157 print_duration(lastUpdated - serverStartTime),
158 androidPush.toString(), iosPush.toString(), unifiedPush.toString(),
161 auto& ipv4 = nodeInfo->ipv4;
162 if (ipv4.table_depth > 1)
163 ret += fmt::format(
"IPv4 Network estimation: {}\n", ipv4.getNetworkSizeEstimation());
164 auto& ipv6 = nodeInfo->ipv6;
165 if (ipv6.table_depth > 1)
166 ret += fmt::format(
"IPv6 Network estimation: {}\n", ipv6.getNetworkSizeEstimation());
176 result[
"listenCount"] =
static_cast<Json::UInt64
>(
listenCount);
177 result[
"putCount"] =
static_cast<Json::UInt64
>(
putCount);
180 result[
"serverStartTime"] =
static_cast<Json::LargestInt
>(to_time_t(
serverStartTime));
181 result[
"lastUpdated"] =
static_cast<Json::LargestInt
>(to_time_t(
lastUpdated));
183 result[
"iosPush"] = iosPush.toJson();
184 result[
"unifiedPush"] = unifiedPush.toJson();
187 result[
"nodeInfo"] =
nodeInfo->toJson();
192 std::shared_ptr<ServerStats> stats()
const {
return stats_; }
194 std::shared_ptr<ServerStats> updateStats(std::shared_ptr<NodeInfo> info)
const;
196 std::shared_ptr<DhtRunner> getNode()
const {
return dht_; }
199 class ConnectionListener;
200 struct RestRouterTraitsTls;
201 struct RestRouterTraits;
203 template <
typename HttpResponse>
204 static HttpResponse initHttpResponse(HttpResponse response);
205 static restinio::request_handling_status_t serverError(restinio::request_t& request);
207 template<
typename ServerSettings >
208 void addServerSettings(ServerSettings& serverSettings,
209 const unsigned int max_pipelined_requests = 16);
211 std::unique_ptr<RestRouter> createRestRouter();
213 void onConnectionClosed(restinio::connection_id_t);
222 RequestStatus getNodeInfo(restinio::request_handle_t request,
223 restinio::router::route_params_t params)
const;
231 RequestStatus getStats(restinio::request_handle_t request,
232 restinio::router::route_params_t params);
244 RequestStatus get(restinio::request_handle_t request,
245 restinio::router::route_params_t params);
257 RequestStatus listen(restinio::request_handle_t request,
258 restinio::router::route_params_t params);
269 RequestStatus put(restinio::request_handle_t request,
270 restinio::router::route_params_t params);
272 void handleCancelPermamentPut(
const asio::error_code &ec,
const InfoHash& key, Value::Id vid);
274#ifdef OPENDHT_PROXY_SERVER_IDENTITY
284 RequestStatus putSigned(restinio::request_handle_t request,
285 restinio::router::route_params_t params)
const;
296 RequestStatus putEncrypted(restinio::request_handle_t request,
297 restinio::router::route_params_t params);
311 RequestStatus getFiltered(restinio::request_handle_t request,
312 restinio::router::route_params_t params);
321 RequestStatus options(restinio::request_handle_t request,
322 restinio::router::route_params_t params);
324 struct PushSessionContext {
326 std::string sessionId;
327 PushSessionContext(
const std::string&
id) : sessionId(id) {}
330#ifdef OPENDHT_PUSH_NOTIFICATIONS
331 PushType getTypeFromString(
const std::string& type);
332 std::string getDefaultTopic(PushType type);
334 RequestStatus pingPush(restinio::request_handle_t request,
335 restinio::router::route_params_t );
345 RequestStatus subscribe(restinio::request_handle_t request,
346 restinio::router::route_params_t params);
355 RequestStatus unsubscribe(restinio::request_handle_t request,
356 restinio::router::route_params_t params);
363 void sendPushNotification(
const std::string& key, Json::Value&& json, PushType type,
bool highPriority,
const std::string& topic);
373 void handleNotifyPushListenExpire(
const asio::error_code &ec,
const std::string pushToken,
374 std::function<Json::Value()> json, PushType type,
const std::string& topic);
383 void handleCancelPushListen(
const asio::error_code &ec,
const std::string pushToken,
384 const InfoHash key,
const std::string clientId);
399 bool handlePushListen(
const InfoHash& infoHash,
const std::string& pushToken,
400 PushType type,
const std::string& clientId,
401 const std::shared_ptr<DhtProxyServer::PushSessionContext>& sessionCtx,
const std::string& topic,
402 const std::vector<std::shared_ptr<Value>>& values,
bool expired);
406 void handlePrintStats(
const asio::error_code &ec);
409 template <
typename Os>
410 void saveState(Os& stream);
412 template <
typename Is>
413 void loadState(Is& is,
size_t size);
415 std::shared_ptr<asio::io_context> ioContext_;
416 std::shared_ptr<DhtRunner> dht_;
417 Json::StreamWriterBuilder jsonBuilder_;
418 Json::CharReaderBuilder jsonReaderBuilder_;
419 std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
421 std::string persistPath_;
424 std::thread serverThread_;
425 std::unique_ptr<restinio::http_server_t<RestRouterTraitsTls>> httpsServer_;
426 std::unique_ptr<restinio::http_server_t<RestRouterTraits>> httpServer_;
429 std::pair<std::string, std::string> pushHostPort_;
431 mutable std::mutex requestLock_;
432 std::map<
unsigned int , std::shared_ptr<http::Request>> requests_;
434 std::shared_ptr<log::Logger> logger_;
436 std::shared_ptr<ServerStats> stats_;
437 std::shared_ptr<NodeInfo> nodeInfo_ {};
438 std::unique_ptr<asio::steady_timer> printStatsTimer_;
439 const time_point serverStartTime_;
440 mutable std::mutex pushStatsMutex_;
441 PushStats androidPush_;
443 PushStats unifiedPush_;
446 std::mutex lockListener_;
448 std::map<restinio::connection_id_t, http::ListenerSession> listeners_;
450 std::shared_ptr<ConnectionListener> connListener_;
451 struct PermanentPut {
452 time_point expiration;
453 std::string pushToken;
454 std::string clientId;
455 std::shared_ptr<PushSessionContext> sessionCtx;
456 std::unique_ptr<asio::steady_timer> expireTimer;
457 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
462 template <
typename Packer>
463 void msgpack_pack(Packer& p)
const
465 p.pack_map(2 + (sessionCtx ? 1 : 0) + (clientId.empty() ? 0 : 1) + (type == PushType::None ? 0 : 2) + (topic.empty() ? 0 : 1));
466 p.pack(
"value"); p.pack(value);
467 p.pack(
"exp"); p.pack(to_time_t(expiration));
468 if (not clientId.empty()) {
469 p.pack(
"cid"); p.pack(clientId);
472 std::lock_guard<std::mutex> l(sessionCtx->lock);
473 p.pack(
"sid"); p.pack(sessionCtx->sessionId);
475 if (type != PushType::None) {
476 p.pack(
"t"); p.pack(type);
477 p.pack(
"token"); p.pack(pushToken);
479 if (not topic.empty()) {
480 p.pack(
"top"); p.pack(topic);
484 void msgpack_unpack(
const msgpack::object& o);
487 std::map<dht::Value::Id, PermanentPut> puts;
488 MSGPACK_DEFINE_ARRAY(puts)
490 std::mutex lockSearchPuts_;
491 std::map<InfoHash, SearchPuts> puts_;
493 mutable std::atomic<size_t> requestNum_ {0};
494 mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
496 std::string pushServer_;
497 std::string bundleId_;
499#ifdef OPENDHT_PUSH_NOTIFICATIONS
501 time_point expiration;
502 std::string clientId;
503 std::shared_ptr<PushSessionContext> sessionCtx;
504 std::future<size_t> internalToken;
505 std::unique_ptr<asio::steady_timer> expireTimer;
506 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
510 template <
typename Packer>
511 void msgpack_pack(Packer& p)
const
513 p.pack_map(3 + (sessionCtx ? 1 : 0) + (topic.empty() ? 0 : 1));
514 p.pack(
"cid"); p.pack(clientId);
515 p.pack(
"exp"); p.pack(to_time_t(expiration));
517 std::lock_guard<std::mutex> l(sessionCtx->lock);
518 p.pack(
"sid"); p.pack(sessionCtx->sessionId);
520 p.pack(
"t"); p.pack(type);
521 if (!topic.empty()) {
522 p.pack(
"top"); p.pack(topic);
526 void msgpack_unpack(
const msgpack::object& o);
528 struct PushListener {
529 std::map<InfoHash, std::vector<Listener>> listeners;
530 MSGPACK_DEFINE_ARRAY(listeners)
532 std::map<std::string, PushListener> pushListeners_;