Bitcoin Core  26.1.0
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2022 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <common/args.h>
8 #include <kernel/chain.h>
9 #include <logging.h>
10 #include <primitives/block.h>
11 #include <primitives/transaction.h>
12 #include <validationinterface.h>
14 #include <zmq/zmqpublishnotifier.h>
15 #include <zmq/zmqutil.h>
16 
17 #include <zmq.h>
18 
19 #include <cassert>
20 #include <map>
21 #include <string>
22 #include <utility>
23 #include <vector>
24 
26 {
27 }
28 
30 {
31  Shutdown();
32 }
33 
34 std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
35 {
36  std::list<const CZMQAbstractNotifier*> result;
37  for (const auto& n : notifiers) {
38  result.push_back(n.get());
39  }
40  return result;
41 }
42 
43 std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(CBlock&, const CBlockIndex&)> get_block_by_index)
44 {
45  std::map<std::string, CZMQNotifierFactory> factories;
46  factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
47  factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
48  factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
49  return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
50  };
51  factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
52  factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
53 
54  std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
55  for (const auto& entry : factories)
56  {
57  std::string arg("-zmq" + entry.first);
58  const auto& factory = entry.second;
59  for (const std::string& address : gArgs.GetArgs(arg)) {
60  std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
61  notifier->SetType(entry.first);
62  notifier->SetAddress(address);
63  notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
64  notifiers.push_back(std::move(notifier));
65  }
66  }
67 
68  if (!notifiers.empty())
69  {
70  std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
71  notificationInterface->notifiers = std::move(notifiers);
72 
73  if (notificationInterface->Initialize()) {
74  return notificationInterface;
75  }
76  }
77 
78  return nullptr;
79 }
80 
81 // Called at startup to conditionally set up ZMQ socket(s)
83 {
84  int major = 0, minor = 0, patch = 0;
85  zmq_version(&major, &minor, &patch);
86  LogPrint(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
87 
88  LogPrint(BCLog::ZMQ, "Initialize notification interface\n");
89  assert(!pcontext);
90 
91  pcontext = zmq_ctx_new();
92 
93  if (!pcontext)
94  {
95  zmqError("Unable to initialize context");
96  return false;
97  }
98 
99  for (auto& notifier : notifiers) {
100  if (notifier->Initialize(pcontext)) {
101  LogPrint(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
102  } else {
103  LogPrint(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
104  return false;
105  }
106  }
107 
108  return true;
109 }
110 
111 // Called during shutdown sequence
113 {
114  LogPrint(BCLog::ZMQ, "Shutdown notification interface\n");
115  if (pcontext)
116  {
117  for (auto& notifier : notifiers) {
118  LogPrint(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
119  notifier->Shutdown();
120  }
121  zmq_ctx_term(pcontext);
122 
123  pcontext = nullptr;
124  }
125 }
126 
127 namespace {
128 
129 template <typename Function>
130 void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
131 {
132  for (auto i = notifiers.begin(); i != notifiers.end(); ) {
133  CZMQAbstractNotifier* notifier = i->get();
134  if (func(notifier)) {
135  ++i;
136  } else {
137  notifier->Shutdown();
138  i = notifiers.erase(i);
139  }
140  }
141 }
142 
143 } // anonymous namespace
144 
145 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
146 {
147  if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
148  return;
149 
150  TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
151  return notifier->NotifyBlock(pindexNew);
152  });
153 }
154 
156 {
157  const CTransaction& tx = *ptx;
158 
159  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
160  return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
161  });
162 }
163 
165 {
166  // Called for all non-block inclusion reasons
167  const CTransaction& tx = *ptx;
168 
169  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
170  return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
171  });
172 }
173 
174 void CZMQNotificationInterface::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
175 {
176  if (role == ChainstateRole::BACKGROUND) {
177  return;
178  }
179  for (const CTransactionRef& ptx : pblock->vtx) {
180  const CTransaction& tx = *ptx;
181  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
182  return notifier->NotifyTransaction(tx);
183  });
184  }
185 
186  // Next we notify BlockConnect listeners for *all* blocks
187  TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
188  return notifier->NotifyBlockConnect(pindexConnected);
189  });
190 }
191 
192 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
193 {
194  for (const CTransactionRef& ptx : pblock->vtx) {
195  const CTransaction& tx = *ptx;
196  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
197  return notifier->NotifyTransaction(tx);
198  });
199  }
200 
201  // Next we notify BlockDisconnect listeners for *all* blocks
202  TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
203  return notifier->NotifyBlockDisconnect(pindexDisconnected);
204  });
205 }
206 
// Definition of the file-scope ZMQ notification interface handle. It is
// default-constructed here, so it owns no object until one is assigned to it.
207 std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:421
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected.
#define LogPrint(category,...)
Definition: logging.h:246
assert(!tx.IsCoinBase())
std::unique_ptr< CZMQNotificationInterface > g_zmq_notification_interface
Definition: block.h:68
void TransactionAddedToMempool(const CTransactionRef &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyBlock(const CBlockIndex *pindex)
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal...
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
ChainstateRole
This enum describes the various roles a specific Chainstate instance can take.
Definition: chain.h:25
static std::unique_ptr< CZMQNotificationInterface > Create(std::function< bool(CBlock &, const CBlockIndex &)> get_block_by_index)
static const int DEFAULT_ZMQ_SNDHWM
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13
void BlockConnected(ChainstateRole role, const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
ArgsManager gArgs
Definition: args.cpp:42
virtual void Shutdown()=0
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:144
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Return integer argument or default value.
Definition: args.cpp:481
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:294
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: args.cpp:361