Bitcoin Core  29.1.0
P2P Digital Currency
net.cpp
Go to the documentation of this file.
1 // Copyright (c) 2020-2022 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <test/util/net.h>
6 
7 #include <net.h>
8 #include <net_processing.h>
9 #include <netaddress.h>
10 #include <netmessagemaker.h>
11 #include <node/connection_types.h>
12 #include <node/eviction.h>
13 #include <protocol.h>
14 #include <random.h>
15 #include <serialize.h>
16 #include <span.h>
17 #include <sync.h>
18 
19 #include <chrono>
20 #include <optional>
21 #include <vector>
22 
23 void ConnmanTestMsg::Handshake(CNode& node,
24  bool successfully_connected,
25  ServiceFlags remote_services,
26  ServiceFlags local_services,
27  int32_t version,
28  bool relay_txs)
29 {
30  auto& peerman{static_cast<PeerManager&>(*m_msgproc)};
31  auto& connman{*this};
32 
33  peerman.InitializeNode(node, local_services);
34  peerman.SendMessages(&node);
35  FlushSendBuffer(node); // Drop the version message added by SendMessages.
36 
37  CSerializedNetMsg msg_version{
39  version, //
40  Using<CustomUintFormatter<8>>(remote_services), //
41  int64_t{}, // dummy time
42  int64_t{}, // ignored service bits
43  CNetAddr::V1(CService{}), // dummy
44  int64_t{}, // ignored service bits
45  CNetAddr::V1(CService{}), // ignored
46  uint64_t{1}, // dummy nonce
47  std::string{}, // dummy subver
48  int32_t{}, // dummy starting_height
49  relay_txs),
50  };
51 
52  (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
53  node.fPauseSend = false;
54  connman.ProcessMessagesOnce(node);
55  peerman.SendMessages(&node);
56  FlushSendBuffer(node); // Drop the verack message added by SendMessages.
57  if (node.fDisconnect) return;
58  assert(node.nVersion == version);
59  assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
60  CNodeStateStats statestats;
61  assert(peerman.GetNodeStateStats(node.GetId(), statestats));
62  assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn()));
63  assert(statestats.their_services == remote_services);
64  if (successfully_connected) {
66  (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
67  node.fPauseSend = false;
68  connman.ProcessMessagesOnce(node);
69  peerman.SendMessages(&node);
70  assert(node.fSuccessfullyConnected == true);
71  }
72 }
73 
74 void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const
75 {
76  assert(node.ReceiveMsgBytes(msg_bytes, complete));
77  if (complete) {
78  node.MarkReceivedMsgsForProcessing();
79  }
80 }
81 
83 {
84  LOCK(node.cs_vSend);
85  node.vSendMsg.clear();
86  node.m_send_memusage = 0;
87  while (true) {
88  const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
89  if (to_send.empty()) break;
90  node.m_transport->MarkBytesSent(to_send.size());
91  }
92 }
93 
95 {
96  bool queued = node.m_transport->SetMessageToSend(ser_msg);
97  assert(queued);
98  bool complete{false};
99  while (true) {
100  const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
101  if (to_send.empty()) break;
102  NodeReceiveMsgBytes(node, to_send, complete);
103  node.m_transport->MarkBytesSent(to_send.size());
104  }
105  return complete;
106 }
107 
108 CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
109 {
110  CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true);
111  if (!node) return nullptr;
112  node->SetCommonVersion(PROTOCOL_VERSION);
114  node->fSuccessfullyConnected = true;
115  AddTestNode(*node);
116  return node;
117 }
118 
119 std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context)
120 {
121  std::vector<NodeEvictionCandidate> candidates;
122  candidates.reserve(n_candidates);
123  for (int id = 0; id < n_candidates; ++id) {
124  candidates.push_back({
125  .id=id,
126  .m_connected=std::chrono::seconds{random_context.randrange(100)},
127  .m_min_ping_time=std::chrono::microseconds{random_context.randrange(100)},
128  .m_last_block_time=std::chrono::seconds{random_context.randrange(100)},
129  .m_last_tx_time=std::chrono::seconds{random_context.randrange(100)},
130  .fRelevantServices=random_context.randbool(),
131  .m_relay_txs=random_context.randbool(),
132  .fBloomFilter=random_context.randbool(),
133  .nKeyedNetGroup=random_context.randrange(100u),
134  .prefer_evict=random_context.randbool(),
135  .m_is_local=random_context.randbool(),
136  .m_network=ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())],
137  .m_noban=false,
138  .m_conn_type=ConnectionType::INBOUND,
139  });
140  }
141  return candidates;
142 }
143 
144 // Have different ZeroSock (or others that inherit from it) objects have different
145 // m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two
146 // different objects comparing as equal.
147 static std::atomic<SOCKET> g_mocked_sock_fd{0};
148 
150 
151 // Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that.
153 
154 ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; }
155 
156 ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const
157 {
158  memset(buf, 0x0, len);
159  return len;
160 }
161 
162 int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; }
163 
164 int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; }
165 
166 int ZeroSock::Listen(int) const { return 0; }
167 
168 std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const
169 {
170  if (addr != nullptr) {
171  // Pretend all connections come from 5.5.5.5:6789
172  memset(addr, 0x00, *addr_len);
173  const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in));
174  if (*addr_len >= write_len) {
175  *addr_len = write_len;
176  sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr);
177  addr_in->sin_family = AF_INET;
178  memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr));
179  addr_in->sin_port = htons(6789);
180  }
181  }
182  return std::make_unique<ZeroSock>();
183 }
184 
185 int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
186 {
187  std::memset(opt_val, 0x0, *opt_len);
188  return 0;
189 }
190 
191 int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; }
192 
193 int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const
194 {
195  std::memset(name, 0x0, *name_len);
196  return 0;
197 }
198 
199 bool ZeroSock::SetNonBlocking() const { return true; }
200 
201 bool ZeroSock::IsSelectable() const { return true; }
202 
203 bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
204 {
205  if (occurred != nullptr) {
206  *occurred = requested;
207  }
208  return true;
209 }
210 
211 bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
212 {
213  for (auto& [sock, events] : events_per_sock) {
214  (void)sock;
215  events.occurred = events.requested;
216  }
217  return true;
218 }
219 
221 {
222  assert(false && "Move of Sock into ZeroSock not allowed.");
223  return *this;
224 }
225 
226 StaticContentsSock::StaticContentsSock(const std::string& contents)
227  : m_contents{contents}
228 {
229 }
230 
231 ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const
232 {
233  const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)};
234  std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes);
235  if ((flags & MSG_PEEK) == 0) {
236  m_consumed += consume_bytes;
237  }
238  return consume_bytes;
239 }
240 
242 {
243  assert(false && "Move of Sock into StaticContentsSock not allowed.");
244  return *this;
245 }
246 
247 ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
248 {
249  WAIT_LOCK(m_mutex, lock);
250 
251  if (m_data.empty()) {
252  if (m_eof) {
253  return 0;
254  }
255  errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
256  return -1;
257  }
258 
259  const size_t read_bytes{std::min(len, m_data.size())};
260 
261  std::memcpy(buf, m_data.data(), read_bytes);
262  if ((flags & MSG_PEEK) == 0) {
263  m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
264  }
265 
266  return read_bytes;
267 }
268 
269 std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
270 {
271  V1Transport transport{NodeId{0}};
272 
273  {
274  WAIT_LOCK(m_mutex, lock);
275 
276  WaitForDataOrEof(lock);
277  if (m_eof && m_data.empty()) {
278  return std::nullopt;
279  }
280 
281  for (;;) {
282  Span<const uint8_t> s{m_data};
283  if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
284  return std::nullopt;
285  }
286  m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
287  if (transport.ReceivedMessageComplete()) {
288  break;
289  }
290  if (m_data.empty()) {
291  WaitForDataOrEof(lock);
292  if (m_eof && m_data.empty()) {
293  return std::nullopt;
294  }
295  }
296  }
297  }
298 
299  bool reject{false};
300  CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
301  if (reject) {
302  return std::nullopt;
303  }
304  return std::make_optional<CNetMessage>(std::move(msg));
305 }
306 
307 void DynSock::Pipe::PushBytes(const void* buf, size_t len)
308 {
309  LOCK(m_mutex);
310  const uint8_t* b = static_cast<const uint8_t*>(buf);
311  m_data.insert(m_data.end(), b, b + len);
312  m_cond.notify_all();
313 }
314 
316 {
317  LOCK(m_mutex);
318  m_eof = true;
319  m_cond.notify_all();
320 }
321 
323 {
324  Assert(lock.mutex() == &m_mutex);
325 
326  m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
327  AssertLockHeld(m_mutex);
328  return !m_data.empty() || m_eof;
329  });
330 }
331 
332 DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
333  : m_pipes{pipes}, m_accept_sockets{accept_sockets}
334 {
335 }
336 
338 {
339  m_pipes->send.Eof();
340 }
341 
342 ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
343 {
344  return m_pipes->recv.GetBytes(buf, len, flags);
345 }
346 
347 ssize_t DynSock::Send(const void* buf, size_t len, int) const
348 {
349  m_pipes->send.PushBytes(buf, len);
350  return len;
351 }
352 
353 std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
354 {
355  ZeroSock::Accept(addr, addr_len);
356  return m_accept_sockets->Pop().value_or(nullptr);
357 }
358 
359 bool DynSock::Wait(std::chrono::milliseconds timeout,
360  Event requested,
361  Event* occurred) const
362 {
363  EventsPerSock ev;
364  ev.emplace(this, Events{requested});
365  const bool ret{WaitMany(timeout, ev)};
366  if (occurred != nullptr) {
367  *occurred = ev.begin()->second.occurred;
368  }
369  return ret;
370 }
371 
372 bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
373 {
374  const auto deadline = std::chrono::steady_clock::now() + timeout;
375  bool at_least_one_event_occurred{false};
376 
377  for (;;) {
378  // Check all sockets for readiness without waiting.
379  for (auto& [sock, events] : events_per_sock) {
380  if ((events.requested & Sock::SEND) != 0) {
381  // Always ready for Send().
382  events.occurred |= Sock::SEND;
383  at_least_one_event_occurred = true;
384  }
385 
386  if ((events.requested & Sock::RECV) != 0) {
387  auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
388  uint8_t b;
389  if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
390  events.occurred |= Sock::RECV;
391  at_least_one_event_occurred = true;
392  }
393  }
394  }
395 
396  if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
397  break;
398  }
399 
400  std::this_thread::sleep_for(10ms);
401  }
402 
403  return true;
404 }
405 
407 {
408  assert(false && "Move of Sock into DynSock not allowed.");
409  return *this;
410 }
int ret
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
Definition: net.cpp:199
AssertLockHeld(pool.cs)
bool randbool() noexcept
Generate a random boolean.
Definition: random.h:316
A mocked Sock alternative that returns a statically contained data upon read and succeeds and ignores...
Definition: net.h:185
ServiceFlags
nServices flags
Definition: protocol.h:309
Inbound connections are those initiated by a peer.
assert(!tx.IsCoinBase())
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
Definition: net.cpp:211
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
Definition: net.cpp:342
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
Definition: net.cpp:347
A mocked Sock alternative that allows providing the data to be returned by Recv() and inspecting the ...
Definition: net.h:212
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
Definition: net.cpp:203
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
Definition: net.cpp:353
void FlushSendBuffer(CNode &node) const
Definition: net.cpp:82
SOCKET m_socket
Contained socket.
Definition: sock.h:275
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:241
memcpy(result.begin(), stream.data(), stream.size())
ZeroSock()
Definition: net.cpp:149
#define INVALID_SOCKET
Definition: compat.h:56
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class...
Definition: serialize.h:497
std::shared_ptr< Pipes > m_pipes
Definition: net.h:332
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
Definition: net.cpp:322
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
Definition: sock.h:148
static std::atomic< SOCKET > g_mocked_sock_fd
Definition: net.cpp:147
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
Definition: net.cpp:372
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
Definition: protocol.h:65
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
Definition: net.cpp:269
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
Definition: net.cpp:315
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
Definition: sock.h:208
int Listen(int) const override
listen(2) wrapper.
Definition: net.cpp:166
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Definition: net.cpp:397
#define LOCK(cs)
Definition: sync.h:257
const char * name
Definition: rest.cpp:49
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
Definition: protocol.h:70
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
Definition: net.cpp:201
A combination of a network address (CNetAddr) and a (TCP) port.
Definition: netaddress.h:530
Fast randomness source.
Definition: random.h:376
Transport protocol agnostic message container.
Definition: net.h:230
Mutex m_mutex
Definition: net.h:261
static constexpr SerParams V1
Definition: netaddress.h:231
void NodeReceiveMsgBytes(CNode &node, Span< const uint8_t > msg_bytes, bool &complete) const
Definition: net.cpp:74
A CService with information about it as peer.
Definition: protocol.h:366
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
Definition: net.cpp:168
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
Definition: net.cpp:191
static const int PROTOCOL_VERSION
network protocol versioning
A mocked Sock alternative that succeeds on all operations.
Definition: net.h:141
int64_t NodeId
Definition: net.h:97
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:220
#define WAIT_LOCK(cs, name)
Definition: sync.h:262
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
Definition: net.cpp:164
uint8_t Event
Definition: sock.h:138
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
Definition: net.cpp:94
StaticContentsSock(const std::string &contents)
Definition: net.cpp:226
const std::string m_contents
Definition: net.h:204
Definition: messages.h:20
int flags
Definition: bitcoin-tx.cpp:536
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
Definition: net.cpp:247
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:49
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
Definition: net.cpp:185
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
Definition: sock.h:143
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
Definition: net.cpp:156
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
Definition: net.cpp:332
Serialization wrapper class for custom integers and enums.
Definition: serialize.h:529
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
Definition: net.cpp:307
~ZeroSock() override
Definition: net.cpp:152
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Definition: random.h:254
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that was provided at construction until it is exhausted and then return ...
Definition: net.cpp:231
constexpr auto ALL_NETWORKS
Definition: net.h:127
ConnectionType
Different types of connections to a peer.
Wrapper around std::unique_lock style lock for MutexType.
Definition: sync.h:151
~DynSock()
Definition: net.cpp:337
RAII helper class that manages a socket and closes it automatically when it goes out of scope...
Definition: sock.h:26
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
Definition: net.cpp:119
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
Definition: net.cpp:154
A Span is an object that can refer to a contiguous sequence of objects.
Definition: span.h:97
Information about a peer.
Definition: net.h:672
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
Definition: net.cpp:193
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
Definition: net.cpp:162
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (setup state)
Auxiliary requested/occurred events to wait for in WaitMany().
Definition: sock.h:173
size_t m_consumed
Definition: net.h:205
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Definition: net.cpp:108
void AddTestNode(CNode &node)
Definition: net.h:54
#define Assert(val)
Identity function.
Definition: check.h:85
std::shared_ptr< Queue > m_accept_sockets
Definition: net.h:333
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:406
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
Definition: net.cpp:359