9#include <mp/proxy.capnp.h>
14#include <capnp/capability.h>
15#include <capnp/common.h>
17#include <condition_variable>
21#include <kj/async-io.h>
22#include <kj/async-prelude.h>
25#include <kj/function.h>
33#include <sys/socket.h>
45 KJ_LOG(ERROR,
"Uncaught exception in daemonized task.", exception);
52 loop_lock->assert_locked(
m_loop->m_mutex);
53 m_loop->m_num_clients += 1;
64 loop_lock->assert_locked(loop->m_mutex);
65 assert(loop->m_num_clients > 0);
66 loop->m_num_clients -= 1;
68 loop->m_cv.notify_all();
69 int post_fd{loop->m_post_fd};
72 KJ_SYSCALL(write(post_fd, &buffer, 1));
76 if (relock) loop_lock->lock();
89 assert(std::this_thread::get_id() ==
m_loop->m_thread_id);
92 m_canceler.cancel(
"Interrupted by disconnect");
171 assert(std::this_thread::get_id() ==
m_loop->m_thread_id);
195 m_async_fns->emplace_back(std::move(fn));
207 KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
216 KJ_ASSERT(m_post_fn ==
nullptr);
217 KJ_ASSERT(!m_async_fns);
220 KJ_ASSERT(m_num_clients == 0);
236 m_async_fns.emplace();
239 kj::Own<kj::AsyncIoStream> wait_stream{
240 m_io_context.lowLevelProvider->wrapSocketFd(
m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
244 const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(
m_io_context.waitScope);
245 if (read_bytes != 1)
throw std::logic_error(
"EventLoop wait_stream closed unexpectedly");
259 MP_LOG(*
this,
Log::Info) <<
"EventLoop::loop done, cancelling event listeners.";
262 wait_stream =
nullptr;
263 KJ_SYSCALL(::close(post_fd));
284 KJ_SYSCALL(write(post_fd, &buffer, 1));
295 }
else if (!m_async_fns->empty()) {
298 while (m_async_fns) {
299 if (!m_async_fns->empty()) {
301 const std::function<void()> fn = std::move(m_async_fns->front());
302 m_async_fns->pop_front();
317 assert(m_num_clients >= 0);
318 return m_num_clients == 0 && m_async_fns->empty();
328 std::tie(thread, inserted) = threads.ref.try_emplace(connection);
331 thread->second.emplace(make_thread(), connection,
false);
332 thread->second->m_disconnect_cb = connection->
addSyncCleanup([threads, thread] {
341 thread->second->m_disconnect_cb.reset();
345 threads.ref.erase(thread);
348 return {thread, inserted};
384 std::unique_ptr<Waiter> waiter;
399 waiter->m_cv.notify_all();
407 return kj::READY_NOW;
416 const std::string from = context.getParams().getName();
417 std::promise<ThreadContext*> thread_context;
418 std::thread thread([&loop, &thread_context, from]() {
428 auto thread_server = kj::heap<ProxyServer<Thread>>(
m_connection, *thread_context.get_future().get(), std::move(thread));
429 auto thread_client =
m_connection.m_threads.add(kj::mv(thread_server));
430 context.getResults().setResult(kj::mv(thread_client));
431 return kj::READY_NOW;
CleanupIt addSyncCleanup(std::function< void()> fn)
CleanupList m_sync_cleanup_fns
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
kj::AsyncIoContext m_io_context
Capnp IO context.
void startAsyncThread() MP_REQUIRES(m_mutex)
std::condition_variable m_cv
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
void addAsyncCleanup(std::function< void()> fn)
bool done() const MP_REQUIRES(m_mutex)
Check if loop should exit.
int m_post_fd
Pipe write handle used to wake up the event loop thread.
LogOptions m_log_opts
Logging options.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
void * m_context
External context pointer.
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
std::function< void()> testing_hook_makethread
Hook called when ProxyServer<ThreadMap>::makeThread() is called.
LoggingErrorHandler m_error_handler
Capnp error handler. Needs to outlive m_task_set.
std::thread m_async_thread
std::function< void()> testing_hook_makethread_created
std::thread::id m_thread_id
ID of the event loop thread.
void post(kj::Function< void()> fn)
EventLoopRef(EventLoop &loop, Lock *lock=nullptr)
void reset(bool relock=false)
std::unique_lock< std::mutex > m_lock
void taskFailed(kj::Exception &&exception) override
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
kj::StringPtr KJ_STRINGIFY(Log flags)
std::list< std::function< void()> > CleanupList
ConnThreads::iterator ConnThread
std::string ThreadName(const char *exe_name)
Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
std::atomic< int > server_reqs
typename CleanupList::iterator CleanupIt
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
ThreadContext g_thread_context
Log
Log flags. Update stringify function if changed!
std::string LongThreadName(const char *exe_name)
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
ProxyContext(Connection *connection)
ProxyServer(Connection &connection, ThreadContext &thread_context, std::thread &&thread)
ThreadContext & m_thread_context
ProxyServer(Connection &connection)
Connection & m_connection
Convenient wrapper around std::variant<T*, T>.