Skip to content

Commit

Permalink
Merge #19572: ZMQ: Create "sequence" notifier, enabling client-side m…
Browse files Browse the repository at this point in the history
…empool tracking

759d94e Update zmq notification documentation and sample consumer (Gregory Sanders)
68c3c7e Add functional tests for zmq sequence topic and mempool sequence logic (Gregory Sanders)
e76fc2b Add 'sequence' zmq publisher to track all block (dis)connects, mempool deltas (Gregory Sanders)
1b615e6 zmq test: Actually make reorg occur (Gregory Sanders)

Pull request description:

  This PR creates a new ZMQ notifier that gives a "total hash history" of block (dis)connection, mempool addition/substraction, all in one pipeline. It also exposes a "mempool sequence number" to both this notifier and `getrawmempool` results, which allows the consumer to use the results together without confusion about ordering of results and without excessive `getrawmempool` polling.

  See the functional test `interfaces_zmq.py::test_mempool_sync` which shows the proposed user flow for the client-side tracking of mempool contents and confirmations.

  Inspired by bitcoin/bitcoin#19462 (comment)
  Alternative to bitcoin/bitcoin#19462 due to noted deficiencies in current zmq notification streams.

  Also fixes a legacy zmq test that didn't actually trigger a reorg because of identical blocks being generated on each side of the split(oops)

ACKs for top commit:
  laanwj:
    Code review ACK 759d94e

Tree-SHA512: 9daf0d7d996190f3a68ff40340a687519323d7a6c51dcb26be457fbc013217ea7b62fbd0700b74b654433d2e370704feb61e5584399290692464fcfcb72ce3b7
  • Loading branch information
laanwj committed Sep 23, 2020
2 parents 8219893 + 759d94e commit 9e217f5
Show file tree
Hide file tree
Showing 22 changed files with 552 additions and 63 deletions.
19 changes: 12 additions & 7 deletions contrib/zmq/zmq_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
-zmqpubrawtx=tcp://127.0.0.1:28332 \
-zmqpubrawblock=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashblock=tcp://127.0.0.1:28332
-zmqpubhashblock=tcp://127.0.0.1:28332 \
-zmqpubsequence=tcp://127.0.0.1:28332
We use the asyncio library here. `self.handle()` installs itself as a
future at the end of the function. Since it never returns with the event
Expand Down Expand Up @@ -47,16 +48,14 @@ def __init__(self):
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)

async def handle(self) :
msg = await self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
topic, body, seq = await self.zmqSubSocket.recv_multipart()
sequence = "Unknown"
if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)
if len(seq) == 4:
sequence = str(struct.unpack('<I', seq)[-1])
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body))
Expand All @@ -69,6 +68,12 @@ async def handle(self) :
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body))
elif topic == b"sequence":
hash = binascii.hexlify(body[:32])
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
print('- SEQUENCE ('+sequence+') -')
print(hash, label, mempool_sequence)
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())

Expand Down
29 changes: 23 additions & 6 deletions doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Currently, the following notifications are supported:
-zmqpubhashblock=address
-zmqpubrawblock=address
-zmqpubrawtx=address
-zmqpubsequence=address

The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
Expand All @@ -74,6 +75,7 @@ The option to set the PUB socket's outbound message high water mark
-zmqpubhashblockhwm=n
-zmqpubrawblockhwm=n
-zmqpubrawtxhwm=n
-zmqpubsequencehwm=address

The high water mark value must be an integer greater than or equal to 0.

Expand All @@ -87,7 +89,15 @@ Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator) and the body is the transaction hash (32
bytes).
bytes) for all but `sequence` topic. For `sequence`, the body
is structured as the following based on the type of message:

<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool

Where the 8-byte uints correspond to the mempool sequence number.

These options can also be provided in bitcoin.conf.

Expand Down Expand Up @@ -124,13 +134,20 @@ No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling.

Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip. Also note
that no notification occurs if the tip was in the active chain - this
is the case after calling invalidateblock RPC.
Note that for `*block` topics, when the block chain tip changes,
a reorganisation may occur and just the tip will be notified.
It is up to the subscriber to retrieve the chain from the last known
block to the new tip. Also note that no notification will occur if the tip
was in the active chain--as would be the case after calling invalidateblock RPC.
In contrast, the `sequence` topic publishes all block connections and
disconnections.

There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type you are
using. Bitcoind appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.

The `sequence` topic refers specifically to the mempool sequence
number, which is also published along with all mempool events. This
is a different sequence value than in ZMQ itself in order to allow a total
ordering of mempool events to be constructed.
4 changes: 4 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,19 +488,23 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashtx=<address>");
hidden_args.emplace_back("-zmqpubrawblock=<address>");
hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubsequence=<n>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
#endif

argsman.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
Expand Down
10 changes: 5 additions & 5 deletions src/interfaces/chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ class NotificationsProxy : public CValidationInterface
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
: m_notifications(std::move(notifications)) {}
virtual ~NotificationsProxy() = default;
void TransactionAddedToMempool(const CTransactionRef& tx) override
void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override
{
m_notifications->transactionAddedToMempool(tx);
m_notifications->transactionAddedToMempool(tx, mempool_sequence);
}
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
{
m_notifications->transactionRemovedFromMempool(tx, reason);
m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
}
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
{
Expand Down Expand Up @@ -405,7 +405,7 @@ class ChainImpl : public Chain
if (!m_node.mempool) return;
LOCK2(::cs_main, m_node.mempool->cs);
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
notifications.transactionAddedToMempool(entry.GetSharedTx());
notifications.transactionAddedToMempool(entry.GetSharedTx(), 0 /* mempool_sequence */);
}
}
NodeContext& m_node;
Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ class Chain
{
public:
virtual ~Notifications() {}
virtual void transactionAddedToMempool(const CTransactionRef& tx) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
virtual void blockConnected(const CBlock& block, int height) {}
virtual void blockDisconnected(const CBlock& block, int height) {}
virtual void updatedBlockTip() {}
Expand Down
41 changes: 35 additions & 6 deletions src/rpc/blockchain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool
info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash()));
}

UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose, bool include_mempool_sequence)
{
if (verbose) {
if (include_mempool_sequence) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values.");
}
LOCK(pool.cs);
UniValue o(UniValue::VOBJ);
for (const CTxMemPoolEntry& e : pool.mapTx) {
Expand All @@ -516,14 +519,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
}
return o;
} else {
uint64_t mempool_sequence;
std::vector<uint256> vtxid;
pool.queryHashes(vtxid);

{
LOCK(pool.cs);
pool.queryHashes(vtxid);
mempool_sequence = pool.GetSequence();
}
UniValue a(UniValue::VARR);
for (const uint256& hash : vtxid)
a.push_back(hash.ToString());

return a;
if (!include_mempool_sequence) {
return a;
} else {
UniValue o(UniValue::VOBJ);
o.pushKV("txids", a);
o.pushKV("mempool_sequence", mempool_sequence);
return o;
}
}
}

Expand All @@ -534,6 +548,7 @@ static RPCHelpMan getrawmempool()
"\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n",
{
{"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"},
{"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."},
},
{
RPCResult{"for verbose = false",
Expand All @@ -546,6 +561,15 @@ static RPCHelpMan getrawmempool()
{
{RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()},
}},
RPCResult{"for verbose = false and mempool_sequence = true",
RPCResult::Type::OBJ, "", "",
{
{RPCResult::Type::ARR, "txids", "",
{
{RPCResult::Type::STR_HEX, "", "The transaction id"},
}},
{RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."},
}},
},
RPCExamples{
HelpExampleCli("getrawmempool", "true")
Expand All @@ -557,7 +581,12 @@ static RPCHelpMan getrawmempool()
if (!request.params[0].isNull())
fVerbose = request.params[0].get_bool();

return MempoolToJSON(EnsureMemPool(request.context), fVerbose);
bool include_mempool_sequence = false;
if (!request.params[1].isNull()) {
include_mempool_sequence = request.params[1].get_bool();
}

return MempoolToJSON(EnsureMemPool(request.context), fVerbose, include_mempool_sequence);
},
};
}
Expand Down Expand Up @@ -2451,7 +2480,7 @@ static const CRPCCommand commands[] =
{ "blockchain", "getmempooldescendants", &getmempooldescendants, {"txid","verbose"} },
{ "blockchain", "getmempoolentry", &getmempoolentry, {"txid"} },
{ "blockchain", "getmempoolinfo", &getmempoolinfo, {} },
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose"} },
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose", "mempool_sequence"} },
{ "blockchain", "gettxout", &gettxout, {"txid","n","include_mempool"} },
{ "blockchain", "gettxoutsetinfo", &gettxoutsetinfo, {"hash_type"} },
{ "blockchain", "pruneblockchain", &pruneblockchain, {"height"} },
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/blockchain.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ UniValue blockToJSON(const CBlock& block, const CBlockIndex* tip, const CBlockIn
UniValue MempoolInfoToJSON(const CTxMemPool& pool);

/** Mempool to JSON */
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false);
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false, bool include_mempool_sequence = false);

/** Block header to JSON */
UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex) LOCKS_EXCLUDED(cs_main);
Expand Down
1 change: 1 addition & 0 deletions src/rpc/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "pruneblockchain", 0, "height" },
{ "keypoolrefill", 0, "newsize" },
{ "getrawmempool", 0, "verbose" },
{ "getrawmempool", 1, "mempool_sequence" },
{ "estimatesmartfee", 0, "conf_target" },
{ "estimaterawfee", 0, "conf_target" },
{ "estimaterawfee", 1, "threshold" },
Expand Down
6 changes: 5 additions & 1 deletion src/txmempool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,16 @@ void CTxMemPool::addUnchecked(const CTxMemPoolEntry &entry, setEntries &setAnces

void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
{
// We increment mempool sequence value no matter removal reason
// even if not directly reported below.
uint64_t mempool_sequence = GetAndIncrementSequence();

if (reason != MemPoolRemovalReason::BLOCK) {
// Notify clients that a transaction has been removed from the mempool
// for any reason except being included in a block. Clients interested
// in transactions included in blocks can subscribe to the BlockConnected
// notification.
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason);
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
}

const uint256 hash = it->GetTx().GetHash();
Expand Down
14 changes: 14 additions & 0 deletions src/txmempool.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,11 @@ class CTxMemPool
mutable uint64_t m_epoch;
mutable bool m_has_epoch_guard;

// In-memory counter for external mempool tracking purposes.
// This number is incremented once every time a transaction
// is added or removed from the mempool for any reason.
mutable uint64_t m_sequence_number{1};

void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs);

bool m_is_loaded GUARDED_BY(cs){false};
Expand Down Expand Up @@ -776,6 +781,15 @@ class CTxMemPool
return m_unbroadcast_txids.count(txid) != 0;
}

/** Guards this internal counter for external reporting */
uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
return m_sequence_number++;
}

uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
return m_sequence_number;
}

private:
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
* the descendants for a single transaction that has been added to the
Expand Down
2 changes: 1 addition & 1 deletion src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ bool MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs

if (!Finalize(args, workspace)) return false;

GetMainSignals().TransactionAddedToMempool(ptx);
GetMainSignals().TransactionAddedToMempool(ptx, m_pool.GetAndIncrementSequence());

return true;
}
Expand Down
12 changes: 6 additions & 6 deletions src/validationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,18 +199,18 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd
fInitialDownload);
}

void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx) {
auto event = [tx, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx); });
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {
auto event = [tx, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
tx->GetHash().ToString(),
tx->GetWitnessHash().ToString());
}

void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {
auto event = [tx, reason, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); });
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
auto event = [tx, reason, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
tx->GetHash().ToString(),
Expand Down
9 changes: 5 additions & 4 deletions src/validationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class CValidationInterface {
*
* Called on a background thread.
*/
virtual void TransactionAddedToMempool(const CTransactionRef& tx) {}
virtual void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}

/**
* Notifies listeners of a transaction leaving mempool.
*
Expand Down Expand Up @@ -130,7 +131,7 @@ class CValidationInterface {
*
* Called on a background thread.
*/
virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
/**
* Notifies listeners of a block being connected.
* Provides a vector of transactions evicted from the mempool as a result.
Expand Down Expand Up @@ -197,8 +198,8 @@ class CMainSignals {


void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const CTransactionRef&);
void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason);
void TransactionAddedToMempool(const CTransactionRef&, uint64_t mempool_sequence);
void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence);
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex);
void BlockDisconnected(const std::shared_ptr<const CBlock> &, const CBlockIndex* pindex);
void ChainStateFlushed(const CBlockLocator &);
Expand Down
Loading

0 comments on commit 9e217f5

Please sign in to comment.