Bitcoin Core  31.0.0
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-present The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <common/args.h>
8 #include <kernel/mempool_entry.h>
9 #include <kernel/types.h>
10 #include <logging.h>
11 #include <netbase.h>
12 #include <primitives/block.h>
13 #include <primitives/transaction.h>
14 #include <util/check.h>
16 #include <zmq/zmqpublishnotifier.h>
17 #include <zmq/zmqutil.h>
18 
19 #include <zmq.h>
20 
21 #include <map>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 
27 
29 
31 {
32  Shutdown();
33 }
34 
35 std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
36 {
37  std::list<const CZMQAbstractNotifier*> result;
38  for (const auto& n : notifiers) {
39  result.push_back(n.get());
40  }
41  return result;
42 }
43 
44 std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<std::byte>&, const CBlockIndex&)> get_block_by_index)
45 {
46  std::map<std::string, CZMQNotifierFactory> factories;
47  factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
48  factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
49  factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
50  return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
51  };
52  factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
53  factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
54 
55  std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
56  for (const auto& entry : factories)
57  {
58  std::string arg("-zmq" + entry.first);
59  const auto& factory = entry.second;
60  for (std::string& address : gArgs.GetArgs(arg)) {
61  // libzmq uses prefix "ipc://" for UNIX domain sockets
62  if (address.starts_with(ADDR_PREFIX_UNIX)) {
63  address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
64  }
65 
66  std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
67  notifier->SetType(entry.first);
68  notifier->SetAddress(address);
69  notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
70  notifiers.push_back(std::move(notifier));
71  }
72  }
73 
74  if (!notifiers.empty())
75  {
76  std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
77  notificationInterface->notifiers = std::move(notifiers);
78 
79  if (notificationInterface->Initialize()) {
80  return notificationInterface;
81  }
82  }
83 
84  return nullptr;
85 }
86 
87 // Called at startup to conditionally set up ZMQ socket(s)
89 {
90  int major = 0, minor = 0, patch = 0;
91  zmq_version(&major, &minor, &patch);
92  LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
93 
94  LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
95  assert(!pcontext);
96 
97  pcontext = zmq_ctx_new();
98 
99  if (!pcontext)
100  {
101  zmqError("Unable to initialize context");
102  return false;
103  }
104 
105  for (auto& notifier : notifiers) {
106  if (notifier->Initialize(pcontext)) {
107  LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
108  } else {
109  LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
110  return false;
111  }
112  }
113 
114  return true;
115 }
116 
117 // Called during shutdown sequence
119 {
120  LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
121  if (pcontext)
122  {
123  for (auto& notifier : notifiers) {
124  LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
125  notifier->Shutdown();
126  }
127  zmq_ctx_term(pcontext);
128 
129  pcontext = nullptr;
130  }
131 }
132 
133 namespace {
134 
135 template <typename Function>
136 void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
137 {
138  for (auto i = notifiers.begin(); i != notifiers.end(); ) {
139  CZMQAbstractNotifier* notifier = i->get();
140  if (func(notifier)) {
141  ++i;
142  } else {
143  notifier->Shutdown();
144  i = notifiers.erase(i);
145  }
146  }
147 }
148 
149 } // anonymous namespace
150 
151 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
152 {
153  if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
154  return;
155 
156  TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
157  return notifier->NotifyBlock(pindexNew);
158  });
159 }
160 
162 {
163  const CTransaction& tx = *(ptx.info.m_tx);
164 
165  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
166  return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
167  });
168 }
169 
171 {
172  // Called for all non-block inclusion reasons
173  const CTransaction& tx = *ptx;
174 
175  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
176  return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
177  });
178 }
179 
180 void CZMQNotificationInterface::BlockConnected(const ChainstateRole& role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
181 {
182  if (role.historical) {
183  return;
184  }
185  for (const CTransactionRef& ptx : pblock->vtx) {
186  const CTransaction& tx = *ptx;
187  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
188  return notifier->NotifyTransaction(tx);
189  });
190  }
191 
192  // Next we notify BlockConnect listeners for *all* blocks
193  TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
194  return notifier->NotifyBlockConnect(pindexConnected);
195  });
196 }
197 
198 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
199 {
200  for (const CTransactionRef& ptx : pblock->vtx) {
201  const CTransaction& tx = *ptx;
202  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
203  return notifier->NotifyTransaction(tx);
204  });
205  }
206 
207  // Next we notify BlockDisconnect listeners for *all* blocks
208  TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
209  return notifier->NotifyBlockDisconnect(pindexDisconnected);
210  });
211 }
212 
// Namespace-scope owning pointer to the ZMQ notification interface. It is
// default-constructed here, i.e. empty; nothing in this file assigns it.
// NOTE(review): presumably set from the result of
// CZMQNotificationInterface::Create() by startup code elsewhere -- confirm.
213 std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:403
is a home for simple enum and struct type definitions that can be used internally by functions in the...
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected Provides the block that was disconnected.
bool historical
Whether this is a historical chainstate downloading old blocks to validate an assumeutxo snapshot...
Definition: types.h:26
const std::string ADDR_PREFIX_UNIX
Prefix for unix domain socket addresses (which are local filesystem paths)
Definition: netbase.h:31
assert(!tx.IsCoinBase())
std::unique_ptr< CZMQNotificationInterface > g_zmq_notification_interface
void BlockConnected(const kernel::ChainstateRole &role, const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyBlock(const CBlockIndex *pindex)
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal...
Information about chainstate that notifications are sent from.
Definition: types.h:18
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
static const int DEFAULT_ZMQ_SNDHWM
void zmqError(const std::string &str)
Definition: zmqutil.cpp:15
ArgsManager gArgs
Definition: args.cpp:40
virtual void Shutdown()=0
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
#define LogDebug(category,...)
Definition: log.h:115
const std::string ADDR_PREFIX_IPC
Prefix for unix domain socket addresses (which are local filesystem paths)
Definition: zmqutil.h:13
auto result
Definition: common-types.h:74
void TransactionAddedToMempool(const NewMempoolTransactionInfo &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:93
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Definition: args.h:306
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
const CTransactionRef m_tx
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:280
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: args.cpp:366
static std::unique_ptr< CZMQNotificationInterface > Create(std::function< bool(std::vector< std::byte > &, const CBlockIndex &)> get_block_by_index)