11 #include <mp/proxy.capnp.h> 13 #include <capnp/rpc-twoparty.h> 16 #include <condition_variable> 18 #include <kj/function.h> 43 template <
typename ProxyServer,
typename CallContext_>
71 template <
typename Interface,
typename Params,
typename Results>
100 kj::Promise<void> getName(GetNameContext context)
override;
106 template<
typename T,
typename Fn>
107 kj::Promise<T> post(Fn&& fn);
114 kj::Promise<void> m_thread_ready{kj::READY_NOW};
122 void taskFailed(kj::Exception&& exception)
override;
147 using LogFn = std::function<void(LogMessage)>;
178 template <
typename T>
185 template <
typename T>
188 return logger << std::forward<T>(value);
191 explicit operator bool()
const 207 #define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger 209 #define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} " 246 EventLoop(
const char* exe_name, LogOptions log_opts,
void* context =
nullptr);
249 EventLoop(
const char* exe_name, std::function<
void(
bool, std::string)> old_callback,
void* context =
nullptr)
251 LogFn{[old_callback = std::move(old_callback)](
LogMessage log_data) {old_callback(log_data.level ==
Log::Raise, std::move(log_data.message));}},
263 void post(kj::Function<
void()> fn);
268 template <
typename Callable>
271 post(std::forward<Callable>(callable));
376 template <
typename Fn>
380 if (m_fn)
return false;
381 m_fn = std::forward<Fn>(fn);
386 template <
class Predicate>
396 auto fn = std::move(*m_fn);
400 const bool done = pred();
427 m_network(*
m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
430 kj::Own<kj::AsyncIoStream>&& stream_,
431 const std::function<::capnp::Capability::Client(
Connection&)>& make_client)
433 m_network(*
m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
450 template <
typename F>
459 [f = std::forward<F>(f),
this]()
mutable {
m_loop->
m_task_set->add(kj::evalLater(kj::mv(f))); }));
470 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>>
m_rpc_system;
503 ::capnp::rpc::twoparty::VatId::Builder
vat_id{
message.getRoot<::capnp::rpc::twoparty::VatId>()};
507 template <
typename Interface,
typename Impl>
510 bool destroy_connection)
515 auto disconnect_cb =
m_context.connection->addSyncCleanup([
this]() {
518 typename Interface::Client(std::move(m_client));
535 m_context.cleanup_fns.emplace_front([
this, destroy_connection, disconnect_cb]{
555 typename Interface::Client(std::move(m_client));
557 if (destroy_connection) {
564 Sub::construct(*
this);
567 template <
typename Interface,
typename Impl>
573 template <
typename Interface,
typename Impl>
592 template <
typename Interface,
typename Impl>
615 m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(
m_context.cleanup_fns)]()
mutable {
640 template <
typename Interface,
typename Impl>
653 using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
694 std::unique_ptr<Waiter> waiter =
nullptr;
729 bool loop_thread =
false;
732 template<
typename T,
typename Fn>
735 auto ready = kj::newPromiseAndFulfiller<void>();
736 auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
743 auto self = thisCap();
744 auto ret = m_thread_ready.then([
this,
self = std::move(
self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]()
mutable {
745 auto result = kj::newPromiseAndFulfiller<T>();
746 bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
755 m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
756 ready_fulfiller->fulfill();
757 ready_fulfiller = nullptr;
759 std::optional<T> result_value;
760 kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
761 m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
765 cancel_monitor_ptr = nullptr;
769 KJ_IF_MAYBE(e, exception) {
770 assert(!result_value);
771 result_fulfiller->reject(kj::mv(*e));
773 assert(result_value);
774 result_fulfiller->fulfill(kj::mv(*result_value));
775 result_value.reset();
777 result_fulfiller = nullptr;
782 m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
790 return kj::mv(
result.promise);
791 }).attach(kj::heap<CancelProbe>(cancel_monitor));
792 m_thread_ready = kj::mv(ready.promise);
799 template <
typename InitInterface>
802 typename InitInterface::Client init_client(
nullptr);
803 std::unique_ptr<Connection> connection;
806 loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
807 connection = std::make_unique<Connection>(loop, kj::mv(stream));
808 init_client = connection->m_rpc_system->bootstrap(
ServerVatId().vat_id).castAs<InitInterface>();
809 Connection* connection_ptr = connection.get();
812 delete connection_ptr;
815 return std::make_unique<ProxyClient<InitInterface>>(
816 kj::mv(init_client), connection.release(),
true);
823 template <
typename InitInterface,
typename InitImpl>
830 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&
init, [](
InitImpl*){}), connection);
833 MP_LOG(loop, Log::Info) <<
"IPC server: socket connected.";
834 it->onDisconnect([&loop, it] {
835 MP_LOG(loop, Log::Info) <<
"IPC server: socket disconnected.";
843 template <
typename InitInterface,
typename InitImpl>
846 auto* ptr = listener.get();
848 [&loop, &
init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream)
mutable {
849 _Serve<InitInterface>(loop, kj::mv(stream),
init);
850 _Listen<InitInterface>(loop, kj::mv(listener),
init);
856 template <
typename InitInterface,
typename InitImpl>
859 _Serve<InitInterface>(
860 loop, loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
865 template <
typename InitInterface,
typename InitImpl>
869 _Listen<InitInterface>(loop,
870 loop.
m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
882 #endif // MP_PROXY_IO_H std::thread::id m_thread_id
ID of the event loop thread.
::capnp::TwoPartyVatNetwork m_network
std::thread m_async_thread
Handle of an async worker thread.
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on the init object.
::capnp::MallocMessageBuilder message
Single element task queue used to handle recursive capnp calls.
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
int m_post_fd
Pipe write handle used to wake up the event loop thread.
Handler for kj::TaskSet failed task events.
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
const LogOptions & m_options
void onDisconnect(F &&f)
Add disconnect handler.
bool request_canceled
For IPC methods that execute asynchronously, not on the event-loop thread, this is set to true if the...
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
std::function< void()> testing_hook_async_request_done
Hook called on the worker thread just before returning results.
std::function< void(LogMessage)> LogFn
std::string LongThreadName(const char *exe_name)
EventLoop(const char *exe_name, std::function< void(bool, std::string)> old_callback, void *context=nullptr)
Backwards-compatible constructor for previous (deprecated) logging callback signature.
Object holding network & rpc state associated with either an incoming server connection, or an outgoing client connection.
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
friend Logger & operator<<(Logger &&logger, T &&value)
ConnThreads::iterator ConnThread
friend Logger & operator<<(Logger &logger, T &&value)
std::condition_variable m_cv
std::ostringstream m_buffer
LoggingErrorHandler m_error_handler
std::function< void()> testing_hook_makethread
Hook called when ProxyServer<ThreadMap>::makeThread() is called.
kj::TaskSet m_on_disconnect
TaskSet used to cancel the m_network.onDisconnect() handler for remote disconnections, if the connection is closed locally first by deleting this Connection object.
Logger & operator=(Logger &&)=delete
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
~Connection()
Run cleanup functions.
Log
Log flags. Update stringify function if changed!
Functions to serialize / deserialize common bitcoin types.
std::condition_variable m_cv
void taskFailed(kj::Exception &&exception) override
std::list< Connection > m_incoming_connections
List of connections.
Event loop implementation.
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests by calling methods on the init object.
std::function< void()> testing_hook_makethread_created
Hook called on the worker thread inside makeThread(), after the thread context is set up and thread_c...
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Lock * cancel_lock
For IPC methods that execute asynchronously, not on the event-loop thread: lock preventing the event-...
LoggingErrorHandler m_error_handler
Capnp error handler. Needs to outlive m_task_set.
std::string message
Message to be logged.
void * m_context
External context pointer.
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::function< void()> testing_hook_async_request_start
Hook called on the worker thread when it starts to execute an async request.
kj::StringPtr KJ_STRINGIFY(Log flags)
thread_local ThreadContext g_thread_context
void Unlock(Lock &lock, Callback &&callback)
std::unique_ptr< interfaces::Init > init
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Log level
The severity level of this message.
LogOptions m_log_opts
Logging options.
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stream.
void startAsyncThread() MP_REQUIRES(m_mutex)
Start asynchronous worker thread if necessary.
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings.
void addAsyncCleanup(std::function< void()> fn)
Register cleanup function to run on asynchronous worker thread without blocking the event loop thread.
ProxyServer & proxy_server
LoggingErrorHandler(EventLoop &loop)
std::list< std::function< void()> > CleanupList
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
kj::Function< void()> *m_post_fn MP_GUARDED_BY(m_mutex)
Callback function to run on event loop thread during post() or sync() call.
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scope.
kj::Own< kj::AsyncIoStream > m_stream
std::string thread_name
Identifying string for debug.
bool done() const MP_REQUIRES(m_mutex)
Check if loop should exit.
kj::Canceler m_canceler
Canceler for canceling promises that we want to discard when the connection is destroyed.
std::unique_lock< std::mutex > m_lock
Mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
Logger(const LogOptions &options, Log log_level)
ThreadContext & thread_context
const char * m_exe_name
Process name included in thread names so combined debug output from multiple processes is easier to understand.
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
std::map< Connection *, std::optional< ProxyClient< Thread > >> ConnThreads
Map from Connection to local or remote thread handle which will be used over that connection.
Event loop smart pointer automatically managing m_num_clients.
::capnp::rpc::twoparty::VatId::Builder vat_id
void wait(Lock &lock, Predicate pred)
std::optional< kj::Function< void()> > m_fn MP_GUARDED_BY(m_mutex)
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
CallContext & call_context
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
ThreadContext & m_thread_context
Vat id for server side of connection.
void removeSyncCleanup(CleanupIt it)
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
typename CleanupList::iterator CleanupIt
ThreadMap::Client m_thread_map
LogFn log_fn
External logging callback.
void CleanupRun(CleanupList &fns)
Log log_level
Messages with a severity level less than log_level will not be reported.
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve.
void loop()
Run event loop.
~Logger() noexcept(false)
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
Helper class that detects when a promise is canceled.
kj::AsyncIoContext m_io_context
Capnp IO context.
void post(kj::Function< void()> fn)
Run function on event loop thread.