9#include <mp/proxy.capnp.h>
14#include <capnp/capability.h>
15#include <capnp/common.h>
17#include <condition_variable>
21#include <kj/async-io.h>
22#include <kj/async-prelude.h>
25#include <kj/function.h>
33#include <sys/socket.h>
45 KJ_LOG(
ERROR,
"Uncaught exception in daemonized task.", exception);
53 m_loop->m_num_clients += 1;
65 assert(loop->m_num_clients > 0);
66 loop->m_num_clients -= 1;
68 loop->m_cv.notify_all();
92 m_canceler.cancel(
"Interrupted by disconnect");
202 m_task_set(
new kj::TaskSet(m_error_handler)),
240 m_io_context.lowLevelProvider->wrapSocketFd(
m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
245 if (
read_bytes != 1)
throw std::logic_error(
"EventLoop wait_stream closed unexpectedly");
259 MP_LOG(*
this,
Log::Info) <<
"EventLoop::loop done, cancelling event listeners.";
331 thread->second.emplace(
make_thread(), connection,
false);
341 thread->second->m_disconnect_cb.reset();
356 if (m_disconnect_cb) {
362 if (m_disconnect_cb) {
363 m_context.connection->removeSyncCleanup(*m_disconnect_cb);
370 :
m_loop{*connection.
m_loop}, m_thread_context(thread_context), m_thread(
std::move(thread))
372 assert(m_thread_context.waiter.get() !=
nullptr);
377 if (!m_thread.joinable())
return;
383 assert(m_thread_context.waiter.get());
384 std::unique_ptr<Waiter> waiter;
386 const Lock lock(m_thread_context.waiter->m_mutex);
389 waiter = std::move(m_thread_context.waiter);
396 m_thread_context.request_threads.clear();
397 m_thread_context.callback_threads.clear();
399 waiter->m_cv.notify_all();
406 context.getResults().setResult(m_thread_context.thread_name);
407 return kj::READY_NOW;
416 const std::string from = context.getParams().getName();
417 std::promise<ThreadContext*> thread_context;
418 std::thread thread([&loop, &thread_context, from]() {
423 if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created();
431 return kj::READY_NOW;
Object holding network & rpc state associated with either an incoming server connection,...
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
~Connection()
Run cleanup functions.
kj::Canceler m_canceler
Canceler for canceling promises that we want to discard when the connection is destroyed.
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
Event loop implementation.
kj::AsyncIoContext m_io_context
Capnp IO context.
void startAsyncThread() MP_REQUIRES(m_mutex)
Start asynchronous worker thread if necessary.
std::condition_variable m_cv
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
void addAsyncCleanup(std::function< void()> fn)
Register cleanup function to run on asynchronous worker thread without blocking the event loop thread...
void loop()
Run event loop.
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
bool done() const MP_REQUIRES(m_mutex)
Check if loop should exit.
int m_post_fd
Pipe write handle used to wake up the event loop thread.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
std::function< void()> testing_hook_makethread
Hook called when ProxyServer<ThreadMap>::makeThread() is called.
std::thread m_async_thread
Handle of an async worker thread.
std::thread::id m_thread_id
ID of the event loop thread.
void post(kj::Function< void()> fn)
Run function on event loop thread.
Event loop smart pointer automatically managing m_num_clients.
EventLoopRef(EventLoop &loop, Lock *lock=nullptr)
void reset(bool relock=false)
std::unique_lock< std::mutex > m_lock
void taskFailed(kj::Exception &&exception) override
boost::signals2::scoped_connection m_connection
std::optional< mp::EventLoop > m_loop
EventLoop object which manages I/O events for all connections.
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
kj::StringPtr KJ_STRINGIFY(Log flags)
std::list< std::function< void()> > CleanupList
ConnThreads::iterator ConnThread
std::string ThreadName(const char *exe_name)
Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
std::atomic< int > server_reqs
typename CleanupList::iterator CleanupIt
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
thread_local ThreadContext g_thread_context
Log
Log flags. Update stringify function if changed!
std::string LongThreadName(const char *exe_name)
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
ProxyContext(Connection *connection)
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
Convenient wrapper around std::variant<T*, T>
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
std::unique_ptr< Waiter > waiter
Waiter object used to allow remote clients to execute code on this thread.
bool loop_thread
Whether this thread is a capnp event loop thread.
std::string thread_name
Identifying string for debug.
constexpr auto Ticks(Dur2 d)
Helper to count the seconds of a duration/time_point.