Bitcoin Core  31.0.0
P2P Digital Currency
net.cpp
Go to the documentation of this file.
1 // Copyright (c) 2020-present The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <test/util/net.h>
6 
7 #include <net.h>
8 #include <net_processing.h>
9 #include <netaddress.h>
10 #include <netmessagemaker.h>
11 #include <node/connection_types.h>
12 #include <node/eviction.h>
13 #include <protocol.h>
14 #include <random.h>
15 #include <serialize.h>
16 #include <span.h>
17 #include <sync.h>
18 
19 #include <chrono>
20 #include <optional>
21 #include <vector>
22 
23 void ConnmanTestMsg::Handshake(CNode& node,
24  bool successfully_connected,
25  ServiceFlags remote_services,
26  ServiceFlags local_services,
27  int32_t version,
28  bool relay_txs)
29 {
30  auto& peerman{static_cast<PeerManager&>(*m_msgproc)};
31  auto& connman{*this};
32 
33  peerman.InitializeNode(node, local_services);
34  peerman.SendMessages(node);
35  FlushSendBuffer(node); // Drop the version message added by SendMessages.
36 
37  CSerializedNetMsg msg_version{
39  version, //
40  Using<CustomUintFormatter<8>>(remote_services), //
41  int64_t{}, // dummy time
42  int64_t{}, // ignored service bits
43  CNetAddr::V1(CService{}), // dummy
44  int64_t{}, // ignored service bits
45  CNetAddr::V1(CService{}), // ignored
46  uint64_t{1}, // dummy nonce
47  std::string{}, // dummy subver
48  int32_t{}, // dummy starting_height
49  relay_txs),
50  };
51 
52  (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
53  node.fPauseSend = false;
54  connman.ProcessMessagesOnce(node);
55  peerman.SendMessages(node);
56  FlushSendBuffer(node); // Drop the verack message added by SendMessages.
57  if (node.fDisconnect) return;
58  assert(node.nVersion == version);
59  assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
60  CNodeStateStats statestats;
61  assert(peerman.GetNodeStateStats(node.GetId(), statestats));
62  assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn()));
63  assert(statestats.their_services == remote_services);
64  if (successfully_connected) {
66  (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
67  node.fPauseSend = false;
68  connman.ProcessMessagesOnce(node);
69  peerman.SendMessages(node);
70  assert(node.fSuccessfullyConnected == true);
71  }
72 }
73 
75 
77 {
79  nMaxOutboundCycleStartTime = 0s;
80  nMaxOutboundTotalBytesSentInCycle = 0;
81 }
82 
84 {
89 }
90 
91 void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, std::span<const uint8_t> msg_bytes, bool& complete) const
92 {
93  assert(node.ReceiveMsgBytes(msg_bytes, complete));
94  if (complete) {
95  node.MarkReceivedMsgsForProcessing();
96  }
97 }
98 
100 {
101  LOCK(node.cs_vSend);
102  node.vSendMsg.clear();
103  node.m_send_memusage = 0;
104  while (true) {
105  const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
106  if (to_send.empty()) break;
107  node.m_transport->MarkBytesSent(to_send.size());
108  }
109 }
110 
112 {
113  bool queued = node.m_transport->SetMessageToSend(ser_msg);
114  assert(queued);
115  bool complete{false};
116  while (true) {
117  const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
118  if (to_send.empty()) break;
119  NodeReceiveMsgBytes(node, to_send, complete);
120  node.m_transport->MarkBytesSent(to_send.size());
121  }
122  return complete;
123 }
124 
125 CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
126 {
127  CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true, /*proxy_override=*/std::nullopt);
128  if (!node) return nullptr;
129  node->SetCommonVersion(PROTOCOL_VERSION);
131  node->fSuccessfullyConnected = true;
132  AddTestNode(*node);
133  return node;
134 }
135 
136 std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context)
137 {
138  std::vector<NodeEvictionCandidate> candidates;
139  candidates.reserve(n_candidates);
140  for (int id = 0; id < n_candidates; ++id) {
141  candidates.push_back({
142  .id=id,
143  .m_connected=std::chrono::seconds{random_context.randrange(100)},
144  .m_min_ping_time=std::chrono::microseconds{random_context.randrange(100)},
145  .m_last_block_time=std::chrono::seconds{random_context.randrange(100)},
146  .m_last_tx_time=std::chrono::seconds{random_context.randrange(100)},
147  .fRelevantServices=random_context.randbool(),
148  .m_relay_txs=random_context.randbool(),
149  .fBloomFilter=random_context.randbool(),
150  .nKeyedNetGroup=random_context.randrange(100u),
151  .prefer_evict=random_context.randbool(),
152  .m_is_local=random_context.randbool(),
153  .m_network=ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())],
154  .m_noban=false,
155  .m_conn_type=ConnectionType::INBOUND,
156  });
157  }
158  return candidates;
159 }
160 
161 // Have different ZeroSock (or others that inherit from it) objects have different
162 // m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two
163 // different objects comparing as equal.
164 static std::atomic<SOCKET> g_mocked_sock_fd{0};
165 
167 
168 // Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that.
170 
171 ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; }
172 
173 ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const
174 {
175  memset(buf, 0x0, len);
176  return len;
177 }
178 
179 int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; }
180 
181 int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; }
182 
183 int ZeroSock::Listen(int) const { return 0; }
184 
185 std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const
186 {
187  if (addr != nullptr) {
188  // Pretend all connections come from 5.5.5.5:6789
189  memset(addr, 0x00, *addr_len);
190  const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in));
191  if (*addr_len >= write_len) {
192  *addr_len = write_len;
193  sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr);
194  addr_in->sin_family = AF_INET;
195  memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr));
196  addr_in->sin_port = htons(6789);
197  }
198  }
199  return std::make_unique<ZeroSock>();
200 }
201 
202 int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
203 {
204  std::memset(opt_val, 0x0, *opt_len);
205  return 0;
206 }
207 
208 int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; }
209 
210 int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const
211 {
212  std::memset(name, 0x0, *name_len);
213  return 0;
214 }
215 
216 bool ZeroSock::SetNonBlocking() const { return true; }
217 
218 bool ZeroSock::IsSelectable() const { return true; }
219 
220 bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
221 {
222  if (occurred != nullptr) {
223  *occurred = requested;
224  }
225  return true;
226 }
227 
228 bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
229 {
230  for (auto& [sock, events] : events_per_sock) {
231  (void)sock;
232  events.occurred = events.requested;
233  }
234  return true;
235 }
236 
238 {
239  assert(false && "Move of Sock into ZeroSock not allowed.");
240  return *this;
241 }
242 
243 StaticContentsSock::StaticContentsSock(const std::string& contents)
244  : m_contents{contents}
245 {
246 }
247 
248 ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const
249 {
250  const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)};
251  std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes);
252  if ((flags & MSG_PEEK) == 0) {
253  m_consumed += consume_bytes;
254  }
255  return consume_bytes;
256 }
257 
259 {
260  assert(false && "Move of Sock into StaticContentsSock not allowed.");
261  return *this;
262 }
263 
264 ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
265 {
266  WAIT_LOCK(m_mutex, lock);
267 
268  if (m_data.empty()) {
269  if (m_eof) {
270  return 0;
271  }
272  errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
273  return -1;
274  }
275 
276  const size_t read_bytes{std::min(len, m_data.size())};
277 
278  std::memcpy(buf, m_data.data(), read_bytes);
279  if ((flags & MSG_PEEK) == 0) {
280  m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
281  }
282 
283  return read_bytes;
284 }
285 
286 std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
287 {
288  V1Transport transport{NodeId{0}};
289 
290  {
291  WAIT_LOCK(m_mutex, lock);
292 
293  WaitForDataOrEof(lock);
294  if (m_eof && m_data.empty()) {
295  return std::nullopt;
296  }
297 
298  for (;;) {
299  std::span<const uint8_t> s{m_data};
300  if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
301  return std::nullopt;
302  }
303  m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
304  if (transport.ReceivedMessageComplete()) {
305  break;
306  }
307  if (m_data.empty()) {
308  WaitForDataOrEof(lock);
309  if (m_eof && m_data.empty()) {
310  return std::nullopt;
311  }
312  }
313  }
314  }
315 
316  bool reject{false};
317  CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
318  if (reject) {
319  return std::nullopt;
320  }
321  return std::make_optional<CNetMessage>(std::move(msg));
322 }
323 
324 void DynSock::Pipe::PushBytes(const void* buf, size_t len)
325 {
326  LOCK(m_mutex);
327  const uint8_t* b = static_cast<const uint8_t*>(buf);
328  m_data.insert(m_data.end(), b, b + len);
329  m_cond.notify_all();
330 }
331 
333 {
334  LOCK(m_mutex);
335  m_eof = true;
336  m_cond.notify_all();
337 }
338 
340 {
341  Assert(lock.mutex() == &m_mutex);
342 
343  m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
344  AssertLockHeld(m_mutex);
345  return !m_data.empty() || m_eof;
346  });
347 }
348 
349 DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
350  : m_pipes{pipes}, m_accept_sockets{accept_sockets}
351 {
352 }
353 
355 {
356  m_pipes->send.Eof();
357 }
358 
359 ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
360 {
361  return m_pipes->recv.GetBytes(buf, len, flags);
362 }
363 
364 ssize_t DynSock::Send(const void* buf, size_t len, int) const
365 {
366  m_pipes->send.PushBytes(buf, len);
367  return len;
368 }
369 
370 std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
371 {
372  ZeroSock::Accept(addr, addr_len);
373  return m_accept_sockets->Pop().value_or(nullptr);
374 }
375 
376 bool DynSock::Wait(std::chrono::milliseconds timeout,
377  Event requested,
378  Event* occurred) const
379 {
380  EventsPerSock ev;
381  ev.emplace(this, Events{requested});
382  const bool ret{WaitMany(timeout, ev)};
383  if (occurred != nullptr) {
384  *occurred = ev.begin()->second.occurred;
385  }
386  return ret;
387 }
388 
389 bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
390 {
391  const auto deadline = std::chrono::steady_clock::now() + timeout;
392  bool at_least_one_event_occurred{false};
393 
394  for (;;) {
395  // Check all sockets for readiness without waiting.
396  for (auto& [sock, events] : events_per_sock) {
397  if ((events.requested & Sock::SEND) != 0) {
398  // Always ready for Send().
399  events.occurred |= Sock::SEND;
400  at_least_one_event_occurred = true;
401  }
402 
403  if ((events.requested & Sock::RECV) != 0) {
404  auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
405  uint8_t b;
406  if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
407  events.occurred |= Sock::RECV;
408  at_least_one_event_occurred = true;
409  }
410  }
411  }
412 
413  if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
414  break;
415  }
416 
417  std::this_thread::sleep_for(10ms);
418  }
419 
420  return true;
421 }
422 
424 {
425  assert(false && "Move of Sock into DynSock not allowed.");
426  return *this;
427 }
int ret
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
Definition: net.cpp:216
AssertLockHeld(pool.cs)
bool randbool() noexcept
Generate a random boolean.
Definition: random.h:325
A mocked Sock alternative that returns a statically contained data upon read and succeeds and ignores...
Definition: net.h:211
ServiceFlags
nServices flags
Definition: protocol.h:309
Inbound connections are those initiated by a peer.
assert(!tx.IsCoinBase())
Mutex m_total_bytes_sent_mutex
Definition: net.h:1573
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
Definition: net.cpp:228
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
Definition: net.cpp:359
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
Definition: net.cpp:364
A mocked Sock alternative that allows providing the data to be returned by Recv() and inspecting the ...
Definition: net.h:238
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
Definition: net.cpp:220
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
Definition: net.cpp:370
void FlushSendBuffer(CNode &node) const
Definition: net.cpp:99
SOCKET m_socket
Contained socket.
Definition: sock.h:276
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:258
memcpy(result.begin(), stream.data(), stream.size())
ZeroSock()
Definition: net.cpp:166
#define INVALID_SOCKET
Definition: compat.h:67
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class...
Definition: serialize.h:488
std::shared_ptr< Pipes > m_pipes
Definition: net.h:358
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
Definition: net.cpp:339
std::map< uint64_t, CachedAddrResponse > m_addr_response_caches
Addr responses stored in different caches per (network, local socket) prevent cross-network node iden...
Definition: net.h:1640
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
Definition: sock.h:149
static std::atomic< SOCKET > g_mocked_sock_fd
Definition: net.cpp:164
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
Definition: net.cpp:389
void ResetMaxOutboundCycle()
Definition: net.cpp:76
void ResetAddrCache()
Definition: net.cpp:74
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
Definition: protocol.h:65
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
Definition: net.cpp:286
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
Definition: net.cpp:332
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
Definition: sock.h:209
int Listen(int) const override
listen(2) wrapper.
Definition: net.cpp:183
#define LOCK(cs)
Definition: sync.h:258
const char * name
Definition: rest.cpp:48
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
Definition: protocol.h:70
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
Definition: net.cpp:218
A combination of a network address (CNetAddr) and a (TCP) port.
Definition: netaddress.h:529
Fast randomness source.
Definition: random.h:385
Transport protocol agnostic message container.
Definition: net.h:237
Mutex m_mutex
Definition: net.h:287
static constexpr SerParams V1
Definition: netaddress.h:231
A CService with information about it as peer.
Definition: protocol.h:366
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
Definition: net.cpp:185
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
Definition: net.cpp:208
static const int PROTOCOL_VERSION
network protocol versioning
A mocked Sock alternative that succeeds on all operations.
Definition: net.h:167
int64_t NodeId
Definition: net.h:103
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:237
#define WAIT_LOCK(cs, name)
Definition: sync.h:264
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
Definition: net.cpp:181
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport, const std::optional< Proxy > &proxy_override) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Open a new P2P connection.
Definition: net.cpp:372
uint8_t Event
Definition: sock.h:139
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
Definition: net.cpp:111
StaticContentsSock(const std::string &contents)
Definition: net.cpp:243
const std::string m_contents
Definition: net.h:230
Definition: messages.h:21
int flags
Definition: bitcoin-tx.cpp:529
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
Definition: net.cpp:264
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:51
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
Definition: net.cpp:202
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
Definition: sock.h:144
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
Definition: net.cpp:173
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
Definition: net.cpp:349
class CConnman::PrivateBroadcast m_private_broadcast
std::atomic_bool m_outbound_tor_ok_at_least_once
Remember if we ever established at least one outbound connection to a Tor peer, including sending and...
Definition: net.h:1204
Serialization wrapper class for custom integers and enums.
Definition: serialize.h:520
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
Definition: net.cpp:324
~ZeroSock() override
Definition: net.cpp:169
void NodeReceiveMsgBytes(CNode &node, std::span< const uint8_t > msg_bytes, bool &complete) const
Definition: net.cpp:91
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Definition: random.h:254
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that was provided at construction until it is exhausted and then return ...
Definition: net.cpp:248
constexpr auto ALL_NETWORKS
Definition: net.h:153
ConnectionType
Different types of connections to a peer.
Wrapper around std::unique_lock style lock for MutexType.
Definition: sync.h:145
~DynSock()
Definition: net.cpp:354
RAII helper class that manages a socket and closes it automatically when it goes out of scope...
Definition: sock.h:27
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
Definition: net.cpp:136
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
Definition: net.cpp:171
std::atomic_size_t m_num_to_open
Number of ConnectionType::PRIVATE_BROADCAST connections to open.
Definition: net.h:1253
Information about a peer.
Definition: net.h:679
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
Definition: net.cpp:210
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
Definition: net.cpp:179
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (setup state)
Auxiliary requested/occurred events to wait for in WaitMany().
Definition: sock.h:174
size_t m_consumed
Definition: net.h:231
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Definition: net.cpp:125
void AddTestNode(CNode &node)
Definition: net.h:61
void Reset()
Reset the internal state.
Definition: net.cpp:83
#define Assert(val)
Identity function.
Definition: check.h:113
std::shared_ptr< Queue > m_accept_sockets
Definition: net.h:359
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:423
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
Definition: net.cpp:376