Bitcoin Core  26.1.0
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2022 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <chain.h>
8 #include <chainparams.h>
9 #include <crypto/common.h>
10 #include <kernel/cs_main.h>
11 #include <logging.h>
12 #include <netaddress.h>
13 #include <netbase.h>
14 #include <node/blockstorage.h>
15 #include <primitives/block.h>
16 #include <primitives/transaction.h>
17 #include <rpc/server.h>
18 #include <serialize.h>
19 #include <streams.h>
20 #include <sync.h>
21 #include <uint256.h>
22 #include <version.h>
23 #include <zmq/zmqutil.h>
24 
25 #include <zmq.h>
26 
27 #include <cassert>
28 #include <cstdarg>
29 #include <cstddef>
30 #include <cstdint>
31 #include <cstring>
32 #include <map>
33 #include <optional>
34 #include <string>
35 #include <utility>
36 #include <vector>
37 
38 namespace Consensus {
39 struct Params;
40 }
41 
42 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
43 
44 static const char *MSG_HASHBLOCK = "hashblock";
45 static const char *MSG_HASHTX = "hashtx";
46 static const char *MSG_RAWBLOCK = "rawblock";
47 static const char *MSG_RAWTX = "rawtx";
48 static const char *MSG_SEQUENCE = "sequence";
49 
50 // Internal function to send multipart message
51 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
52 {
53  va_list args;
54  va_start(args, size);
55 
56  while (1)
57  {
58  zmq_msg_t msg;
59 
60  int rc = zmq_msg_init_size(&msg, size);
61  if (rc != 0)
62  {
63  zmqError("Unable to initialize ZMQ msg");
64  va_end(args);
65  return -1;
66  }
67 
68  void *buf = zmq_msg_data(&msg);
69  memcpy(buf, data, size);
70 
71  data = va_arg(args, const void*);
72 
73  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
74  if (rc == -1)
75  {
76  zmqError("Unable to send ZMQ msg");
77  zmq_msg_close(&msg);
78  va_end(args);
79  return -1;
80  }
81 
82  zmq_msg_close(&msg);
83 
84  if (!data)
85  break;
86 
87  size = va_arg(args, size_t);
88  }
89  va_end(args);
90  return 0;
91 }
92 
93 static bool IsZMQAddressIPV6(const std::string &zmq_address)
94 {
95  const std::string tcp_prefix = "tcp://";
96  const size_t tcp_index = zmq_address.rfind(tcp_prefix);
97  const size_t colon_index = zmq_address.rfind(':');
98  if (tcp_index == 0 && colon_index != std::string::npos) {
99  const std::string ip = zmq_address.substr(tcp_prefix.length(), colon_index - tcp_prefix.length());
100  const std::optional<CNetAddr> addr{LookupHost(ip, false)};
101  if (addr.has_value() && addr.value().IsIPv6()) return true;
102  }
103  return false;
104 }
105 
107 {
108  assert(!psocket);
109 
110  // check if address is being used by other publish notifier
111  std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
112 
113  if (i==mapPublishNotifiers.end())
114  {
115  psocket = zmq_socket(pcontext, ZMQ_PUB);
116  if (!psocket)
117  {
118  zmqError("Failed to create socket");
119  return false;
120  }
121 
122  LogPrint(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
123 
124  int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
125  if (rc != 0)
126  {
127  zmqError("Failed to set outbound message high water mark");
128  zmq_close(psocket);
129  return false;
130  }
131 
132  const int so_keepalive_option {1};
133  rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
134  if (rc != 0) {
135  zmqError("Failed to set SO_KEEPALIVE");
136  zmq_close(psocket);
137  return false;
138  }
139 
140  // On some systems (e.g. OpenBSD) the ZMQ_IPV6 must not be enabled, if the address to bind isn't IPv6
141  const int enable_ipv6 { IsZMQAddressIPV6(address) ? 1 : 0};
142  rc = zmq_setsockopt(psocket, ZMQ_IPV6, &enable_ipv6, sizeof(enable_ipv6));
143  if (rc != 0) {
144  zmqError("Failed to set ZMQ_IPV6");
145  zmq_close(psocket);
146  return false;
147  }
148 
149  rc = zmq_bind(psocket, address.c_str());
150  if (rc != 0)
151  {
152  zmqError("Failed to bind address");
153  zmq_close(psocket);
154  return false;
155  }
156 
157  // register this notifier for the address, so it can be reused for other publish notifier
158  mapPublishNotifiers.insert(std::make_pair(address, this));
159  return true;
160  }
161  else
162  {
163  LogPrint(BCLog::ZMQ, "Reusing socket for address %s\n", address);
164  LogPrint(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
165 
166  psocket = i->second->psocket;
167  mapPublishNotifiers.insert(std::make_pair(address, this));
168 
169  return true;
170  }
171 }
172 
174 {
175  // Early return if Initialize was not called
176  if (!psocket) return;
177 
178  int count = mapPublishNotifiers.count(address);
179 
180  // remove this notifier from the list of publishers using this address
181  typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
182  std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
183 
184  for (iterator it = iterpair.first; it != iterpair.second; ++it)
185  {
186  if (it->second==this)
187  {
188  mapPublishNotifiers.erase(it);
189  break;
190  }
191  }
192 
193  if (count == 1)
194  {
195  LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
196  int linger = 0;
197  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
198  zmq_close(psocket);
199  }
200 
201  psocket = nullptr;
202 }
203 
204 bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size)
205 {
206  assert(psocket);
207 
208  /* send three parts, command & data & a LE 4byte sequence number */
209  unsigned char msgseq[sizeof(uint32_t)];
210  WriteLE32(msgseq, nSequence);
211  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
212  if (rc == -1)
213  return false;
214 
215  /* increment memory only sequence number after sending */
216  nSequence++;
217 
218  return true;
219 }
220 
222 {
223  uint256 hash = pindex->GetBlockHash();
224  LogPrint(BCLog::ZMQ, "Publish hashblock %s to %s\n", hash.GetHex(), this->address);
225  uint8_t data[32];
226  for (unsigned int i = 0; i < 32; i++) {
227  data[31 - i] = hash.begin()[i];
228  }
229  return SendZmqMessage(MSG_HASHBLOCK, data, 32);
230 }
231 
233 {
234  uint256 hash = transaction.GetHash();
235  LogPrint(BCLog::ZMQ, "Publish hashtx %s to %s\n", hash.GetHex(), this->address);
236  uint8_t data[32];
237  for (unsigned int i = 0; i < 32; i++) {
238  data[31 - i] = hash.begin()[i];
239  }
240  return SendZmqMessage(MSG_HASHTX, data, 32);
241 }
242 
244 {
245  LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
246 
248  CBlock block;
249  if (!m_get_block_by_index(block, *pindex)) {
250  zmqError("Can't read block from disk");
251  return false;
252  }
253 
254  ss << block;
255 
256  return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
257 }
258 
260 {
261  uint256 hash = transaction.GetHash();
262  LogPrint(BCLog::ZMQ, "Publish rawtx %s to %s\n", hash.GetHex(), this->address);
264  ss << transaction;
265  return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
266 }
267 
268 // Helper function to send a 'sequence' topic message with the following structure:
269 // <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional)
270 static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional<uint64_t> sequence = {})
271 {
272  unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)];
273  for (unsigned int i = 0; i < sizeof(hash); ++i) {
274  data[sizeof(hash) - 1 - i] = hash.begin()[i];
275  }
276  data[sizeof(hash)] = label;
277  if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence);
278  return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label));
279 }
280 
282 {
283  uint256 hash = pindex->GetBlockHash();
284  LogPrint(BCLog::ZMQ, "Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
285  return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C');
286 }
287 
289 {
290  uint256 hash = pindex->GetBlockHash();
291  LogPrint(BCLog::ZMQ, "Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
292  return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D');
293 }
294 
295 bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
296 {
297  uint256 hash = transaction.GetHash();
298  LogPrint(BCLog::ZMQ, "Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
299  return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence);
300 }
301 
302 bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
303 {
304  uint256 hash = transaction.GetHash();
305  LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
306  return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
307 }
static const char * MSG_HASHBLOCK
bool NotifyTransaction(const CTransaction &transaction) override
uint32_t nSequence
upcounting per message sequence number
#define LogPrint(category,...)
Definition: logging.h:246
assert(!tx.IsCoinBase())
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override
bool SendZmqMessage(const char *command, const void *data, size_t size)
Definition: block.h:68
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
static bool SendSequenceMsg(CZMQAbstractPublishNotifier &notifier, uint256 hash, char label, std::optional< uint64_t > sequence={})
static const char * MSG_HASHTX
bool NotifyTransaction(const CTransaction &transaction) override
Transaction validation functions.
constexpr unsigned char * begin()
Definition: uint256.h:68
bool NotifyBlock(const CBlockIndex *pindex) override
uint256 GetBlockHash() const
Definition: chain.h:253
const std::function< bool(CBlock &, const CBlockIndex &)> m_get_block_by_index
ArgsManager & args
Definition: bitcoind.cpp:269
std::vector< CNetAddr > LookupHost(const std::string &name, unsigned int nMaxSolutions, bool fAllowLookup, DNSLookupFn dns_lookup_function)
Resolve a host string to its corresponding network addresses.
Definition: netbase.cpp:158
const uint256 & GetHash() const
Definition: transaction.h:337
bool NotifyBlock(const CBlockIndex *pindex) override
static void WriteLE32(unsigned char *ptr, uint32_t x)
Definition: common.h:44
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
static CService ip(uint32_t i)
static bool IsZMQAddressIPV6(const std::string &zmq_address)
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override
256-bit opaque blob.
Definition: uint256.h:106
static const char * MSG_RAWTX
const auto command
static const char * MSG_RAWBLOCK
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:144
const CChainParams & Params()
Return the currently selected parameters.
int RPCSerializationFlags()
Definition: server.cpp:598
static const int PROTOCOL_VERSION
network protocol versioning
Definition: version.h:12
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override
std::string GetHex() const
Definition: uint256.cpp:11
static int count
bool NotifyBlockConnect(const CBlockIndex *pindex) override
static void WriteLE64(unsigned char *ptr, uint64_t x)
Definition: common.h:50
uint64_t sequence
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:294
static const char * MSG_SEQUENCE
bool Initialize(void *pcontext) override