23 void ConnmanTestMsg::Handshake(
CNode&
node,
24 bool successfully_connected,
30 auto& peerman{
static_cast<PeerManager&
>(*m_msgproc)};
34 peerman.SendMessages(&
node);
52 (void)connman.ReceiveMsgFrom(
node, std::move(msg_version));
53 node.fPauseSend =
false;
54 connman.ProcessMessagesOnce(
node);
55 peerman.SendMessages(&
node);
57 if (
node.fDisconnect)
return;
61 assert(peerman.GetNodeStateStats(
node.GetId(), statestats));
62 assert(statestats.m_relay_txs == (relay_txs && !
node.IsBlockOnlyConn()));
63 assert(statestats.their_services == remote_services);
64 if (successfully_connected) {
66 (void)connman.ReceiveMsgFrom(
node, std::move(msg_verack));
67 node.fPauseSend =
false;
68 connman.ProcessMessagesOnce(
node);
69 peerman.SendMessages(&
node);
76 assert(
node.ReceiveMsgBytes(msg_bytes, complete));
78 node.MarkReceivedMsgsForProcessing();
85 node.vSendMsg.clear();
86 node.m_send_memusage = 0;
88 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
89 if (to_send.empty())
break;
90 node.m_transport->MarkBytesSent(to_send.size());
96 bool queued =
node.m_transport->SetMessageToSend(ser_msg);
100 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
101 if (to_send.empty())
break;
103 node.m_transport->MarkBytesSent(to_send.size());
111 if (!
node)
return nullptr;
114 node->fSuccessfullyConnected =
true;
121 std::vector<NodeEvictionCandidate> candidates;
122 candidates.reserve(n_candidates);
123 for (
int id = 0;
id < n_candidates; ++id) {
124 candidates.push_back({
126 .m_connected=std::chrono::seconds{random_context.
randrange(100)},
127 .m_min_ping_time=std::chrono::microseconds{random_context.
randrange(100)},
128 .m_last_block_time=std::chrono::seconds{random_context.
randrange(100)},
129 .m_last_tx_time=std::chrono::seconds{random_context.
randrange(100)},
130 .fRelevantServices=random_context.
randbool(),
131 .m_relay_txs=random_context.
randbool(),
132 .fBloomFilter=random_context.
randbool(),
133 .nKeyedNetGroup=random_context.
randrange(100u),
134 .prefer_evict=random_context.
randbool(),
135 .m_is_local=random_context.
randbool(),
158 memset(buf, 0x0, len);
170 if (addr !=
nullptr) {
172 memset(addr, 0x00, *addr_len);
173 const socklen_t write_len =
static_cast<socklen_t
>(
sizeof(sockaddr_in));
174 if (*addr_len >= write_len) {
175 *addr_len = write_len;
176 sockaddr_in* addr_in =
reinterpret_cast<sockaddr_in*
>(addr);
177 addr_in->sin_family = AF_INET;
178 memset(&addr_in->sin_addr, 0x05,
sizeof(addr_in->sin_addr));
179 addr_in->sin_port = htons(6789);
182 return std::make_unique<ZeroSock>();
187 std::memset(opt_val, 0x0, *opt_len);
195 std::memset(
name, 0x0, *name_len);
205 if (occurred !=
nullptr) {
206 *occurred = requested;
213 for (
auto& [sock, events] : events_per_sock) {
215 events.occurred = events.requested;
222 assert(
false &&
"Move of Sock into ZeroSock not allowed.");
227 : m_contents{contents}
235 if ((
flags & MSG_PEEK) == 0) {
238 return consume_bytes;
243 assert(
false &&
"Move of Sock into StaticContentsSock not allowed.");
251 if (m_data.empty()) {
259 const size_t read_bytes{std::min(len, m_data.size())};
262 if ((
flags & MSG_PEEK) == 0) {
263 m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
276 WaitForDataOrEof(lock);
277 if (m_eof && m_data.empty()) {
283 if (!transport.ReceivedBytes(
s)) {
286 m_data.erase(m_data.begin(), m_data.begin() + m_data.size() -
s.size());
287 if (transport.ReceivedMessageComplete()) {
290 if (m_data.empty()) {
291 WaitForDataOrEof(lock);
292 if (m_eof && m_data.empty()) {
304 return std::make_optional<CNetMessage>(std::move(
msg));
310 const uint8_t* b =
static_cast<const uint8_t*
>(buf);
311 m_data.insert(m_data.end(), b, b + len);
324 Assert(lock.mutex() == &m_mutex);
328 return !m_data.empty() || m_eof;
333 :
m_pipes{pipes}, m_accept_sockets{accept_sockets}
349 m_pipes->send.PushBytes(buf, len);
361 Event* occurred)
const 364 ev.emplace(
this,
Events{requested});
366 if (occurred !=
nullptr) {
367 *occurred = ev.begin()->second.occurred;
374 const auto deadline = std::chrono::steady_clock::now() + timeout;
375 bool at_least_one_event_occurred{
false};
379 for (
auto& [sock, events] : events_per_sock) {
383 at_least_one_event_occurred =
true;
387 auto dyn_sock =
reinterpret_cast<const DynSock*
>(sock.get());
389 if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
391 at_least_one_event_occurred =
true;
396 if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
400 std::this_thread::sleep_for(10ms);
408 assert(
false &&
"Move of Sock into DynSock not allowed.");
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
bool randbool() noexcept
Generate a random boolean.
A mocked Sock alternative that returns a statically contained data upon read and succeeds and ignores...
ServiceFlags
nServices flags
Inbound connections are those initiated by a peer.
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
A mocked Sock alternative that allows providing the data to be returned by Recv() and inspecting the ...
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
void FlushSendBuffer(CNode &node) const
SOCKET m_socket
Contained socket.
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
memcpy(result.begin(), stream.data(), stream.size())
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class...
std::shared_ptr< Pipes > m_pipes
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
static std::atomic< SOCKET > g_mocked_sock_fd
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
int Listen(int) const override
listen(2) wrapper.
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
A combination of a network address (CNetAddr) and a (TCP) port.
Transport protocol agnostic message container.
static constexpr SerParams V1
void NodeReceiveMsgBytes(CNode &node, Span< const uint8_t > msg_bytes, bool &complete) const
A CService with information about it as peer.
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
static const int PROTOCOL_VERSION
network protocol versioning
A mocked Sock alternative that succeeds on all operations.
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
#define WAIT_LOCK(cs, name)
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
StaticContentsSock(const std::string &contents)
const std::string m_contents
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
#define EXCLUSIVE_LOCKS_REQUIRED(...)
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that was provided at construction until it is exhausted and then return ...
constexpr auto ALL_NETWORKS
ConnectionType
Different types of connections to a peer.
Wrapper around std::unique_lock style lock for MutexType.
RAII helper class that manages a socket and closes it automatically when it goes out of scope...
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
A Span is an object that can refer to a contiguous sequence of objects.
Information about a peer.
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (setup state)
Auxiliary requested/occurred events to wait for in WaitMany().
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
void AddTestNode(CNode &node)
#define Assert(val)
Identity function.
std::shared_ptr< Queue > m_accept_sockets
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).