Bitcoin Core  31.0.0
P2P Digital Currency
proxy-io.h
Go to the documentation of this file.
1 // Copyright (c) The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef MP_PROXY_IO_H
6 #define MP_PROXY_IO_H
7 
8 #include <mp/proxy.h>
9 #include <mp/util.h>
10 
11 #include <mp/proxy.capnp.h>
12 
13 #include <capnp/rpc-twoparty.h>
14 
15 #include <assert.h>
16 #include <condition_variable>
17 #include <functional>
18 #include <kj/function.h>
19 #include <map>
20 #include <memory>
21 #include <optional>
22 #include <sstream>
23 #include <string>
24 #include <thread>
25 
26 namespace mp {
27 struct ThreadContext;
28 
30 {
32 };
33 
35 {
39  {
40  }
41 };
42 
43 template <typename ProxyServer, typename CallContext_>
45 {
46  using CallContext = CallContext_;
47 
50  int req;
56  Lock* cancel_lock{nullptr};
63  bool request_canceled{false};
64 
67  {
68  }
69 };
70 
71 template <typename Interface, typename Params, typename Results>
72 using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
73 
// Specialization of ProxyClient for the Thread interface, built on
// ProxyClientBase<Thread, ::capnp::Void>. Instances are stored as mapped
// values in the ConnThreads map declared later in this file.
74 template <>
75 struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
76 {
// Copying is disabled. NOTE(review): the link below discusses std::map
// iterator comparison requiring std::pair's copy constructor; presumably it
// explains a constraint on how this type is declared - confirm against the
// full header, since the listing drops the line before it.
78  // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
79  ProxyClient(const ProxyClient&) = delete;
80  ~ProxyClient();
81 
// Optional handle (CleanupIt) to the callback that is run if there is a
// sudden disconnect and the Connection object goes away; empty when no such
// callback handle is stored.
91  std::optional<CleanupIt> m_disconnect_cb;
92 };
93 
// Server-side implementation of the Thread capnp interface. Owns a
// std::thread and lets callers queue work for it through post().
94 template <>
95 struct ProxyServer<Thread> final : public Thread::Server
96 {
97 public:
// Takes ownership of `thread`; `connection` and `thread_context` are taken by
// reference.
98  ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread);
// Per the comments in post(), the destructor needs to join the thread.
99  ~ProxyServer();
100  kj::Promise<void> getName(GetNameContext context) override;
101 
// Hand `fn` to this thread's Waiter and return a promise for its result of
// type T. Defined out of line later in this header.
106  template<typename T, typename Fn>
107  kj::Promise<T> post(Fn&& fn);
108 
// The worker thread owned by this object.
111  std::thread m_thread;
// Promise that post() chains on before giving the Waiter a new function. Each
// post() call replaces it with a promise fulfilled once the Waiter has started
// executing the posted function. Starts out already fulfilled.
114  kj::Promise<void> m_thread_ready{kj::READY_NOW};
115 };
116 
// Handler for kj::TaskSet failed task events.
118 class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
119 {
120 public:
// Invoked with the exception of a failed task (kj::TaskSet::ErrorHandler
// override); implemented in proxy.cpp.
122  void taskFailed(kj::Exception&& exception) override;
124 };
125 
// Log flags. Update the stringify function (KJ_STRINGIFY(Log)) if changed!
// Enumerators are listed in increasing numeric order starting at 0. The
// backwards-compatible EventLoop constructor reports `level == Log::Raise` as
// the boolean argument of the old logging callback.
127 enum class Log {
128  Trace = 0,
129  Debug,
130  Info,
131  Warning,
132  Error,
133  Raise,
134 };
135 
136 kj::StringPtr KJ_STRINGIFY(Log flags);
137 
// Value passed to the LogFn logging callback. Logger's destructor builds one
// from the buffered text and the logger's level.
138 struct LogMessage {
139 
// Message to be logged.
141  std::string message;
142 
// NOTE(review): the listing drops the `level` member here (the severity level
// of this message); it is read as `log_data.level` elsewhere in this file.
145 };
146 
147 using LogFn = std::function<void(LogMessage)>;
148 
// Logging configuration held by EventLoop (m_log_opts) and referenced by
// Logger.
149 struct LogOptions {
150 
// NOTE(review): the listing drops the `log_fn` member here (external logging
// callback); Logger calls it as `m_options.log_fn(...)`.
153 
// Maximum number of characters to use when representing request and response
// structs as strings.
156  size_t max_chars{200};
157 
// NOTE(review): the listing drops the `log_level` member here; messages with a
// severity level less than log_level will not be reported.
161 };
162 
// Stream-style helper that buffers one log message and emits it on
// destruction. Values streamed with operator<< are appended to an internal
// std::ostringstream only while the logger is enabled. The destructor then
// passes the buffered text and the level to the log_fn callback in the
// referenced LogOptions.
163 class Logger
164 {
165 public:
// `options` is stored by reference (m_options is a `const LogOptions&`), so it
// must outlive this Logger.
166  Logger(const LogOptions& options, Log log_level) : m_options(options), m_log_level(log_level) {}
167 
// Neither movable nor copyable.
168  Logger(Logger&&) = delete;
169  Logger& operator=(Logger&&) = delete;
170  Logger(const Logger&) = delete;
171  Logger& operator=(const Logger&) = delete;
172 
// Emits the message. Declared noexcept(false), so an exception thrown by the
// log_fn callback propagates out of the destructor.
173  ~Logger() noexcept(false)
174  {
175  if (enabled()) m_options.log_fn({std::move(m_buffer).str(), m_log_level});
176  }
177 
// Append `value` to the buffer when enabled; otherwise do nothing.
178  template <typename T>
179  friend Logger& operator<<(Logger& logger, T&& value)
180  {
181  if (logger.enabled()) logger.m_buffer << std::forward<T>(value);
182  return logger;
183  }
184 
// Rvalue overload, so a temporary Logger can be streamed into; forwards to the
// lvalue overload above.
185  template <typename T>
186  friend Logger& operator<<(Logger&& logger, T&& value)
187  {
188  return logger << std::forward<T>(value);
189  }
190 
// True when this logger is enabled. Used by MP_LOGPLAIN as the `if` condition.
191  explicit operator bool() const
192  {
193  return enabled();
194  }
195 
196 private:
// NOTE(review): the body (listing line 199) is dropped here. Presumably it
// compares m_log_level against m_options.log_level and checks that a log_fn is
// set - confirm against the real header.
197  bool enabled() const
198  {
200  }
201 
// NOTE(review): the listing drops the m_options and m_log_level members here.
// Buffer accumulating the message text.
204  std::ostringstream m_buffer;
205 };
206 
// MP_LOGPLAIN(loop, level): declares a temporary mp::Logger in an if-with-
// initializer, built from (loop).m_log_opts and the remaining arguments. The
// streamed expressions that follow are only evaluated when the logger is
// enabled. Because the macro expands to an unbraced `if`, take care when using
// it directly before an `else`.
207 #define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
208 
// MP_LOG(loop, level): same as MP_LOGPLAIN, but prefixes the message with
// "{<LongThreadName(exe_name)>} ".
209 #define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
210 
211 std::string LongThreadName(const char* exe_name);
212 
// Body of class EventLoop (event loop implementation). NOTE(review): the
// listing drops the `class EventLoop` line just above and several members
// inside (e.g. m_mutex, m_async_thread, m_error_handler, m_log_opts,
// startAsyncThread()).
239 {
240 public:
// Construct event loop object with default logging options, wrapping `log_fn`
// in a LogOptions.
242  EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr)
243  : EventLoop(exe_name, LogOptions{std::move(log_fn)}, context){}
244 
// Construct event loop object with the given logging options.
246  EventLoop(const char* exe_name, LogOptions log_opts, void* context = nullptr);
247 
// Backwards-compatible constructor for the previous (deprecated) logging
// callback signature. The old callback receives `level == Log::Raise` as its
// bool argument and the message text as its string argument.
249  EventLoop(const char* exe_name, std::function<void(bool, std::string)> old_callback, void* context = nullptr)
250  : EventLoop(exe_name,
251  LogFn{[old_callback = std::move(old_callback)](LogMessage log_data) {old_callback(log_data.level == Log::Raise, std::move(log_data.message));}},
252  context){}
253 
254  ~EventLoop();
255 
// Run event loop.
259  void loop();
260 
// Run function on event loop thread.
263  void post(kj::Function<void()> fn);
264 
// Wrapper around post() that perfectly forwards `callable`, so the callable
// does not have to be copied into the kj::Function.
268  template <typename Callable>
269  void sync(Callable&& callable)
270  {
271  post(std::forward<Callable>(callable));
272  }
273 
// Register cleanup function to run on asynchronous worker thread without
// blocking the event loop thread.
276  void addAsyncCleanup(std::function<void()> fn);
277 
290 
// Check if loop should exit. Requires m_mutex to be held.
292  bool done() const MP_REQUIRES(m_mutex);
293 
// Process name included in thread names (MP_LOG passes it to LongThreadName).
296  const char* m_exe_name;
297 
// ID of the event loop thread. Initialized to the id of the constructing
// thread.
299  std::thread::id m_thread_id = std::this_thread::get_id();
300 
304 
// Callback function to run on event loop thread during post() or sync() call.
306  kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
307 
// Optional list of cleanup functions, guarded by m_mutex. Presumably filled by
// addAsyncCleanup() - confirm in proxy.cpp.
309  std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
310 
// Pipe read handle used to wake up the event loop thread.
312  int m_wait_fd = -1;
313 
// Pipe write handle used to wake up the event loop thread.
315  int m_post_fd = -1;
316 
// Client count, guarded by m_mutex; managed automatically by the EventLoopRef
// smart pointer (proxy.h).
319  int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
320 
// Condition variable used, together with m_mutex, to post tasks to the event
// loop and async thread.
324  std::condition_variable m_cv;
325 
// Capnp IO context.
327  kj::AsyncIoContext m_io_context;
328 
331 
// Capnp list of pending promises.
333  std::unique_ptr<kj::TaskSet> m_task_set;
334 
// List of connections. _Serve() adds incoming server connections here and
// erases them on disconnect.
336  std::list<Connection> m_incoming_connections;
337 
340 
// External context pointer.
342  void* m_context;
343 
// Hook called when ProxyServer<ThreadMap>::makeThread() is called.
345  std::function<void()> testing_hook_makethread;
346 
// Hook called on the worker thread inside makeThread(), after the thread
// context is set up.
350  std::function<void()> testing_hook_makethread_created;
351 
// Hook called on the worker thread when it starts to execute an async request.
355  std::function<void()> testing_hook_async_request_start;
356 
// Hook called on the worker thread just before returning results.
358  std::function<void()> testing_hook_async_request_done;
359 };
360 
// Single element task queue used to handle recursive capnp calls. One thread
// blocks in wait() while other threads hand it functions to run via post().
372 struct Waiter
373 {
374  Waiter() = default;
375 
// Store `fn` as the pending function and wake any waiter. Returns false,
// without storing anything, if a previously posted function has not been
// picked up yet.
376  template <typename Fn>
377  bool post(Fn&& fn)
378  {
379  const Lock lock(m_mutex);
380  if (m_fn) return false;
381  m_fn = std::forward<Fn>(fn);
382  m_cv.notify_all();
383  return true;
384  }
385 
// Block on the condition variable until pred() returns true, running any
// posted function each time the condition is evaluated. `lock` is passed to
// m_cv.wait via lock.m_lock, and each function is invoked through
// Unlock(lock, fn). Presumably `lock` must be held on m_mutex and Unlock
// releases it while fn runs - confirm in util.h.
386  template <class Predicate>
387  void wait(Lock& lock, Predicate pred)
388  {
389  m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
390  // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
391  // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
392  // after the fn() call and before the lock.lock() call in this loop
393  // in the case where a capnp response is sent and a brand new
394  // request is immediately received.
395  while (m_fn) {
// Move the function out and clear the slot before running it, so a new
// function can be posted while this one executes.
396  auto fn = std::move(*m_fn);
397  m_fn.reset();
398  Unlock(lock, fn);
399  }
400  const bool done = pred();
401  return done;
402  });
403  }
404 
// NOTE(review): the listing drops the m_mutex member here (mutex mainly used
// internally by this class, but also used externally to guard related state).
// Notified by post(); waited on by wait().
413  std::condition_variable m_cv;
// Pending function slot; holds at most one function at a time.
414  std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
415 };
416 
// Body of class Connection: object holding network & rpc state associated with
// either an incoming server connection, or an outgoing client connection.
// NOTE(review): the listing drops the `class Connection` line just above and
// several members (m_loop, m_error_handler, m_on_disconnect,
// m_sync_cleanup_fns).
423 {
424 public:
// Client-side constructor: takes ownership of the stream, creates a two-party
// network with Side::CLIENT and an RPC client system on top of it.
425  Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
426  : m_loop(loop), m_stream(kj::mv(stream_)),
427  m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
428  m_rpc_system(::capnp::makeRpcClient(m_network)) {}
// Server-side constructor. NOTE(review): its first line (listing line 429,
// `Connection(EventLoop& loop,`) is dropped. It uses Side::SERVER and calls
// make_client(*this) during construction to obtain the capability passed to
// makeRpcServer.
430  kj::Own<kj::AsyncIoStream>&& stream_,
431  const std::function<::capnp::Capability::Client(Connection&)>& make_client)
432  : m_loop(loop), m_stream(kj::mv(stream_)),
433  m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
434  m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
435 
// Run cleanup functions.
441  ~Connection();
442 
// Register synchronous cleanup function to run on event loop thread. Returns
// an iterator that can be passed to removeSyncCleanup() to unregister it.
446  CleanupIt addSyncCleanup(std::function<void()> fn);
447  void removeSyncCleanup(CleanupIt it);
448 
// Add disconnect handler.
450  template <typename F>
451  void onDisconnect(F&& f)
452  {
453  // Add disconnect handler to local TaskSet to ensure it is canceled and
454  // will never run after connection object is destroyed. But when disconnect
455  // handler fires, do not call the function f right away, instead add it
456  // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
457  // error in the typical case where f deletes this Connection object.
458  m_on_disconnect.add(m_network.onDisconnect().then(
459  [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
460  }
461 
// Owned IO stream; m_network is constructed on top of it.
463  kj::Own<kj::AsyncIoStream> m_stream;
// Two-party capnp network over m_stream.
469  ::capnp::TwoPartyVatNetwork m_network;
// RPC system; initialized by both constructors via makeRpcClient /
// makeRpcServer.
470  std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
471 
472  // ThreadMap interface client, used to create a remote server thread when a
473  // client IPC call is being made for the first time from a new thread.
474  ThreadMap::Client m_thread_map{nullptr};
475 
// Collection of server-side IPC worker threads (ProxyServer<Thread> objects).
478  ::capnp::CapabilityServerSet<Thread> m_threads;
479 
// Canceler for canceling promises that we want to discard when the connection
// is destroyed.
483  kj::Canceler m_canceler;
484 
489 };
490 
// Body of struct ServerVatId: vat id for server side of connection.
// NOTE(review): the listing drops the `struct ServerVatId` line just above.
// Members are declared in dependency order: `scratch` backs `message`, and
// `vat_id` is a builder rooted in `message`.
500 {
// Zero-initialized scratch space handed to the message builder.
501  ::capnp::word scratch[4]{};
502  ::capnp::MallocMessageBuilder message{scratch};
503  ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
// Mark the vat id as referring to the SERVER side.
504  ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
505 };
506 
// Construct libmultiprocess client object wrapping a Cap'n Proto client
// object.
//
// client             - capnp client capability, moved into m_client.
// connection         - connection used to initialize m_context; dereferenced
//                      here, so it must be non-null.
// destroy_connection - when true, the cleanup function registered below
//                      deletes the Connection after releasing the capability.
//
// Registers a sync cleanup callback on the connection and a cleanup function
// in m_context.cleanup_fns, then calls Sub::construct(*this).
507 template <typename Interface, typename Impl>
508 ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client client,
509  Connection* connection,
510  bool destroy_connection)
511  : m_client(std::move(client)), m_context(connection)
512 
513 {
514  // Handler for the connection getting destroyed before this client object.
515  auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
516  // Release client capability by moving it into a temporary that is
516  // destroyed at the end of the statement.
517  {
518  typename Interface::Client(std::move(m_client));
519  }
// Clear the connection pointer under the loop mutex so the cleanup function
// below sees it as null.
520  Lock lock{m_context.loop->m_mutex};
521  m_context.connection = nullptr;
522  });
523 
524  // Two shutdown sequences are supported:
525  //
526  // - A normal sequence where client proxy objects are deleted by external
527  // code that no longer needs them
528  //
529  // - A garbage collection sequence where the connection or event loop shuts
530  // down while external code is still holding client references.
531  //
532  // The first case is handled here when m_context.connection is not null. The
533  // second case is handled by the disconnect_cb function, which sets
534  // m_context.connection to null so nothing happens here.
535  m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
536  {
537  // If the capnp interface defines a destroy method, call it to destroy
538  // the remote object, waiting for it to be deleted server side. If the
539  // capnp interface does not define a destroy method, this will just call
540  // an empty stub defined in the ProxyClientBase class and do nothing.
541  Sub::destroy(*this);
542 
543  // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
544  m_context.loop->sync([&]() {
545  // Remove disconnect callback on cleanup so it doesn't run and try
546  // to access this object after it's destroyed. This call needs to
547  // run inside loop->sync() on the event loop thread because
548  // otherwise, if there were an ill-timed disconnect, the
549  // onDisconnect handler could fire and delete the Connection object
550  // before the removeSyncCleanup call.
551  if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);
552 
553  // Release client capability by moving it into a temporary that is
553  // destroyed at the end of the statement.
554  {
555  typename Interface::Client(std::move(m_client));
556  }
// Deleting a null pointer is a no-op, so this is safe even when disconnect_cb
// has already cleared m_context.connection.
557  if (destroy_connection) {
558  delete m_context.connection;
559  m_context.connection = nullptr;
560  }
561  });
562  }
563  });
564  Sub::construct(*this);
565 }
566 
// NOTE(review): the signature line (listing line 568) is dropped here;
// presumably this is the ProxyClientBase<Interface, Impl> destructor - confirm
// against the real header. The body runs the cleanup functions stored in
// m_context.cleanup_fns, which include the one registered by the
// ProxyClientBase constructor.
567 template <typename Interface, typename Impl>
569 {
570  CleanupRun(m_context.cleanup_fns);
571 }
572 
// Construct a proxy server object that forwards to `impl`.
//
// impl       - shared pointer to the implementation object; must be non-null
//              (checked with assert).
// connection - connection this server object belongs to; its address is used
//              to initialize m_context.
573 template <typename Interface, typename Impl>
574 ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
575  : m_impl(std::move(impl)), m_context(&connection)
576 {
577  assert(m_impl);
578 }
579 
// NOTE(review): the signature line (listing line 593) is dropped here;
// presumably this is the ProxyServerBase<Interface, Impl> destructor, as the
// comment in the body suggests - confirm against the real header.
//
// If m_impl is still set, ownership of it and of the cleanup functions is
// moved into a closure handed to EventLoop::addAsyncCleanup(). The final
// assert checks that no cleanup functions remain in m_context.cleanup_fns.
592 template <typename Interface, typename Impl>
594 {
595  if (m_impl) {
596  // If impl is non-null at this point, it means no client is waiting for
597  // the m_impl server object to be destroyed synchronously. This can
598  // happen either if the interface did not define a "destroy" method (see
599  // invokeDestroy method below), or if a destroy method was defined, but
600  // the connection was broken before it could be called.
601  //
602  // In either case, be conservative and run the cleanup on an
603  // asynchronous thread, to avoid destructors or cleanup functions
604  // blocking or deadlocking the current EventLoop thread, since they
605  // could be making IPC calls.
606  //
607  // Technically this is a little too conservative since if the interface
608  // defines a "destroy" method, but the destroy method does not accept a
609  // Context parameter specifying a worker thread, the cleanup method
610  // would run on the EventLoop thread normally (when connection is
611  // unbroken), but will not run on the EventLoop thread now (when
612  // connection is broken). Probably some refactoring of the destructor
613  // and invokeDestroy function is possible to make this cleaner and more
614  // consistent.
// Release the implementation first, then run the cleanup functions.
615  m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
616  impl.reset();
617  CleanupRun(fns);
618  });
619  }
620  assert(m_context.cleanup_fns.empty());
621 }
622 
// NOTE(review): the signature line (listing line 641) is dropped here;
// presumably this is ProxyServerBase<Interface, Impl>::invokeDestroy(), the
// method the preceding block's comment refers to - confirm against the real
// header. It releases the implementation object and then runs the cleanup
// functions synchronously on the calling thread.
640 template <typename Interface, typename Impl>
642 {
643  m_impl.reset();
644  CleanupRun(m_context.cleanup_fns);
645 }
646 
653 using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
654 using ConnThread = ConnThreads::iterator;
655 
656 // Retrieve ProxyClient<Thread> object associated with this connection from a
657 // map, or create a new one and insert it into the map. Return map iterator and
658 // inserted bool.
659 std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);
660 
// Body of struct ThreadContext: the thread_local g_thread_context instance of
// this struct provides information about individual threads. NOTE(review): the
// listing drops the `struct ThreadContext` line just above and the original
// doc comments for each member.
675 {
// Identifying string for debug.
677  std::string thread_name;
678 
// Waiter used to run functions on this thread (ProxyServer<Thread>::post()
// posts to it). Its m_mutex also guards the two maps below.
694  std::unique_ptr<Waiter> waiter = nullptr;
695 
// Map from Connection to thread handle, guarded by waiter->m_mutex.
// NOTE(review): presumably handles used for callbacks made into this thread -
// confirm against the original doc comment.
713  ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);
714 
// Map from Connection to thread handle, guarded by waiter->m_mutex.
// NOTE(review): presumably handles used for requests made from this thread -
// confirm against the original doc comment.
724  ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
725 
// NOTE(review): presumably true when this thread is an EventLoop thread -
// confirm.
729  bool loop_thread = false;
730 };
731 
// Queue `fn` to run on this server thread and return a promise for its result.
//
// T  - result type of fn.
// fn - callable invoked as fn(CancelMonitor&); its return value is used to
//      fulfill the returned promise. If it throws, the promise is rejected
//      with the exception caught by kj::runCatchingExceptions.
//
// Calls are serialized through m_thread_ready: the function is only handed to
// the Waiter once the previous posted function has started executing. Promise
// fulfillers are only touched inside m_loop->sync() calls.
732 template<typename T, typename Fn>
733 kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
734 {
735  auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is ready to post again.
// The monitor is moved into the lambdas below, while the reference stays
// usable for the CancelProbe attached to the returned promise.
736  auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
737  CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
738  // Keep a reference to the ProxyServer<Thread> instance by assigning it to
739  // the self variable. ProxyServer instances are reference-counted and if the
740  // client drops its reference, this variable keeps the instance alive until
741  // the thread finishes executing. The self variable needs to be destroyed on
742  // the event loop thread so it is freed in a sync() call below.
743  auto self = thisCap();
744  auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
745  auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
746  bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
747  // Fulfill ready.promise now, as soon as the Waiter starts executing
748  // this lambda, so the next ProxyServer<Thread>::post() call can
749  // immediately call waiter->post(). It is important to do this
750  // before calling fn() because fn() can make an IPC call back to the
751  // client, which can make another IPC call to this server thread.
752  // (This typically happens when IPC methods take std::function
753  // parameters.) When this happens the second call to the server
754  // thread should not be blocked waiting for the first call.
755  m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
756  ready_fulfiller->fulfill();
757  ready_fulfiller = nullptr;
758  });
// Run fn here, capturing either its return value or the exception it threw;
// exactly one of result_value / exception ends up set.
759  std::optional<T> result_value;
760  kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
761  m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
762  // Destroy CancelMonitor here before fulfilling or rejecting the
763  // promise so it doesn't get triggered when the promise is
764  // destroyed.
765  cancel_monitor_ptr = nullptr;
766  // Send results to the fulfiller. Technically it would be ok to
767  // skip this if promise was canceled, but it's simpler to just
768  // do it unconditionally.
769  KJ_IF_MAYBE(e, exception) {
770  assert(!result_value);
771  result_fulfiller->reject(kj::mv(*e));
772  } else {
773  assert(result_value);
774  result_fulfiller->fulfill(kj::mv(*result_value));
775  result_value.reset();
776  }
777  result_fulfiller = nullptr;
778  // Use evalLater to destroy the ProxyServer<Thread> self
779  // reference, if it is the last reference, because the
780  // ProxyServer<Thread> destructor needs to join the thread,
781  // which can't happen until this sync() block has exited.
782  m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
783  });
784  });
785  // Assert that calling Waiter::post did not fail. It could only return
786  // false if a new function was posted before the previous one finished
787  // executing, but new functions are only posted when m_thread_ready is
788  // signaled, so this should never happen.
789  assert(posted);
790  return kj::mv(result.promise);
791  }).attach(kj::heap<CancelProbe>(cancel_monitor));
// The next post() call waits on this promise before posting to the Waiter.
792  m_thread_ready = kj::mv(ready.promise);
793  return ret;
794 }
795 
// Given stream file descriptor, make a new ProxyClient object to send requests
// over the stream.
//
// loop - event loop that will service the connection; the Connection is
//        created inside loop.sync().
// fd   - socket file descriptor; ownership is taken (TAKE_OWNERSHIP).
//
// Returns a ProxyClient<InitInterface> constructed with
// destroy_connection=true. If the network disconnects first, the handler
// registered here logs a warning and deletes the Connection.
799 template <typename InitInterface>
800 std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
801 {
802  typename InitInterface::Client init_client(nullptr);
803  std::unique_ptr<Connection> connection;
804  loop.sync([&] {
805  auto stream =
806  loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
807  connection = std::make_unique<Connection>(loop, kj::mv(stream));
// Bootstrap the server's init capability and cast it to InitInterface.
808  init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
// Capture a raw pointer, because ownership leaves `connection` via release()
// below.
809  Connection* connection_ptr = connection.get();
810  connection->onDisconnect([&loop, connection_ptr] {
811  MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect.";
812  delete connection_ptr;
813  });
814  });
// Ownership of the Connection passes to the ProxyClient as a raw pointer.
815  return std::make_unique<ProxyClient<InitInterface>>(
816  kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
817 }
818 
// Given stream and init objects, construct a new ProxyServer object that
// handles requests from the stream.
//
// loop   - event loop; the new Connection is stored at the front of
//          loop.m_incoming_connections.
// stream - IO stream for the connection; moved into the Connection.
// init   - init implementation exposed to the peer; referenced, not owned, so
//          it must outlive the connection.
//
// On disconnect the connection is logged and erased from
// loop.m_incoming_connections.
823 template <typename InitInterface, typename InitImpl>
824 void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
825 {
826  loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
827  // Disable deleter so proxy server object doesn't attempt to delete the
828  // init implementation when the proxy client is destroyed or
829  // disconnected.
830  return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
831  });
// The element just emplaced is at the front of the list.
832  auto it = loop.m_incoming_connections.begin();
833  MP_LOG(loop, Log::Info) << "IPC server: socket connected.";
834  it->onDisconnect([&loop, it] {
835  MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
836  loop.m_incoming_connections.erase(it);
837  });
838 }
839 
// Given connection receiver and an init object, handle incoming connections by
// calling _Serve.
//
// Adds an accept() task to loop.m_task_set. When a connection arrives, it is
// served and _Listen is called again with the same listener, so accepting
// continues. `loop` and `init` are captured by reference and must outlive the
// task.
843 template <typename InitInterface, typename InitImpl>
844 void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
845 {
// Take the raw pointer first, because `listener` is moved into the
// continuation below.
846  auto* ptr = listener.get();
847  loop.m_task_set->add(ptr->accept().then(
848  [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
849  _Serve<InitInterface>(loop, kj::mv(stream), init);
850  _Listen<InitInterface>(loop, kj::mv(listener), init);
851  }));
852 }
853 
// Given stream file descriptor and an init object, handle requests on the
// stream by calling methods on the init object. Wraps `fd` (taking ownership)
// and forwards to _Serve. Unlike ListenConnections, this does not wrap the
// work in loop.sync(). NOTE(review): presumably it must be called on the event
// loop thread - confirm with callers.
856 template <typename InitInterface, typename InitImpl>
857 void ServeStream(EventLoop& loop, int fd, InitImpl& init)
858 {
859  _Serve<InitInterface>(
860  loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
861 }
862 
// NOTE(review): the signature line (listing line 866) is dropped here; per the
// cross-reference index it is
// `void ListenConnections(EventLoop& loop, int fd, InitImpl& init)`.
//
// Given listening socket file descriptor and an init object, handle incoming
// connections and requests. Wraps `fd` as a listening socket (taking
// ownership) and starts _Listen inside loop.sync().
865 template <typename InitInterface, typename InitImpl>
867 {
868  loop.sync([&]() {
869  _Listen<InitInterface>(loop,
870  loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
871  init);
872  });
873 }
874 
875 extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
876 // Silence nonstandard bitcoin tidy error "Variable with non-trivial destructor
877 // cannot be thread_local" which should not be a problem on modern platforms, and
878 // could lead to a small memory leak at worst on older ones.
879 
880 } // namespace mp
881 
882 #endif // MP_PROXY_IO_H
std::thread::id m_thread_id
ID of the event loop thread.
Definition: proxy-io.h:299
::capnp::TwoPartyVatNetwork m_network
Definition: proxy-io.h:469
int ret
std::thread m_async_thread
Handle of an async worker thread.
Definition: proxy-io.h:303
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
Definition: proxy-io.h:857
::capnp::MallocMessageBuilder message
Definition: proxy-io.h:502
Single element task queue used to handle recursive capnp calls.
Definition: proxy-io.h:372
assert(!tx.IsCoinBase())
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
Definition: proxy-io.h:323
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
Definition: proxy-io.h:333
int m_post_fd
Pipe write handle used to wake up the event loop thread.
Definition: proxy-io.h:315
Handler for kj::TaskSet failed task events.
Definition: proxy-io.h:118
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
Definition: proxy-io.h:674
const LogOptions & m_options
Definition: proxy-io.h:202
void onDisconnect(F &&f)
Add disconnect handler.
Definition: proxy-io.h:451
bool request_canceled
For IPC methods that execute asynchronously, not on the event-loop thread, this is set to true if the...
Definition: proxy-io.h:63
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Definition: proxy.h:80
std::function< void()> testing_hook_async_request_done
Hook called on the worker thread just before returning results.
Definition: proxy-io.h:358
std::function< void(LogMessage)> LogFn
Definition: proxy-io.h:147
std::string LongThreadName(const char *exe_name)
Definition: proxy.cpp:436
::capnp::word scratch[4]
Definition: proxy-io.h:501
EventLoop(const char *exe_name, std::function< void(bool, std::string)> old_callback, void *context=nullptr)
Backwards-compatible constructor for previous (deprecated) logging callback signature.
Definition: proxy-io.h:249
Object holding network & rpc state associated with either an incoming server connection, or an outgoing client connection.
Definition: proxy-io.h:422
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
Definition: proxy-io.h:37
friend Logger & operator<<(Logger &&logger, T &&value)
Definition: proxy-io.h:186
ConnThreads::iterator ConnThread
Definition: proxy-io.h:654
friend Logger & operator<<(Logger &logger, T &&value)
Definition: proxy-io.h:179
Definition: common.h:29
std::condition_variable m_cv
Definition: proxy-io.h:324
std::ostringstream m_buffer
Definition: proxy-io.h:204
LoggingErrorHandler m_error_handler
Definition: proxy-io.h:464
std::function< void()> testing_hook_makethread
Hook called when ProxyServer<ThreadMap>::makeThread() is called.
Definition: proxy-io.h:345
Context m_context
Definition: protocol.cpp:141
kj::TaskSet m_on_disconnect
TaskSet used to cancel the m_network.onDisconnect() handler for remote disconnections, if the connection is closed locally first by deleting this Connection object.
Definition: proxy-io.h:468
bool post(Fn &&fn)
Definition: proxy-io.h:377
Logger & operator=(Logger &&)=delete
#define MP_REQUIRES(x)
Definition: util.h:156
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
Definition: proxy-io.h:470
~Connection()
Run cleanup functions.
Definition: proxy.cpp:83
Log
Log flags. Update stringify function if changed!
Definition: proxy-io.h:127
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
std::condition_variable m_cv
Definition: proxy-io.h:413
void taskFailed(kj::Exception &&exception) override
Definition: proxy.cpp:43
std::list< Connection > m_incoming_connections
List of connections.
Definition: proxy-io.h:336
Event loop implementation.
Definition: proxy-io.h:238
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
Definition: proxy-io.h:312
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
Definition: proxy-io.h:866
EventLoopRef m_loop
Definition: proxy-io.h:462
std::function< void()> testing_hook_makethread_created
Hook called on the worker thread inside makeThread(), after the thread context is set up and thread_c...
Definition: proxy-io.h:350
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
Definition: proxy.h:28
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Definition: proxy-io.h:429
Lock * cancel_lock
For IPC methods that execute asynchronously, not on the event-loop thread: lock preventing the event-...
Definition: proxy-io.h:56
LoggingErrorHandler m_error_handler
Capnp error handler. Needs to outlive m_task_set.
Definition: proxy-io.h:330
std::string message
Message to be logged.
Definition: proxy-io.h:141
void * m_context
External context pointer.
Definition: proxy-io.h:342
Connection & connection
Definition: proxy-io.h:31
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
Definition: proxy-io.h:488
std::function< void()> testing_hook_async_request_start
Hook called on the worker thread when it starts to execute an async request.
Definition: proxy-io.h:355
kj::StringPtr KJ_STRINGIFY(Log flags)
Definition: proxy.cpp:441
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:41
void Unlock(Lock &lock, Callback &&callback)
Definition: util.h:207
std::unique_ptr< interfaces::Init > init
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Definition: proxy-io.h:91
Log level
The severity level of this message.
Definition: proxy-io.h:144
LogOptions m_log_opts
Logging options.
Definition: proxy-io.h:339
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
Definition: proxy-io.h:824
void startAsyncThread() MP_REQUIRES(m_mutex)
Start asynchronous worker thread if necessary.
Definition: proxy.cpp:289
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
Definition: proxy-io.h:508
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings...
Definition: proxy-io.h:156
void addAsyncCleanup(std::function< void()> fn)
Register cleanup function to run on asynchronous worker thread without blocking the event loop thread...
Definition: proxy.cpp:176
Log m_log_level
Definition: proxy-io.h:203
ProxyServer & proxy_server
Definition: proxy-io.h:48
CallContext_ CallContext
Definition: proxy-io.h:46
LoggingErrorHandler(EventLoop &loop)
Definition: proxy-io.h:121
std::list< std::function< void()> > CleanupList
Definition: proxy.h:36
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
Definition: proxy-io.h:242
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
Definition: proxy-io.h:478
kj::Function< void()> *m_post_fn MP_GUARDED_BY(m_mutex)
Callback function to run on event loop thread during post() or sync() call.
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
Definition: proxy-io.h:269
Waiter()=default
kj::Own< kj::AsyncIoStream > m_stream
Definition: proxy-io.h:463
std::string thread_name
Identifying string for debug.
Definition: proxy-io.h:677
int flags
Definition: bitcoin-tx.cpp:529
bool done() const MP_REQUIRES(m_mutex)
Check if loop should exit.
Definition: proxy.cpp:315
kj::Canceler m_canceler
Canceler for canceling promises that we want to discard when the connection is destroyed.
Definition: proxy-io.h:483
std::unique_lock< std::mutex > m_lock
Definition: util.h:183
Mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
Definition: proxy-io.h:412
Logger(const LogOptions &options, Log log_level)
Definition: proxy-io.h:166
ThreadContext & thread_context
Definition: proxy-io.h:36
auto result
Definition: common-types.h:74
const char * m_exe_name
Process name included in thread names so combined debug output from multiple processes is easier to u...
Definition: proxy-io.h:296
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:321
#define MP_LOG(loop,...)
Definition: proxy-io.h:209
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
Definition: proxy-io.h:65
std::map< Connection *, std::optional< ProxyClient< Thread > >> ConnThreads
Map from Connection to local or remote thread handle which will be used over that connection...
Definition: proxy-io.h:653
Event loop smart pointer automatically managing m_num_clients.
Definition: proxy.h:50
::capnp::rpc::twoparty::VatId::Builder vat_id
Definition: proxy-io.h:503
void wait(Lock &lock, Predicate pred)
Definition: proxy-io.h:387
Definition: util.h:171
std::optional< kj::Function< void()> > m_fn MP_GUARDED_BY(m_mutex)
#define MP_GUARDED_BY(x)
Definition: util.h:160
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream...
Definition: proxy-io.h:800
CallContext & call_context
Definition: proxy-io.h:49
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
Definition: proxy-io.h:425
ThreadContext & m_thread_context
Definition: proxy-io.h:110
Vat id for server side of connection.
Definition: proxy-io.h:499
void removeSyncCleanup(CleanupIt it)
Definition: proxy.cpp:167
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
Definition: proxy.h:147
typename CleanupList::iterator CleanupIt
Definition: proxy.h:37
ThreadMap::Client m_thread_map
Definition: proxy-io.h:474
LogFn log_fn
External logging callback.
Definition: proxy-io.h:152
void CleanupRun(CleanupList &fns)
Definition: proxy.h:39
Log log_level
Messages with a severity level less than log_level will not be reported.
Definition: proxy-io.h:160
bool enabled() const
Definition: proxy-io.h:197
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve...
Definition: proxy-io.h:844
void loop()
Run event loop.
Definition: proxy.cpp:227
~Logger() noexcept(false)
Definition: proxy-io.h:173
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
Definition: proxy.h:25
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
Definition: proxy.cpp:153
EventLoop & m_loop
Definition: proxy-io.h:123
Helper class that detects when a promise is canceled.
Definition: util.h:292
kj::AsyncIoContext m_io_context
Capnp IO context.
Definition: proxy-io.h:327
void post(kj::Function< void()> fn)
Run function on event loop thread.
Definition: proxy.cpp:271