Bitcoin Core  31.0.0
P2P Digital Currency
type-context.h
Go to the documentation of this file.
1 // Copyright (c) The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef MP_PROXY_TYPE_CONTEXT_H
6 #define MP_PROXY_TYPE_CONTEXT_H
7 
8 #include <mp/proxy-io.h>
9 #include <mp/util.h>
10 
11 #include <kj/string.h>
12 
13 namespace mp {
14 template <typename Output>
// NOTE(review): the function declaration line(s) (original lines 15-16) are
// missing from this dump — presumably the CustomBuildField overload that
// builds the implicit Context field added to each outgoing IPC client
// request; confirm against the upstream mp/type-context.h.
17  ClientInvokeContext& invoke_context,
18  Output&& output,
// SFINAE guard: this overload participates only when the field being built is
// a Context struct (i.e. output.get() yields a Context::Builder).
19  typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
20 {
21  auto& connection = invoke_context.connection;
22  auto& thread_context = invoke_context.thread_context;
23 
24  // Create local Thread::Server object corresponding to the current thread
25  // and pass a Thread::Client reference to it in the Context.callbackThread
26  // field so the function being called can make callbacks to this thread.
27  // Also store the Thread::Client reference in the callback_threads map so
28  // future calls over this connection can reuse it.
29  auto [callback_thread, _]{SetThread(
30  GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
31  [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(connection, thread_context, std::thread{})); })};
32 
33  // Call remote ThreadMap.makeThread function so server will create a
34  // dedicated worker thread to run function calls from this thread. Store the
35  // Thread::Client reference it returns in the request_threads map.
36  auto make_request_thread{[&]{
37  // This code will only run if an IPC client call is being made for the
38  // first time on this thread. After the first call, subsequent calls
39  // will use the existing request thread. This code will also never run at
40  // all if the current thread is a request thread created for a different
41  // IPC client, because in that case PassField code (below) will have set
42  // request_thread to point to the calling thread.
43  auto request = connection.m_thread_map.makeThreadRequest();
44  request.setName(thread_context.thread_name);
45  return request.send().getResult(); // Nonblocking due to capnp request pipelining.
46  }};
// SetThread only invokes make_request_thread if this thread has no existing
// entry in request_threads for this connection; otherwise the cached
// Thread::Client is reused and no remote makeThread call is made.
47  auto [request_thread, _1]{SetThread(
48  GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
49  &connection, make_request_thread)};
50 
// Fill in the outgoing Context struct: `thread` names the remote worker
// thread that should execute this request, `callbackThread` names the local
// thread the server may call back into while the request runs.
51  auto context = output.init();
52  context.setThread(request_thread->second->m_client);
53  context.setCallbackThread(callback_thread->second->m_client);
54 }
55 
// Server-side PassField overload for the implicit IPC Context parameter
// (selected via SFINAE when the accessed field is a Context::Reader). It
// looks up the server thread the client designated in Context.thread, posts
// the wrapped method invocation onto that thread, and wires up cancellation
// handling and request/callback thread bookkeeping around the call. Returns
// a promise for the finished CallContext, canceled if the connection dies.
58 template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
59 auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
60  typename std::enable_if<
61  std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
62  kj::Promise<typename ServerContext::CallContext>>::type
63 {
64  auto& server = server_context.proxy_server;
65  EventLoop& loop = *server.m_context.loop;
66  int req = server_context.req;
67  // Keep a reference to the ProxyServer instance by assigning it to the self
68  // variable. ProxyServer instances are reference-counted and if the client
69  // drops its reference and the IPC call is canceled, this variable keeps the
70  // instance alive until the method finishes executing. The self variable
71  // needs to be destroyed on the event loop thread so it is freed in a sync()
72  // call below.
73  auto self = server.thisCap();
// `invoke` runs later on the server worker thread chosen by the client (see
// the post() call below); it executes the wrapped method and returns the
// call context. Note it captures `fn` and `args` by value since it outlives
// this function's scope.
74  auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, &loop, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
75  MP_LOG(loop, Log::Debug) << "IPC server executing request #" << req;
76  if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start();
77  KJ_DEFER(if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done());
78  ServerContext server_context{server, call_context, req};
79  // Before invoking the function, store a reference to the
80  // callbackThread provided by the client in the
81  // thread_local.request_threads map. This way, if this
82  // server thread needs to execute any RPCs that call back to
83  // the client, they will happen on the same client thread
84  // that is waiting for this function, just like what would
85  // happen if this were a normal function call made on the
86  // local stack.
87  //
88  // If the request_threads map already has an entry for this
89  // connection, it will be left unchanged, and it indicates
90  // that the current thread is an RPC client thread which is
91  // in the middle of an RPC call, and the current RPC call is
92  // a nested call from the remote thread handling that RPC
93  // call. In this case, the callbackThread value should point
94  // to the same thread already in the map, so there is no
95  // need to update the map.
96  auto& thread_context = g_thread_context;
97  auto& request_threads = thread_context.request_threads;
98  ConnThread request_thread;
99  bool inserted{false};
// cancel_mutex/cancel_lock serialize this execution thread against the
// on_cancel callback below. The lock is held for the whole fn.invoke() call
// and released in the KJ_DEFER, so a cancellation arriving mid-execution
// blocks until the method stops touching the params/results structs.
100  Mutex cancel_mutex;
101  Lock cancel_lock{cancel_mutex};
102  server_context.cancel_lock = &cancel_lock;
103  loop.sync([&] {
104  // Detect request being canceled before it executes.
105  if (cancel_monitor.m_canceled) {
106  server_context.request_canceled = true;
107  return;
108  }
109  // Detect request being canceled while it executes.
110  assert(!cancel_monitor.m_on_cancel);
111  cancel_monitor.m_on_cancel = [&loop, &server_context, &cancel_mutex, req]() {
112  MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
113  // Lock cancel_mutex here to block the event loop
114  // thread and prevent it from deleting the request's
115  // params and response structs while the execution
116  // thread is accessing them. Because this lock is
117  // released before the event loop thread does delete
118  // the structs, the mutex does not provide any
119  // protection from the event loop deleting the
120  // structs _before_ the execution thread acquires
121  // it. So in addition to locking the mutex, the
122  // execution thread always checks request_canceled
123  // as well before accessing the structs.
124  Lock cancel_lock{cancel_mutex};
125  server_context.request_canceled = true;
126  };
127  // Update requests_threads map if not canceled. We know
128  // the request is not canceled currently because
129  // cancel_monitor.m_canceled was checked above and this
130  // code is running on the event loop thread.
131  std::tie(request_thread, inserted) = SetThread(
132  GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
133  [&] { return Accessor::get(call_context.getParams()).getCallbackThread(); });
134  });
135 
136  // If an entry was inserted into the request_threads map,
137  // remove it after calling fn.invoke. If an entry was not
138  // inserted, one already existed, meaning this must be a
139  // recursive call (IPC call calling back to the caller which
140  // makes another IPC call), so avoid modifying the map.
141  const bool erase_thread{inserted};
142  KJ_DEFER(
143  // Release the cancel lock before calling loop->sync and
144  // waiting for the event loop thread, because if a
145  // cancellation happened, it needs to run the on_cancel
146  // callback above. It's safe to release cancel_lock at
147  // this point because the fn.invoke() call below will be
148  // finished and no longer accessing the params or
149  // results structs.
150  cancel_lock.m_lock.unlock();
151  // Erase the request_threads entry on the event loop
152  // thread with loop->sync(), so if the connection is
153  // broken there is not a race between this thread and
154  // the disconnect handler trying to destroy the thread
155  // client object.
156  loop.sync([&] {
157  // Clear cancellation callback. At this point the
158  // method invocation finished and the result is
159  // either being returned, or discarded if a
160  // cancellation happened. So we do not need to be
161  // notified of cancellations after this point. Also
162  // we do not want to be notified because
163  // cancel_mutex and server_context could be out of
164  // scope when it happens.
165  cancel_monitor.m_on_cancel = nullptr;
// Move `self` into a local so the ProxyServer reference is dropped here,
// inside this loop.sync() lambda, guaranteeing it is destroyed on the event
// loop thread (see the comment near the top of this function).
166  auto self_dispose{kj::mv(self)};
167  if (erase_thread) {
168  // Look up the thread again without using existing
169  // iterator since entry may no longer be there after
170  // a disconnect. Destroy node after releasing
171  // Waiter::m_mutex, so the ProxyClient<Thread>
172  // destructor is able to use EventLoop::mutex
173  // without violating lock order.
174  ConnThreads::node_type removed;
175  {
176  Lock lock(thread_context.waiter->m_mutex);
177  removed = request_threads.extract(server.m_context.connection);
178  }
179  }
180  });
181  );
182  if (server_context.request_canceled) {
183  MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
// InterruptException is treated as a normal early exit (logged at Info); any
// other exception escaping fn.invoke is logged as an error and rethrown as a
// recoverable KJ exception so the RPC layer can report it to the client.
184  } else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
185  try {
186  fn.invoke(server_context, args...);
187  } catch (const InterruptException& e) {
188  MP_LOG(loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
189  }
190  })) {
191  MP_LOG(loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
192  kj::throwRecoverableException(kj::mv(*exception));
193  }
// Hand the call context back so the promise chain can deliver the response.
194  return call_context;
195  // End of scope: if KJ_DEFER was reached, it runs here
196  };
197 
198  // Lookup Thread object specified by the client. The specified thread should
199  // be a local Thread::Server object, but it needs to be looked up
200  // asynchronously with getLocalServer().
201  const auto& params = server_context.call_context.getParams();
202  Context::Reader context_arg = Accessor::get(params);
203  auto thread_client = context_arg.getThread();
204  auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
205  .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
206  // Assuming the thread object is found, pass it a pointer to the
207  // `invoke` lambda above which will invoke the function on that
208  // thread.
209  KJ_IF_MAYBE (thread_server, perhaps) {
210  auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
211  MP_LOG(loop, Log::Debug)
212  << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
213  return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
214  } else {
// The client referenced a thread capability this server does not own (or it
// was already destroyed); fail the request rather than executing it on an
// arbitrary thread.
215  MP_LOG(loop, Log::Error)
216  << "IPC server error request #" << req << ", missing thread to execute request";
217  throw std::runtime_error("invalid thread handle");
218  }
219  });
220  // Use connection m_canceler object to cancel the result promise if the
221  // connection is destroyed. (By default Cap'n Proto does not cancel requests
222  // on disconnect, since it's possible clients might want to make requests
223  // and immediately disconnect without waiting for results, but not want the
224  // requests to be canceled.)
225  return server.m_context.connection->m_canceler.wrap(kj::mv(result));
226 }
227 } // namespace mp
228 
229 #endif // MP_PROXY_TYPE_CONTEXT_H
assert(!tx.IsCoinBase())
Function parameter type for prioritizing overloaded function calls that would otherwise be ambiguous...
Definition: util.h:108
ConnThreads::iterator ConnThread
Definition: proxy-io.h:654
Exception thrown from code executing an IPC call that is interrupted.
Definition: util.h:278
Generic utility functions used by capnp code.
Definition: util.h:32
consteval auto _(util::TranslatedLiteral str)
Definition: translation.h:79
const char * what() const noexcept override
Definition: util.h:280
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
Event loop implementation.
Definition: proxy-io.h:238
void * m_context
External context pointer.
Definition: proxy-io.h:342
ArgsManager & args
Definition: bitcoind.cpp:277
Connection & connection
Definition: proxy-io.h:31
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:41
ServerInvokeContext< ProxyServer< Interface >, ::capnp::CallContext< Params, Results > > ServerContext
Definition: proxy-io.h:72
ThreadContext & thread_context
Definition: proxy-io.h:36
auto result
Definition: common-types.h:74
void CustomBuildField(TypeList< LocalType >, Priority< 1 >, InvokeContext &invoke_context, Value &&value, Output &&output) requires Serializable< LocalType
Overload multiprocess library's CustomBuildField hook to allow any serializable object to be stored i...
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:321
#define MP_LOG(loop,...)
Definition: proxy-io.h:209
Definition: util.h:171
auto PassField(Priority< 1 >, TypeList< LocalType &>, ServerContext &server_context, Fn &&fn, Args &&... args) -> Require< typename decltype(Accessor::get(server_context.call_context.getParams()))::Calls >
PassField override for callable interface reference arguments.
Definition: proxy-types.h:291
Helper class that detects when a promise is canceled.
Definition: util.h:292