23void ConnmanTestMsg::Handshake(
CNode&
node,
24 bool successfully_connected,
30 auto& peerman{
static_cast<PeerManager&
>(*m_msgproc)};
33 peerman.InitializeNode(node, local_services);
34 peerman.SendMessages(node);
37 CSerializedNetMsg msg_version{
40 Using<CustomUintFormatter<8>>(remote_services),
52 (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
54 connman.ProcessMessagesOnce(node);
55 peerman.SendMessages(node);
60 CNodeStateStats statestats;
61 assert(peerman.GetNodeStateStats(node.
GetId(), statestats));
64 if (successfully_connected) {
66 (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
68 connman.ProcessMessagesOnce(node);
69 peerman.SendMessages(node);
79 nMaxOutboundCycleStartTime = 0s;
80 nMaxOutboundTotalBytesSentInCycle = 0;
93 assert(
node.ReceiveMsgBytes(msg_bytes, complete));
95 node.MarkReceivedMsgsForProcessing();
102 node.vSendMsg.clear();
103 node.m_send_memusage = 0;
105 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
106 if (to_send.empty())
break;
107 node.m_transport->MarkBytesSent(to_send.size());
113 bool queued =
node.m_transport->SetMessageToSend(ser_msg);
115 bool complete{
false};
117 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
118 if (to_send.empty())
break;
120 node.m_transport->MarkBytesSent(to_send.size());
128 if (!
node)
return nullptr;
131 node->fSuccessfullyConnected =
true;
138 std::vector<NodeEvictionCandidate> candidates;
139 candidates.reserve(n_candidates);
140 for (
int id = 0;
id < n_candidates; ++id) {
141 candidates.push_back({
143 .m_connected=std::chrono::seconds{random_context.
randrange(100)},
144 .m_min_ping_time=std::chrono::microseconds{random_context.
randrange(100)},
145 .m_last_block_time=std::chrono::seconds{random_context.
randrange(100)},
146 .m_last_tx_time=std::chrono::seconds{random_context.
randrange(100)},
147 .fRelevantServices=random_context.
randbool(),
148 .m_relay_txs=random_context.
randbool(),
149 .fBloomFilter=random_context.
randbool(),
150 .nKeyedNetGroup=random_context.
randrange(100u),
151 .prefer_evict=random_context.
randbool(),
152 .m_is_local=random_context.
randbool(),
175 memset(buf, 0x0, len);
187 if (addr !=
nullptr) {
189 memset(addr, 0x00, *addr_len);
190 const socklen_t write_len =
static_cast<socklen_t
>(
sizeof(sockaddr_in));
191 if (*addr_len >= write_len) {
192 *addr_len = write_len;
193 sockaddr_in* addr_in =
reinterpret_cast<sockaddr_in*
>(addr);
194 addr_in->sin_family = AF_INET;
195 memset(&addr_in->sin_addr, 0x05,
sizeof(addr_in->sin_addr));
196 addr_in->sin_port = htons(6789);
199 return std::make_unique<ZeroSock>();
204 std::memset(opt_val, 0x0, *opt_len);
212 std::memset(
name, 0x0, *name_len);
222 if (occurred !=
nullptr) {
223 *occurred = requested;
230 for (
auto& [sock, events] : events_per_sock) {
232 events.occurred = events.requested;
239 assert(
false &&
"Move of Sock into ZeroSock not allowed.");
252 if ((
flags & MSG_PEEK) == 0) {
255 return consume_bytes;
260 assert(
false &&
"Move of Sock into StaticContentsSock not allowed.");
268 if (m_data.empty()) {
276 const size_t read_bytes{std::min(len, m_data.size())};
278 std::memcpy(buf, m_data.data(), read_bytes);
279 if ((
flags & MSG_PEEK) == 0) {
280 m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
294 if (m_eof && m_data.empty()) {
299 std::span<const uint8_t> s{m_data};
303 m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
307 if (m_data.empty()) {
309 if (m_eof && m_data.empty()) {
321 return std::make_optional<CNetMessage>(std::move(msg));
327 const uint8_t* b =
static_cast<const uint8_t*
>(buf);
328 m_data.insert(m_data.end(), b, b + len);
345 return !m_data.empty() || m_eof;
366 m_pipes->send.PushBytes(buf, len);
378 Event* occurred)
const
381 ev.emplace(
this,
Events{requested});
383 if (occurred !=
nullptr) {
384 *occurred = ev.begin()->second.occurred;
391 const auto deadline = std::chrono::steady_clock::now() + timeout;
392 bool at_least_one_event_occurred{
false};
396 for (
auto& [sock, events] : events_per_sock) {
400 at_least_one_event_occurred =
true;
404 auto dyn_sock =
reinterpret_cast<const DynSock*
>(sock.get());
406 if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
408 at_least_one_event_occurred =
true;
413 if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
417 std::this_thread::sleep_for(10ms);
425 assert(
false &&
"Move of Sock into DynSock not allowed.");
#define Assert(val)
Identity function.
A CService with information about it as peer.
class CConnman::PrivateBroadcast m_private_broadcast
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport, const std::optional< Proxy > &proxy_override) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Open a new P2P connection.
Mutex m_total_bytes_sent_mutex
std::map< uint64_t, CachedAddrResponse > m_addr_response_caches
Addr responses stored in different caches per (network, local socket) prevent cross-network node iden...
static constexpr SerParams V1
Transport protocol agnostic message container.
Information about a peer.
std::atomic< int > nVersion
std::atomic_bool fSuccessfullyConnected
fSuccessfullyConnected is set to true on receiving VERACK from the peer.
bool IsBlockOnlyConn() const
int GetCommonVersion() const
std::atomic_bool fPauseSend
std::atomic_bool fDisconnect
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
std::condition_variable m_cond
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
std::shared_ptr< Pipes > m_pipes
std::shared_ptr< Queue > m_accept_sockets
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (setup state).
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
bool randbool() noexcept
Generate a random boolean.
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
SOCKET m_socket
Contained socket.
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
const std::string m_contents
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
StaticContentsSock(const std::string &contents)
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that was provided at construction until it is exhausted and then return ...
Wrapper around std::unique_lock style lock for MutexType.
bool ReceivedBytes(std::span< const uint8_t > &msg_bytes) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
Feed wire bytes to the transport.
bool ReceivedMessageComplete() const override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
Returns true if the current message is complete (so GetReceivedMessage can be called).
CNetMessage GetReceivedMessage(std::chrono::microseconds time, bool &reject_message) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
Retrieve a completed message from transport.
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
int Listen(int) const override
listen(2) wrapper.
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
ConnectionType
Different types of connections to a peer.
@ INBOUND
Inbound connections are those initiated by a peer.
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
ServiceFlags
nServices flags
static const int PROTOCOL_VERSION
network protocol versioning
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class.
ServiceFlags their_services
void NodeReceiveMsgBytes(CNode &node, std::span< const uint8_t > msg_bytes, bool &complete) const
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
void AddTestNode(CNode &node)
void Reset()
Reset the internal state.
void FlushSendBuffer(CNode &node) const
void ResetMaxOutboundCycle()
Auxiliary requested/occurred events to wait for in WaitMany().
#define WAIT_LOCK(cs, name)
#define AssertLockHeld(cs)
static std::atomic< SOCKET > g_mocked_sock_fd
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
constexpr auto ALL_NETWORKS
#define EXCLUSIVE_LOCKS_REQUIRED(...)