Bitcoin Core 31.0.0
proxy.cpp
1 // Copyright (c) The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <mp/proxy.h>
6 
7 #include <mp/proxy-io.h>
8 #include <mp/proxy-types.h>
9 #include <mp/proxy.capnp.h>
10 #include <mp/type-threadmap.h>
11 #include <mp/util.h>
12 
13 #include <atomic>
14 #include <capnp/capability.h>
15 #include <capnp/common.h> // IWYU pragma: keep
16 #include <capnp/rpc.h>
17 #include <condition_variable>
18 #include <functional>
19 #include <future>
20 #include <kj/async.h>
21 #include <kj/async-io.h>
22 #include <kj/async-prelude.h>
23 #include <kj/common.h>
24 #include <kj/debug.h>
25 #include <kj/function.h>
26 #include <kj/memory.h>
27 #include <kj/string.h>
28 #include <map>
29 #include <memory>
30 #include <optional>
31 #include <stdexcept>
32 #include <string>
33 #include <sys/socket.h>
34 #include <thread>
35 #include <tuple>
36 #include <unistd.h>
37 #include <utility>
38 
39 namespace mp {
40 
41 thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
42 
43 void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
44 {
45  KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
46  MP_LOG(m_loop, Log::Error) << "Uncaught exception in daemonized task.";
47 }
48 
49 EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
50 {
51  auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
52  loop_lock->assert_locked(m_loop->m_mutex);
53  m_loop->m_num_clients += 1;
54 }
55 
56 // Due to the conditionals in this function, MP_NO_TSA is required to avoid
57 // error "error: mutex 'loop_lock' is not held on every path through here
58 // [-Wthread-safety-analysis]"
59 void EventLoopRef::reset(bool relock) MP_NO_TSA
60 {
61  if (auto* loop{m_loop}) {
62  m_loop = nullptr;
63  auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
64  loop_lock->assert_locked(loop->m_mutex);
65  assert(loop->m_num_clients > 0);
66  loop->m_num_clients -= 1;
67  if (loop->done()) {
68  loop->m_cv.notify_all();
69  int post_fd{loop->m_post_fd};
70  loop_lock->unlock();
71  char buffer = 0;
72  KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
73  // By default, do not try to relock `loop_lock` after writing,
74  // because the event loop could wake up and destroy itself and the
75  // mutex might no longer exist.
76  if (relock) loop_lock->lock();
77  }
78  }
79 }
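// ---- Added illustrative sketch (not part of upstream proxy.cpp) ----
// A minimal standalone version of the refcounting idea EventLoopRef implements
// above: a guard that bumps a shared client count on construction, drops it on
// destruction, and notifies a condition variable when the count hits zero so a
// waiting loop can shut down. The names Counter and Guard are hypothetical.
#include <cassert>
#include <condition_variable>
#include <mutex>

struct Counter {
    std::mutex mutex;
    std::condition_variable cv;
    int clients{0};
};

class Guard {
public:
    explicit Guard(Counter& counter) : m_counter(counter)
    {
        const std::lock_guard<std::mutex> lock{m_counter.mutex};
        ++m_counter.clients;
    }
    ~Guard()
    {
        const std::lock_guard<std::mutex> lock{m_counter.mutex};
        assert(m_counter.clients > 0);
        if (--m_counter.clients == 0) m_counter.cv.notify_all(); // last client: wake any waiter
    }
private:
    Counter& m_counter;
};
// ---- End sketch ----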
80 
81 ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
82 
83 Connection::~Connection()
84 {
85  // Connection destructor is always called on the event loop thread. If this
86  // is a local disconnect, it will trigger I/O, so this needs to run on the
87  // event loop thread, and if there was a remote disconnect, this is called
88  // by an onDisconnect callback directly from the event loop thread.
89  assert(std::this_thread::get_id() == m_loop->m_thread_id);
90 
91  // Try to cancel any calls that may be executing.
92  m_canceler.cancel("Interrupted by disconnect");
93 
94  // Shut down RPC system first, since this will garbage collect any
95  // ProxyServer objects that were not freed before the connection was closed.
96  // Typically all ProxyServer objects associated with this connection will be
97  // freed before this call returns. However that will not be the case if
98  // there are asynchronous IPC calls over this connection still currently
99  // executing. In that case, Cap'n Proto will destroy the ProxyServer objects
100  // after the calls finish.
101  m_rpc_system.reset();
102 
103  // ProxyClient cleanup handlers are in the sync list, and ProxyServer cleanup
104  // handlers are in the async list.
105  //
106  // The ProxyClient cleanup handlers are synchronous because they are fast
107  // and don't do anything besides release capnp resources and reset state so
108  // future calls to client methods immediately throw exceptions instead of
109  // trying to communicate across the socket. The synchronous callbacks set
110  // ProxyClient capability pointers to null, so new method calls on client
111  // objects fail without triggering i/o or relying on event loop which may go
112  // out of scope or trigger obscure capnp i/o errors.
113  //
114  // The ProxyServer cleanup handlers call user defined destructors on the server
115  // object, which can run arbitrary blocking bitcoin code so they have to run
116  // asynchronously in a different thread. The asynchronous cleanup functions
117  // intentionally aren't started until after the synchronous cleanup
118  // functions run, so client objects are fully disconnected before bitcoin
119  // code in the destructors is run. This way, if the bitcoin code tries to
120  // make client requests, the requests will just fail immediately instead of
121  // sending i/o or accessing the event loop.
122  //
123  // The context where Connection objects are destroyed and this destructor is invoked
124  // is different depending on whether this is an outgoing connection being used
125  // to make an Init.makeX() call (e.g. Init.makeNode or Init.makeWalletClient) or an incoming
126  // connection implementing the Init interface and handling the Init.makeX() calls.
127  //
128  // Either way when a connection is closed, capnp behavior is to call all
129  // ProxyServer object destructors first, and then trigger an onDisconnect
130  // callback.
131  //
132  // On the incoming side of the connection, the onDisconnect callback is written
133  // to delete the Connection object from the m_incoming_connections list and call
134  // this destructor, which calls Connection::disconnect.
135  //
136  // On the outgoing side, the Connection object is owned by the top-level
137  // client object, which the onDisconnect handler doesn't have ready access
138  // to, so the onDisconnect handler just calls Connection::disconnect
139  // directly instead.
140  //
141  // Either way, disconnect code runs in the event loop thread and is called
142  // on both clean and unclean shutdowns. In the unclean shutdown case, when
143  // the connection is broken, the sync and async cleanup lists will be filled
144  // with callbacks. In the clean shutdown case, both lists will be empty.
145  Lock lock{m_loop->m_mutex};
146  while (!m_sync_cleanup_fns.empty()) {
147  CleanupList fn;
148  fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
149  Unlock(lock, fn.front());
150  }
151 }
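// ---- Added illustrative sketch (not part of upstream proxy.cpp) ----
// The loop above uses a splice-one-then-run pattern: each callback is moved
// into a private one-element list while the mutex is held, then invoked with
// the mutex released, so a callback can safely take the lock itself or touch
// the original list. Standalone version with hypothetical names:
#include <functional>
#include <list>
#include <mutex>

void RunAndClear(std::mutex& mutex, std::list<std::function<void()>>& fns)
{
    std::unique_lock<std::mutex> lock{mutex};
    while (!fns.empty()) {
        std::list<std::function<void()>> fn;
        fn.splice(fn.begin(), fns, fns.begin()); // O(1) move of one node, no copy
        lock.unlock();
        fn.front()(); // run outside the lock
        lock.lock();
    }
}
// ---- End sketch ----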
152 
153 CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
154 {
155  const Lock lock(m_loop->m_mutex);
156  // Add cleanup callbacks to the front of list, so sync cleanup functions run
157  // in LIFO order. This is a good approach because sync cleanup functions are
158  // added as client objects are created, and it is natural to clean up
159  // objects in the reverse order they were created. In practice, however,
160  // order should not be significant because the cleanup callbacks run
161  // synchronously in a single batch when the connection is broken, and they
162  // only reset the connection pointers in the client objects without actually
163  // deleting the client objects.
164  return m_sync_cleanup_fns.emplace(m_sync_cleanup_fns.begin(), std::move(fn));
165 }
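// ---- Added illustrative sketch (not part of upstream proxy.cpp) ----
// Returning the emplace() iterator works because std::list iterators stay
// valid while other elements are inserted or erased, so a caller can keep the
// iterator and later unregister exactly the callback it added (as
// removeSyncCleanup below does). Standalone example:
#include <cassert>
#include <functional>
#include <list>

int main()
{
    std::list<std::function<void()>> fns;
    auto it = fns.emplace(fns.begin(), [] {});
    fns.emplace(fns.begin(), [] {}); // later registrations do not invalidate `it`
    fns.erase(it);                   // remove only the callback registered above
    assert(fns.size() == 1);
    return 0;
}
// ---- End sketch ----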
166 
167 void Connection::removeSyncCleanup(CleanupIt it)
168 {
169  // Require cleanup functions to be removed on the event loop thread to avoid
170  // needing to deal with them being removed in the middle of a disconnect.
171  assert(std::this_thread::get_id() == m_loop->m_thread_id);
172  const Lock lock(m_loop->m_mutex);
173  m_sync_cleanup_fns.erase(it);
174 }
175 
176 void EventLoop::addAsyncCleanup(std::function<void()> fn)
177 {
178  const Lock lock(m_mutex);
179  // Add async cleanup callbacks to the back of the list. Unlike the sync
180  // cleanup list, this list order is more significant because it determines
181  // the order server objects are destroyed when there is a sudden disconnect,
182  // and it is possible objects may need to be destroyed in a certain order.
183  // This function is called in ProxyServerBase destructors, and since capnp
184  // destroys ProxyServer objects in LIFO order, we should preserve this
185  // order, and add cleanup callbacks to the end of the list so they can be
186  // run starting from the beginning of the list.
187  //
188  // In bitcoin core, running these callbacks in the right order is
189  // particularly important for the wallet process, because it uses blocking
190  // shared_ptrs and requires Chain::Notifications pointers owned by the node
191  // process to be destroyed before the WalletLoader objects owned by the node
192  // process, otherwise the shared pointer counts of the CWallet objects (which
193  // inherit from Chain::Notifications) will not be 1 when the WalletLoader
194  // destructor runs, and it will wait forever for them to be released.
195  m_async_fns->emplace_back(std::move(fn));
196  startAsyncThread();
197 }
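// ---- Added illustrative sketch (not part of upstream proxy.cpp) ----
// Why appending preserves destruction order: destructors register cleanups in
// the order they run (most recently constructed object first), and draining
// the list from the front replays exactly that order. Standalone example with
// hypothetical names:
#include <cassert>
#include <functional>
#include <list>
#include <string>
#include <vector>

int main()
{
    std::list<std::function<void()>> cleanups;
    std::vector<std::string> order;
    // Simulated LIFO destruction: "newer" registers its cleanup before "older".
    cleanups.emplace_back([&] { order.push_back("newer"); });
    cleanups.emplace_back([&] { order.push_back("older"); });
    while (!cleanups.empty()) {
        cleanups.front()(); // run from the beginning of the list
        cleanups.pop_front();
    }
    assert((order == std::vector<std::string>{"newer", "older"}));
    return 0;
}
// ---- End sketch ----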
198 
199 EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context)
200  : m_exe_name(exe_name),
201  m_io_context(kj::setupAsyncIo()),
202  m_task_set(new kj::TaskSet(m_error_handler)),
203  m_log_opts(std::move(log_opts)),
204  m_context(context)
205 {
206  int fds[2];
207  KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
208  m_wait_fd = fds[0];
209  m_post_fd = fds[1];
210 }
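// ---- Added illustrative sketch (not part of upstream proxy.cpp) ----
// The constructor above sets up a self-pipe style wakeup using a socketpair:
// the event loop blocks reading one end (m_wait_fd) and any other thread wakes
// it by writing a single byte to the other end (m_post_fd). Minimal standalone
// POSIX example:
#include <cstdio>
#include <sys/socket.h>
#include <unistd.h>

int main()
{
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return 1;
    char byte = 0;
    if (write(fds[1], &byte, 1) != 1) return 1; // "post" side: wake the loop
    char received;
    if (read(fds[0], &received, 1) != 1) return 1; // "wait" side: blocks until a byte arrives
    std::printf("woke up\n");
    close(fds[0]);
    close(fds[1]);
    return 0;
}
// ---- End sketch ----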
211 
212 EventLoop::~EventLoop()
213 {
214  if (m_async_thread.joinable()) m_async_thread.join();
215  const Lock lock(m_mutex);
216  KJ_ASSERT(m_post_fn == nullptr);
217  KJ_ASSERT(!m_async_fns);
218  KJ_ASSERT(m_wait_fd == -1);
219  KJ_ASSERT(m_post_fd == -1);
220  KJ_ASSERT(m_num_clients == 0);
221 
222  // Spin event loop. wait for any promises triggered by RPC shutdown.
223  // auto cleanup = kj::evalLater([]{});
224  // cleanup.wait(m_io_context.waitScope);
225 }
226 
227 void EventLoop::loop()
228 {
229  assert(!g_thread_context.loop_thread);
230  g_thread_context.loop_thread = true;
231  KJ_DEFER(g_thread_context.loop_thread = false);
232 
233  {
234  const Lock lock(m_mutex);
235  assert(!m_async_fns);
236  m_async_fns.emplace();
237  }
238 
239  kj::Own<kj::AsyncIoStream> wait_stream{
240  m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
241  int post_fd{m_post_fd};
242  char buffer = 0;
243  for (;;) {
244  const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
245  if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
246  Lock lock(m_mutex);
247  if (m_post_fn) {
248  Unlock(lock, *m_post_fn);
249  m_post_fn = nullptr;
250  m_cv.notify_all();
251  } else if (done()) {
252  // Intentionally do not break if m_post_fn was set, even if done()
253  // would return true, to ensure that the EventLoopRef write(post_fd)
254  // call always succeeds and the loop does not exit between the time
255  // that the done condition is set and the write call is made.
256  break;
257  }
258  }
259  MP_LOG(*this, Log::Info) << "EventLoop::loop done, cancelling event listeners.";
260  m_task_set.reset();
261  MP_LOG(*this, Log::Info) << "EventLoop::loop bye.";
262  wait_stream = nullptr;
263  KJ_SYSCALL(::close(post_fd));
264  const Lock lock(m_mutex);
265  m_wait_fd = -1;
266  m_post_fd = -1;
267  m_async_fns.reset();
268  m_cv.notify_all();
269 }
270 
271 void EventLoop::post(kj::Function<void()> fn)
272 {
273  if (std::this_thread::get_id() == m_thread_id) {
274  fn();
275  return;
276  }
277  Lock lock(m_mutex);
278  EventLoopRef ref(*this, &lock);
279  m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
280  m_post_fn = &fn;
281  int post_fd{m_post_fd};
282  Unlock(lock, [&] {
283  char buffer = 0;
284  KJ_SYSCALL(write(post_fd, &buffer, 1));
285  });
286  m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
287 }
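// ---- Added illustrative sketch (not part of upstream proxy.cpp) ----
// post() above hands one function at a time to the loop thread through a
// single pointer slot: publish the pointer, wake the other thread, then wait
// until the pointer is cleared, which doubles as the completion signal. The
// same handoff with two plain threads (Slot and the worker are hypothetical):
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

struct Slot {
    std::mutex mutex;
    std::condition_variable cv;
    std::function<void()>* fn{nullptr};
    bool stop{false};
};

int main()
{
    Slot slot;
    std::thread worker([&] {
        std::unique_lock<std::mutex> lock{slot.mutex};
        while (!slot.stop) {
            if (slot.fn) {
                (*slot.fn)();      // run the posted function
                slot.fn = nullptr; // clearing the slot signals completion
                slot.cv.notify_all();
            } else {
                slot.cv.wait(lock);
            }
        }
    });
    std::function<void()> fn{[] { /* work executed on the worker thread */ }};
    {
        std::unique_lock<std::mutex> lock{slot.mutex};
        slot.fn = &fn;
        slot.cv.notify_all();
        slot.cv.wait(lock, [&] { return slot.fn == nullptr; }); // wait for completion
        slot.stop = true;
        slot.cv.notify_all();
    }
    worker.join();
    return 0;
}
// ---- End sketch ----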
288 
289 void EventLoop::startAsyncThread()
290 {
291  assert(std::this_thread::get_id() == m_thread_id);
292  if (m_async_thread.joinable()) {
293  // Notify to wake up the async thread if it is already running.
294  m_cv.notify_all();
295  } else if (!m_async_fns->empty()) {
296  m_async_thread = std::thread([this] {
297  Lock lock(m_mutex);
298  while (m_async_fns) {
299  if (!m_async_fns->empty()) {
300  EventLoopRef ref{*this, &lock};
301  const std::function<void()> fn = std::move(m_async_fns->front());
302  m_async_fns->pop_front();
303  Unlock(lock, fn);
304  // Important to relock because of the wait() call below.
305  ref.reset(/*relock=*/true);
306  // Continue without waiting in case there are more async_fns
307  continue;
308  }
309  m_cv.wait(lock.m_lock);
310  }
311  });
312  }
313 }
314 
315 bool EventLoop::done() const
316 {
317  assert(m_num_clients >= 0);
318  return m_num_clients == 0 && m_async_fns->empty();
319 }
320 
321 std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread)
322 {
323  assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
324  ConnThread thread;
325  bool inserted;
326  {
327  const Lock lock(threads.mutex);
328  std::tie(thread, inserted) = threads.ref.try_emplace(connection);
329  }
330  if (inserted) {
331  thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false);
332  thread->second->m_disconnect_cb = connection->addSyncCleanup([threads, thread] {
333  // Note: it is safe to use the `thread` iterator in this cleanup
334  // function, because the iterator would only be invalid if the map entry
335  // was removed, and if the map entry is removed the ProxyClient<Thread>
336  // destructor unregisters the cleanup.
337 
338  // Connection is being destroyed before thread client is, so reset
339  // thread client m_disconnect_cb member so thread client destructor does not
340  // try to unregister this callback after connection is destroyed.
341  thread->second->m_disconnect_cb.reset();
342 
343  // Remove connection pointer about to be destroyed from the map
344  const Lock lock(threads.mutex);
345  threads.ref.erase(thread);
346  });
347  }
348  return {thread, inserted};
349 }
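// ---- Added illustrative sketch (not part of upstream proxy.cpp) ----
// SetThread relies on the get-or-create behaviour of std::map::try_emplace: it
// returns the existing entry and false when the key is already present, and
// inserts a default-constructed value and returns true otherwise, so the
// per-connection thread is only created on first use. Standalone example:
#include <cassert>
#include <map>
#include <optional>
#include <string>

int main()
{
    std::map<int, std::optional<std::string>> threads;
    auto [it, inserted] = threads.try_emplace(7);
    if (inserted) it->second.emplace("created once");
    auto [it2, inserted2] = threads.try_emplace(7); // key exists: nothing constructed
    assert(!inserted2 && it2 == it && *it2->second == "created once");
    return 0;
}
// ---- End sketch ----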
350 
351 ProxyClient<Thread>::~ProxyClient()
352 {
353  // If thread is being destroyed before connection is destroyed, remove the
354  // cleanup callback that was registered to handle the connection being
355  // destroyed before the thread being destroyed.
356  if (m_disconnect_cb) {
357  // Remove disconnect callback on the event loop thread with
358  // loop->sync(), so if the connection is broken there is not a race
359  // between this thread trying to remove the callback and the disconnect
360  // handler attempting to call it.
361  m_context.loop->sync([&]() {
362  if (m_disconnect_cb) {
363  m_context.connection->removeSyncCleanup(*m_disconnect_cb);
364  }
365  });
366  }
367 }
368 
369 ProxyServer<Thread>::ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread)
370  : m_loop{*connection.m_loop}, m_thread_context(thread_context), m_thread(std::move(thread))
371 {
372  assert(m_thread_context.waiter.get() != nullptr);
373 }
374 
375 ProxyServer<Thread>::~ProxyServer()
376 {
377  if (!m_thread.joinable()) return;
378  // Stop async thread and wait for it to exit. Need to wait because the
379  // m_thread handle needs to outlive the thread to avoid "terminate called
380  // without an active exception" error. An alternative to waiting would be to
381  // detach the thread, but this would introduce nondeterminism which could
382  // make code harder to debug or extend.
383  assert(m_thread_context.waiter.get());
384  std::unique_ptr<Waiter> waiter;
385  {
386  const Lock lock(m_thread_context.waiter->m_mutex);
387  // Reset the thread context waiter pointer to signal the thread to exit:
388  // the loop in makeThread() waits until the waiter pointer is null.
389  waiter = std::move(m_thread_context.waiter);
390  // The waiter should have no pending work at this point.
391  assert(!waiter->m_fn);
392  // Clear client maps now to avoid deadlock in m_thread.join() call
393  // below. The maps contain Thread::Client objects that need to be
394  // destroyed from the event loop thread (this thread), which can't
395  // happen if this thread is busy calling join.
396  m_thread_context.request_threads.clear();
397  m_thread_context.callback_threads.clear();
398  // Wake up the thread so it can see the null waiter pointer and exit.
399  waiter->m_cv.notify_all();
400  }
401  m_thread.join();
402 }
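// ---- Added illustrative sketch (not part of upstream proxy.cpp) ----
// The destructor above performs a small shutdown handshake: take ownership of
// the waiter away from the thread under its own mutex (the thread's exit
// predicate is "waiter is null"), notify the condition variable, and only then
// join. Reduced standalone version with hypothetical names:
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

struct Waiter {
    std::mutex mutex;
    std::condition_variable cv;
};

int main()
{
    auto waiter = std::make_unique<Waiter>();
    Waiter* raw = waiter.get();
    std::thread thread([&] {
        std::unique_lock<std::mutex> lock{raw->mutex};
        raw->cv.wait(lock, [&] { return !waiter; }); // exit once ownership is taken away
    });
    std::unique_ptr<Waiter> taken;
    {
        const std::lock_guard<std::mutex> lock{raw->mutex};
        taken = std::move(waiter); // makes the thread's exit predicate true
        raw->cv.notify_all();
    }
    thread.join(); // `taken` keeps the mutex and cv alive until after the join
    return 0;
}
// ---- End sketch ----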
403 
404 kj::Promise<void> ProxyServer<Thread>::getName(GetNameContext context)
405 {
406  context.getResults().setResult(m_thread_context.thread_name);
407  return kj::READY_NOW;
408 }
409 
410 ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(connection) {}
411 
412 kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
413 {
414  EventLoop& loop{*m_connection.m_loop};
415  if (loop.testing_hook_makethread) loop.testing_hook_makethread();
416  const std::string from = context.getParams().getName();
417  std::promise<ThreadContext*> thread_context;
418  std::thread thread([&loop, &thread_context, from]() {
419  g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")";
420  g_thread_context.waiter = std::make_unique<Waiter>();
421  Lock lock(g_thread_context.waiter->m_mutex);
422  thread_context.set_value(&g_thread_context);
423  if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created();
424  // Wait for shutdown signal from ProxyServer<Thread> destructor (signal
425  // is just waiter getting set to null.)
426  g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
427  });
428  auto thread_server = kj::heap<ProxyServer<Thread>>(m_connection, *thread_context.get_future().get(), std::move(thread));
429  auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
430  context.getResults().setResult(kj::mv(thread_client));
431  return kj::READY_NOW;
432 }
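// ---- Added illustrative sketch (not part of upstream proxy.cpp) ----
// makeThread above uses a std::promise/std::future pair so the spawning thread
// can obtain a pointer to the new thread's thread_local context before
// returning, while the new thread stays alive to keep that storage valid.
// Standalone version of the same handshake (all names hypothetical):
#include <cassert>
#include <future>
#include <thread>

thread_local int tls_value{0};

int main()
{
    std::promise<int*> ptr_promise;
    std::promise<void> release;
    std::future<void> released{release.get_future()};
    std::thread thread([&] {
        tls_value = 42;
        ptr_promise.set_value(&tls_value); // publish this thread's TLS address
        released.wait();                   // keep the TLS object alive until released
    });
    int* ptr = ptr_promise.get_future().get(); // blocks until the new thread publishes
    assert(*ptr == 42);
    release.set_value(); // let the thread exit
    thread.join();
    return 0;
}
// ---- End sketch ----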
433 
434 std::atomic<int> server_reqs{0};
435 
436 std::string LongThreadName(const char* exe_name)
437 {
438  return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
439 }
440 
441 kj::StringPtr KJ_STRINGIFY(Log v)
442 {
443  switch (v) {
444  case Log::Trace: return "Trace";
445  case Log::Debug: return "Debug";
446  case Log::Info: return "Info";
447  case Log::Warning: return "Warning";
448  case Log::Error: return "Error";
449  case Log::Raise: return "Raise";
450  }
451  return "<Log?>";
452 }
453 } // namespace mp