11#include <mp/proxy.capnp.h>
13#include <capnp/rpc-twoparty.h>
16#include <condition_variable>
18#include <kj/function.h>
43template <
typename ProxyServer,
typename CallContext_>
71template <
typename Interface,
typename Params,
typename Results>
100 kj::Promise<void> getName(GetNameContext context)
override;
106 template<
typename T,
typename Fn>
107 kj::Promise<T> post(Fn&& fn);
122 void taskFailed(kj::Exception&& exception)
override;
178 template <
typename T>
185 template <
typename T>
188 return logger << std::forward<T>(value);
191 explicit operator bool()
const
207#define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
209#define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
249 EventLoop(
const char* exe_name, std::function<
void(
bool, std::string)> old_callback,
void* context =
nullptr)
251 LogFn{[old_callback =
std::move(old_callback)](
LogMessage log_data) {old_callback(log_data.level ==
Log::Raise, std::move(log_data.message));}},
263 void post(kj::Function<
void()> fn);
268 template <
typename Callable>
271 post(std::forward<Callable>(callable));
276 void addAsyncCleanup(std::function<
void()> fn);
376 template <
typename Fn>
380 if (m_fn)
return false;
381 m_fn = std::forward<Fn>(fn);
386 template <
class Predicate>
396 auto fn = std::move(*m_fn);
400 const bool done = pred();
427 m_network(*
m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
430 kj::Own<kj::AsyncIoStream>&& stream_,
431 const std::function<::capnp::Capability::Client(
Connection&)>& make_client)
433 m_network(*
m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
446 CleanupIt addSyncCleanup(std::function<
void()> fn);
450 template <
typename F>
459 [f = std::forward<F>(f),
this]()
mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
470 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>>
m_rpc_system;
503 ::capnp::rpc::twoparty::VatId::Builder
vat_id{
message.getRoot<::capnp::rpc::twoparty::VatId>()};
507template <
typename Interface,
typename Impl>
510 bool destroy_connection)
515 auto disconnect_cb =
m_context.connection->addSyncCleanup([
this]() {
518 typename Interface::Client(std::move(
m_client));
535 m_context.cleanup_fns.emplace_front([
this, destroy_connection, disconnect_cb]{
555 typename Interface::Client(std::move(
m_client));
557 if (destroy_connection) {
564 Sub::construct(*
this);
567template <
typename Interface,
typename Impl>
573template <
typename Interface,
typename Impl>
592template <
typename Interface,
typename Impl>
640template <
typename Interface,
typename Impl>
653using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
694 std::unique_ptr<Waiter>
waiter =
nullptr;
732template<
typename T,
typename Fn>
735 auto ready = kj::newPromiseAndFulfiller<void>();
736 auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
743 auto self = thisCap();
744 auto ret =
m_thread_ready.then([
this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]()
mutable {
745 auto result = kj::newPromiseAndFulfiller<T>();
746 bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
755 m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
756 ready_fulfiller->fulfill();
757 ready_fulfiller = nullptr;
759 std::optional<T> result_value;
760 kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
761 m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
765 cancel_monitor_ptr = nullptr;
769 KJ_IF_MAYBE(e, exception) {
770 assert(!result_value);
771 result_fulfiller->reject(kj::mv(*e));
773 assert(result_value);
774 result_fulfiller->fulfill(kj::mv(*result_value));
775 result_value.reset();
777 result_fulfiller = nullptr;
782 m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
790 return kj::mv(result.promise);
791 }).attach(kj::heap<CancelProbe>(cancel_monitor));
792 m_thread_ready = kj::mv(ready.promise);
799template <
typename InitInterface>
802 typename InitInterface::Client init_client(
nullptr);
803 std::unique_ptr<Connection> connection;
806 loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
807 connection = std::make_unique<Connection>(loop, kj::mv(stream));
808 init_client = connection->m_rpc_system->bootstrap(
ServerVatId().vat_id).castAs<InitInterface>();
809 Connection* connection_ptr = connection.get();
812 delete connection_ptr;
815 return std::make_unique<ProxyClient<InitInterface>>(
816 kj::mv(init_client), connection.release(),
true);
823template <
typename InitInterface,
typename InitImpl>
830 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&
init, [](
InitImpl*){}), connection);
834 it->onDisconnect([&loop, it] {
843template <
typename InitInterface,
typename InitImpl>
846 auto* ptr = listener.get();
848 [&loop, &
init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream)
mutable {
849 _Serve<InitInterface>(loop, kj::mv(stream), init);
850 _Listen<InitInterface>(loop, kj::mv(listener), init);
856template <
typename InitInterface,
typename InitImpl>
860 loop, loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
865template <
typename InitInterface,
typename InitImpl>
870 loop.
m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
kj::TaskSet m_on_disconnect
LoggingErrorHandler m_error_handler
::capnp::TwoPartyVatNetwork m_network
kj::Own< kj::AsyncIoStream > m_stream
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
void onDisconnect(F &&f)
Add disconnect handler.
::capnp::CapabilityServerSet< Thread > m_threads
CleanupList m_sync_cleanup_fns
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
ThreadMap::Client m_thread_map
kj::AsyncIoContext m_io_context
Capnp IO context.
void sync(Callable &&callable)
std::condition_variable m_cv
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
std::list< Connection > m_incoming_connections
List of connections.
int m_post_fd
Pipe write handle used to wake up the event loop thread.
LogOptions m_log_opts
Logging options.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
void * m_context
External context pointer.
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
std::function< void()> testing_hook_makethread
Hook called when ProxyServer<ThreadMap>::makeThread() is called.
LoggingErrorHandler m_error_handler
Capnp error handler. Needs to outlive m_task_set.
std::thread m_async_thread
std::function< void()> testing_hook_makethread_created
kj::Function< void()> *m_post_fn MP_GUARDED_BY(m_mutex)
Callback function to run on event loop thread during post() or sync() call.
EventLoop(const char *exe_name, std::function< void(bool, std::string)> old_callback, void *context=nullptr)
Backwards-compatible constructor for previous (deprecated) logging callback signature.
std::thread::id m_thread_id
ID of the event loop thread.
std::function< void()> testing_hook_async_request_done
Hook called on the worker thread just before returning results.
void post(kj::Function< void()> fn)
std::function< void()> testing_hook_async_request_start
std::unique_lock< std::mutex > m_lock
friend Logger & operator<<(Logger &logger, T &&value)
Logger(const Logger &)=delete
~Logger() noexcept(false)
Logger & operator=(Logger &&)=delete
Logger(const LogOptions &options, Log log_level)
const LogOptions & m_options
friend Logger & operator<<(Logger &&logger, T &&value)
std::ostringstream m_buffer
Logger & operator=(const Logger &)=delete
Handler for kj::TaskSet failed task events.
LoggingErrorHandler(EventLoop &loop)
void taskFailed(kj::Exception &&exception) override
Interface::Client m_client
~ProxyClientBase() noexcept
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
#define T(expected, seed, data)
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
kj::StringPtr KJ_STRINGIFY(Log flags)
std::list< std::function< void()> > CleanupList
ConnThreads::iterator ConnThread
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
typename CleanupList::iterator CleanupIt
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
std::map< Connection *, std::optional< ProxyClient< Thread > > > ConnThreads
ThreadContext g_thread_context
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Log
Log flags. Update stringify function if changed!
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
std::string LongThreadName(const char *exe_name)
ServerInvokeContext< ProxyServer< Interface >, ::capnp::CallContext< Params, Results > > ServerContext
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
std::function< void(LogMessage)> LogFn
void CleanupRun(CleanupList &fns)
ThreadContext & thread_context
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
Log level
The severity level of this message.
std::string message
Message to be logged.
LogFn log_fn
External logging callback.
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
kj::Promise< void > m_thread_ready
ProxyServer(Connection &connection, ThreadContext &thread_context, std::thread &&thread)
ThreadContext & m_thread_context
ProxyServerBase(std::shared_ptr< Impl > impl, Connection &connection)
virtual ~ProxyServerBase()
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
CallContext & call_context
ProxyServer & proxy_server
::capnp::MallocMessageBuilder message
::capnp::rpc::twoparty::VatId::Builder vat_id
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex)
std::unique_ptr< Waiter > waiter
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex)
std::string thread_name
Identifying string for debug.
void wait(Lock &lock, Predicate pred)
std::condition_variable m_cv
std::optional< kj::Function< void()> > m_fn MP_GUARDED_BY(m_mutex)
AnnotatedMixin< std::mutex > Mutex
Wrapped mutex: supports waiting but not recursive locking.