My Project 3.4.0
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dht_proxy_server.h
1/*
2 * Copyright (C) 2014-2023 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 * Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "callbacks.h"
24#include "def.h"
25#include "infohash.h"
26#include "proxy.h"
27#include "scheduler.h"
28#include "sockaddr.h"
29#include "value.h"
30#include "http.h"
31
32#include <restinio/all.hpp>
33#include <restinio/tls.hpp>
34#include <json/json.h>
35
36#include <memory>
37#include <mutex>
38
39namespace dht {
40enum class PushType {
41 None = 0,
42 Android,
43 iOS,
44 UnifiedPush
45};
46}
47MSGPACK_ADD_ENUM(dht::PushType)
48
49namespace Json {
50class Value;
51}
52
53namespace dht {
54
55namespace http {
56class Request;
57struct ListenerSession;
58}
59
60class DhtRunner;
61
62using RestRouter = restinio::router::express_router_t<>;
63using RequestStatus = restinio::request_handling_status_t;
64
65struct OPENDHT_PUBLIC ProxyServerConfig {
66 std::string address {};
67 in_port_t port {8000};
68 std::string pushServer {};
69 std::string persistStatePath {};
70 dht::crypto::Identity identity {};
71 std::string bundleId {};
72};
73
77class OPENDHT_PUBLIC DhtProxyServer
78{
79public:
88 DhtProxyServer(const std::shared_ptr<DhtRunner>& dht,
89 const ProxyServerConfig& config = {},
90 const std::shared_ptr<log::Logger>& logger = {});
91
92 virtual ~DhtProxyServer();
93
94 DhtProxyServer(const DhtProxyServer& other) = delete;
95 DhtProxyServer(DhtProxyServer&& other) = delete;
96 DhtProxyServer& operator=(const DhtProxyServer& other) = delete;
97 DhtProxyServer& operator=(DhtProxyServer&& other) = delete;
98
99 asio::io_context& io_context() const;
100
101 using clock = std::chrono::steady_clock;
102 using time_point = clock::time_point;
103
104 struct PushStats {
105 uint64_t highPriorityCount {0};
106 uint64_t normalPriorityCount {0};
107
108 void increment(bool highPriority) {
109 if (highPriority)
110 highPriorityCount++;
111 else
112 normalPriorityCount++;
113 }
114
115 Json::Value toJson() const {
116 Json::Value val;
117 val["highPriorityCount"] = static_cast<Json::UInt64>(highPriorityCount);
118 val["normalPriorityCount"] = static_cast<Json::UInt64>(normalPriorityCount);
119 return val;
120 }
121
122 std::string toString() const {
123 return fmt::format("{} high priority, {} normal priority", highPriorityCount, normalPriorityCount);
124 }
125 };
126
127 struct ServerStats {
129 size_t listenCount {0};
131 size_t putCount {0};
136
138 time_point serverStartTime;
140 time_point lastUpdated;
144 PushStats iosPush;
145 PushStats unifiedPush;
146
148 double requestRate {0};
150 std::shared_ptr<NodeInfo> nodeInfo {};
151
152 std::string toString() const {
153 auto ret = fmt::format("Listens: {}, Puts: {}, PushListeners: {}\n"
154 "Push requests in the last {}: [Android: {}], [iOS: {}], [Unified: {}]\n"
155 "Requests: {} per second.",
156 listenCount, putCount, pushListenersCount,
157 print_duration(lastUpdated - serverStartTime),
158 androidPush.toString(), iosPush.toString(), unifiedPush.toString(),
159 requestRate);
160 if (nodeInfo) {
161 auto& ipv4 = nodeInfo->ipv4;
162 if (ipv4.table_depth > 1)
163 ret += fmt::format("IPv4 Network estimation: {}\n", ipv4.getNetworkSizeEstimation());
164 auto& ipv6 = nodeInfo->ipv6;
165 if (ipv6.table_depth > 1)
166 ret += fmt::format("IPv6 Network estimation: {}\n", ipv6.getNetworkSizeEstimation());
167 }
168 return ret;
169 }
170
174 Json::Value toJson() const {
175 Json::Value result;
176 result["listenCount"] = static_cast<Json::UInt64>(listenCount);
177 result["putCount"] = static_cast<Json::UInt64>(putCount);
178 result["totalPermanentPuts"] = static_cast<Json::UInt64>(totalPermanentPuts);
179 result["pushListenersCount"] = static_cast<Json::UInt64>(pushListenersCount);
180 result["serverStartTime"] = static_cast<Json::LargestInt>(to_time_t(serverStartTime));
181 result["lastUpdated"] = static_cast<Json::LargestInt>(to_time_t(lastUpdated));
182 result["androidPush"] = androidPush.toJson();
183 result["iosPush"] = iosPush.toJson();
184 result["unifiedPush"] = unifiedPush.toJson();
185 result["requestRate"] = requestRate;
186 if (nodeInfo)
187 result["nodeInfo"] = nodeInfo->toJson();
188 return result;
189 }
190 };
191
192 std::shared_ptr<ServerStats> stats() const { return stats_; }
193
194 std::shared_ptr<ServerStats> updateStats(std::shared_ptr<NodeInfo> info) const;
195
196 std::shared_ptr<DhtRunner> getNode() const { return dht_; }
197
198private:
199 class ConnectionListener;
200 struct RestRouterTraitsTls;
201 struct RestRouterTraits;
202
203 template <typename HttpResponse>
204 static HttpResponse initHttpResponse(HttpResponse response);
205 static restinio::request_handling_status_t serverError(restinio::request_t& request);
206
207 template< typename ServerSettings >
208 void addServerSettings(ServerSettings& serverSettings,
209 const unsigned int max_pipelined_requests = 16);
210
211 std::unique_ptr<RestRouter> createRestRouter();
212
213 void onConnectionClosed(restinio::connection_id_t);
214
222 RequestStatus getNodeInfo(restinio::request_handle_t request,
223 restinio::router::route_params_t params) const;
224
231 RequestStatus getStats(restinio::request_handle_t request,
232 restinio::router::route_params_t params);
233
244 RequestStatus get(restinio::request_handle_t request,
245 restinio::router::route_params_t params);
246
257 RequestStatus listen(restinio::request_handle_t request,
258 restinio::router::route_params_t params);
259
269 RequestStatus put(restinio::request_handle_t request,
270 restinio::router::route_params_t params);
271
272 void handleCancelPermamentPut(const asio::error_code &ec, const InfoHash& key, Value::Id vid);
273
274#ifdef OPENDHT_PROXY_SERVER_IDENTITY
284 RequestStatus putSigned(restinio::request_handle_t request,
285 restinio::router::route_params_t params) const;
286
296 RequestStatus putEncrypted(restinio::request_handle_t request,
297 restinio::router::route_params_t params);
298
299#endif // OPENDHT_PROXY_SERVER_IDENTITY
300
311 RequestStatus getFiltered(restinio::request_handle_t request,
312 restinio::router::route_params_t params);
313
321 RequestStatus options(restinio::request_handle_t request,
322 restinio::router::route_params_t params);
323
324 struct PushSessionContext {
325 std::mutex lock;
326 std::string sessionId;
327 PushSessionContext(const std::string& id) : sessionId(id) {}
328 };
329
330#ifdef OPENDHT_PUSH_NOTIFICATIONS
331 PushType getTypeFromString(const std::string& type);
332 std::string getDefaultTopic(PushType type);
333
334 RequestStatus pingPush(restinio::request_handle_t request,
335 restinio::router::route_params_t /*params*/);
345 RequestStatus subscribe(restinio::request_handle_t request,
346 restinio::router::route_params_t params);
347
355 RequestStatus unsubscribe(restinio::request_handle_t request,
356 restinio::router::route_params_t params);
357
363 void sendPushNotification(const std::string& key, Json::Value&& json, PushType type, bool highPriority, const std::string& topic);
364
373 void handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken,
374 std::function<Json::Value()> json, PushType type, const std::string& topic);
375
383 void handleCancelPushListen(const asio::error_code &ec, const std::string pushToken,
384 const InfoHash key, const std::string clientId);
385
399 bool handlePushListen(const InfoHash& infoHash, const std::string& pushToken,
400 PushType type, const std::string& clientId,
401 const std::shared_ptr<DhtProxyServer::PushSessionContext>& sessionCtx, const std::string& topic,
402 const std::vector<std::shared_ptr<Value>>& values, bool expired);
403
404#endif //OPENDHT_PUSH_NOTIFICATIONS
405
406 void handlePrintStats(const asio::error_code &ec);
407 void updateStats();
408
409 template <typename Os>
410 void saveState(Os& stream);
411
412 template <typename Is>
413 void loadState(Is& is, size_t size);
414
415 std::shared_ptr<asio::io_context> ioContext_;
416 std::shared_ptr<DhtRunner> dht_;
417 Json::StreamWriterBuilder jsonBuilder_;
418 Json::CharReaderBuilder jsonReaderBuilder_;
419 std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
420
421 std::string persistPath_;
422
423 // http server
424 std::thread serverThread_;
425 std::unique_ptr<restinio::http_server_t<RestRouterTraitsTls>> httpsServer_;
426 std::unique_ptr<restinio::http_server_t<RestRouterTraits>> httpServer_;
427
428 // http client
429 std::pair<std::string, std::string> pushHostPort_;
430
431 mutable std::mutex requestLock_;
432 std::map<unsigned int /*id*/, std::shared_ptr<http::Request>> requests_;
433
434 std::shared_ptr<log::Logger> logger_;
435
436 std::shared_ptr<ServerStats> stats_;
437 std::shared_ptr<NodeInfo> nodeInfo_ {};
438 std::unique_ptr<asio::steady_timer> printStatsTimer_;
439 const time_point serverStartTime_;
440 mutable std::mutex pushStatsMutex_;
441 PushStats androidPush_;
442 PushStats iosPush_;
443 PushStats unifiedPush_;
444
445 // Thread-safe access to listeners map.
446 std::mutex lockListener_;
447 // Shared with connection listener.
448 std::map<restinio::connection_id_t, http::ListenerSession> listeners_;
449 // Connection Listener observing conn state changes.
450 std::shared_ptr<ConnectionListener> connListener_;
451 struct PermanentPut {
452 time_point expiration;
453 std::string pushToken;
454 std::string clientId;
455 std::shared_ptr<PushSessionContext> sessionCtx;
456 std::unique_ptr<asio::steady_timer> expireTimer;
457 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
458 Sp<Value> value;
459 PushType type;
460 std::string topic;
461
462 template <typename Packer>
463 void msgpack_pack(Packer& p) const
464 {
465 p.pack_map(2 + (sessionCtx ? 1 : 0) + (clientId.empty() ? 0 : 1) + (type == PushType::None ? 0 : 2) + (topic.empty() ? 0 : 1));
466 p.pack("value"); p.pack(value);
467 p.pack("exp"); p.pack(to_time_t(expiration));
468 if (not clientId.empty()) {
469 p.pack("cid"); p.pack(clientId);
470 }
471 if (sessionCtx) {
472 std::lock_guard<std::mutex> l(sessionCtx->lock);
473 p.pack("sid"); p.pack(sessionCtx->sessionId);
474 }
475 if (type != PushType::None) {
476 p.pack("t"); p.pack(type);
477 p.pack("token"); p.pack(pushToken);
478 }
479 if (not topic.empty()) {
480 p.pack("top"); p.pack(topic);
481 }
482 }
483
484 void msgpack_unpack(const msgpack::object& o);
485 };
486 struct SearchPuts {
487 std::map<dht::Value::Id, PermanentPut> puts;
488 MSGPACK_DEFINE_ARRAY(puts)
489 };
490 std::mutex lockSearchPuts_;
491 std::map<InfoHash, SearchPuts> puts_;
492
493 mutable std::atomic<size_t> requestNum_ {0};
494 mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
495
496 std::string pushServer_;
497 std::string bundleId_;
498
499#ifdef OPENDHT_PUSH_NOTIFICATIONS
500 struct Listener {
501 time_point expiration;
502 std::string clientId;
503 std::shared_ptr<PushSessionContext> sessionCtx;
504 std::future<size_t> internalToken;
505 std::unique_ptr<asio::steady_timer> expireTimer;
506 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
507 PushType type;
508 std::string topic;
509
510 template <typename Packer>
511 void msgpack_pack(Packer& p) const
512 {
513 p.pack_map(3 + (sessionCtx ? 1 : 0) + (topic.empty() ? 0 : 1));
514 p.pack("cid"); p.pack(clientId);
515 p.pack("exp"); p.pack(to_time_t(expiration));
516 if (sessionCtx) {
517 std::lock_guard<std::mutex> l(sessionCtx->lock);
518 p.pack("sid"); p.pack(sessionCtx->sessionId);
519 }
520 p.pack("t"); p.pack(type);
521 if (!topic.empty()) {
522 p.pack("top"); p.pack(topic);
523 }
524 }
525
526 void msgpack_unpack(const msgpack::object& o);
527 };
528 struct PushListener {
529 std::map<InfoHash, std::vector<Listener>> listeners;
530 MSGPACK_DEFINE_ARRAY(listeners)
531 };
532 std::map<std::string, PushListener> pushListeners_;
533#endif //OPENDHT_PUSH_NOTIFICATIONS
534};
535
536}
DhtProxyServer(const std::shared_ptr< DhtRunner > &dht, const ProxyServerConfig &config={}, const std::shared_ptr< log::Logger > &logger={})
std::shared_ptr< NodeInfo > nodeInfo