23 void ConnmanTestMsg::Handshake(
CNode&
node,
24 bool successfully_connected,
30 auto& peerman{
static_cast<PeerManager&
>(*m_msgproc)};
34 peerman.SendMessages(
node);
52 (void)connman.ReceiveMsgFrom(
node, std::move(msg_version));
53 node.fPauseSend =
false;
54 connman.ProcessMessagesOnce(
node);
55 peerman.SendMessages(
node);
57 if (
node.fDisconnect)
return;
61 assert(peerman.GetNodeStateStats(
node.GetId(), statestats));
62 assert(statestats.m_relay_txs == (relay_txs && !
node.IsBlockOnlyConn()));
63 assert(statestats.their_services == remote_services);
64 if (successfully_connected) {
66 (void)connman.ReceiveMsgFrom(
node, std::move(msg_verack));
67 node.fPauseSend =
false;
68 connman.ProcessMessagesOnce(
node);
69 peerman.SendMessages(
node);
79 nMaxOutboundCycleStartTime = 0
s;
80 nMaxOutboundTotalBytesSentInCycle = 0;
93 assert(
node.ReceiveMsgBytes(msg_bytes, complete));
95 node.MarkReceivedMsgsForProcessing();
102 node.vSendMsg.clear();
103 node.m_send_memusage = 0;
105 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
106 if (to_send.empty())
break;
107 node.m_transport->MarkBytesSent(to_send.size());
113 bool queued =
node.m_transport->SetMessageToSend(ser_msg);
115 bool complete{
false};
117 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
118 if (to_send.empty())
break;
120 node.m_transport->MarkBytesSent(to_send.size());
128 if (!
node)
return nullptr;
131 node->fSuccessfullyConnected =
true;
138 std::vector<NodeEvictionCandidate> candidates;
139 candidates.reserve(n_candidates);
140 for (
int id = 0;
id < n_candidates; ++id) {
141 candidates.push_back({
143 .m_connected=std::chrono::seconds{random_context.
randrange(100)},
144 .m_min_ping_time=std::chrono::microseconds{random_context.
randrange(100)},
145 .m_last_block_time=std::chrono::seconds{random_context.
randrange(100)},
146 .m_last_tx_time=std::chrono::seconds{random_context.
randrange(100)},
147 .fRelevantServices=random_context.
randbool(),
148 .m_relay_txs=random_context.
randbool(),
149 .fBloomFilter=random_context.
randbool(),
150 .nKeyedNetGroup=random_context.
randrange(100u),
151 .prefer_evict=random_context.
randbool(),
152 .m_is_local=random_context.
randbool(),
175 memset(buf, 0x0, len);
187 if (addr !=
nullptr) {
189 memset(addr, 0x00, *addr_len);
190 const socklen_t write_len =
static_cast<socklen_t
>(
sizeof(sockaddr_in));
191 if (*addr_len >= write_len) {
192 *addr_len = write_len;
193 sockaddr_in* addr_in =
reinterpret_cast<sockaddr_in*
>(addr);
194 addr_in->sin_family = AF_INET;
195 memset(&addr_in->sin_addr, 0x05,
sizeof(addr_in->sin_addr));
196 addr_in->sin_port = htons(6789);
199 return std::make_unique<ZeroSock>();
204 std::memset(opt_val, 0x0, *opt_len);
212 std::memset(
name, 0x0, *name_len);
222 if (occurred !=
nullptr) {
223 *occurred = requested;
230 for (
auto& [sock, events] : events_per_sock) {
232 events.occurred = events.requested;
239 assert(
false &&
"Move of Sock into ZeroSock not allowed.");
244 : m_contents{contents}
252 if ((
flags & MSG_PEEK) == 0) {
255 return consume_bytes;
260 assert(
false &&
"Move of Sock into StaticContentsSock not allowed.");
268 if (m_data.empty()) {
276 const size_t read_bytes{std::min(len, m_data.size())};
279 if ((
flags & MSG_PEEK) == 0) {
280 m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
293 WaitForDataOrEof(lock);
294 if (m_eof && m_data.empty()) {
299 std::span<const uint8_t>
s{m_data};
300 if (!transport.ReceivedBytes(
s)) {
303 m_data.erase(m_data.begin(), m_data.begin() + m_data.size() -
s.size());
304 if (transport.ReceivedMessageComplete()) {
307 if (m_data.empty()) {
308 WaitForDataOrEof(lock);
309 if (m_eof && m_data.empty()) {
321 return std::make_optional<CNetMessage>(std::move(
msg));
327 const uint8_t* b =
static_cast<const uint8_t*
>(buf);
328 m_data.insert(m_data.end(), b, b + len);
341 Assert(lock.mutex() == &m_mutex);
345 return !m_data.empty() || m_eof;
350 :
m_pipes{pipes}, m_accept_sockets{accept_sockets}
366 m_pipes->send.PushBytes(buf, len);
378 Event* occurred)
const 381 ev.emplace(
this,
Events{requested});
383 if (occurred !=
nullptr) {
384 *occurred = ev.begin()->second.occurred;
391 const auto deadline = std::chrono::steady_clock::now() + timeout;
392 bool at_least_one_event_occurred{
false};
396 for (
auto& [sock, events] : events_per_sock) {
400 at_least_one_event_occurred =
true;
404 auto dyn_sock =
reinterpret_cast<const DynSock*
>(sock.get());
406 if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
408 at_least_one_event_occurred =
true;
413 if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
417 std::this_thread::sleep_for(10ms);
425 assert(
false &&
"Move of Sock into DynSock not allowed.");
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
bool randbool() noexcept
Generate a random boolean.
A mocked Sock alternative that returns statically contained data upon read and succeeds and ignores...
ServiceFlags
nServices flags
Inbound connections are those initiated by a peer.
Mutex m_total_bytes_sent_mutex
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
A mocked Sock alternative that allows providing the data to be returned by Recv() and inspecting the ...
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
void FlushSendBuffer(CNode &node) const
SOCKET m_socket
Contained socket.
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
memcpy(result.begin(), stream.data(), stream.size())
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class...
std::shared_ptr< Pipes > m_pipes
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
std::map< uint64_t, CachedAddrResponse > m_addr_response_caches
Addr responses stored in different caches per (network, local socket) prevent cross-network node iden...
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
static std::atomic< SOCKET > g_mocked_sock_fd
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
void ResetMaxOutboundCycle()
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
int Listen(int) const override
listen(2) wrapper.
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
A combination of a network address (CNetAddr) and a (TCP) port.
Transport protocol agnostic message container.
static constexpr SerParams V1
A CService with information about it as peer.
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
static const int PROTOCOL_VERSION
network protocol versioning
A mocked Sock alternative that succeeds on all operations.
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
#define WAIT_LOCK(cs, name)
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport, const std::optional< Proxy > &proxy_override) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Open a new P2P connection.
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
StaticContentsSock(const std::string &contents)
const std::string m_contents
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
#define EXCLUSIVE_LOCKS_REQUIRED(...)
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
class CConnman::PrivateBroadcast m_private_broadcast
std::atomic_bool m_outbound_tor_ok_at_least_once
Remember if we ever established at least one outbound connection to a Tor peer, including sending and...
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
void NodeReceiveMsgBytes(CNode &node, std::span< const uint8_t > msg_bytes, bool &complete) const
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that were provided at construction until they are exhausted and then return ...
constexpr auto ALL_NETWORKS
ConnectionType
Different types of connections to a peer.
Wrapper around std::unique_lock style lock for MutexType.
RAII helper class that manages a socket and closes it automatically when it goes out of scope...
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
std::atomic_size_t m_num_to_open
Number of ConnectionType::PRIVATE_BROADCAST connections to open.
Information about a peer.
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (set up state)
Auxiliary requested/occurred events to wait for in WaitMany().
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
void AddTestNode(CNode &node)
void Reset()
Reset the internal state.
#define Assert(val)
Identity function.
std::shared_ptr< Queue > m_accept_sockets
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).