Skip to content

Commit

Permalink
bitcoind_rpc: rm BlockHash from Emitter::last_mempool_tip
Browse files Browse the repository at this point in the history
Instead of comparing the blockhash against the emitted_blocks map
to see whether the block is part of the emitter's best chain, we
reduce the `last_mempool_tip` height to the last agreement height
during the polling logic.

The benefits of this is we have tighter bounds for avoiding re-
emission. Also, it will be easier to replace `emitted_blocks` to
a `CheckPoint` (since we no longer rely on map lookup).
  • Loading branch information
evanlinjin committed Oct 9, 2023
1 parent 6d4b33e commit 57590e0
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 25 deletions.
38 changes: 25 additions & 13 deletions crates/bitcoind_rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct Emitter<'c, C> {

/// The last emitted block during our last mempool emission. This is used to determine whether
/// there has been a reorg since our last mempool emission.
last_mempool_tip: Option<(u32, BlockHash)>,
last_mempool_tip: Option<u32>,
}

impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
Expand Down Expand Up @@ -65,12 +65,17 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
let client = self.client;

let prev_mempool_tip = match self.last_mempool_tip {
// use 'avoid-re-emission' logic if there is no reorg
Some((height, hash)) if self.emitted_blocks.get(&height) == Some(&hash) => height,
_ => 0,
};

// This is the emitted tip height during the last mempool emission.
let prev_mempool_tip = self
.last_mempool_tip
// We use `start_height - 1` as we cannot guarantee that the block at
// `start_height` has been emitted.
.unwrap_or(self.start_height.saturating_sub(1));

// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
// be the new latest timestamp.
let prev_mempool_time = self.last_mempool_time;
let mut latest_time = prev_mempool_time;

Expand Down Expand Up @@ -109,11 +114,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
.collect::<Result<Vec<_>, _>>()?;

self.last_mempool_time = latest_time;
self.last_mempool_tip = self
.emitted_blocks
.iter()
.last()
.map(|(&height, &hash)| (height, hash));
self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height);

Ok(txs_to_emit)
}
Expand Down Expand Up @@ -209,7 +210,18 @@ where
continue;
}
PollResponse::AgreementFound(res) => {
emitter.emitted_blocks.split_off(&(res.height as u32 + 1));
let agreement_h = res.height as u32;

// get rid of evicted blocks
emitter.emitted_blocks.split_off(&(agreement_h + 1));

// The tip during the last mempool emission needs to in the best chain, we reduce
// it if it is not.
if let Some(h) = emitter.last_mempool_tip.as_mut() {
if *h > agreement_h {
*h = agreement_h;
}
}
emitter.last_block = Some(res);
continue;
}
Expand Down
53 changes: 41 additions & 12 deletions crates/bitcoind_rpc/tests/test_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
// must receive mined block which will confirm the transactions.
{
let (height, block) = emitter.next_block()?.expect("must get mined block");
let _ = chain.apply_update(block_to_chain_update(&block, height))?;
let _ = chain
.apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?;
let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
assert!(indexed_additions.graph.txs.is_empty());
assert!(indexed_additions.graph.txouts.is_empty());
Expand Down Expand Up @@ -685,34 +686,59 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
env.send(&addr, Amount::from_sat(2100))?;
}

// perform reorgs at different heights
for reorg_count in 1..TIP_DIFF {
// sync emitter to tip
while emitter.next_header()?.is_some() {}
// sync emitter to tip, first mempool emission should include all txs (as we haven't emitted
// from the mempool yet)
while emitter.next_header()?.is_some() {}
assert_eq!(
emitter
.mempool()?
.into_iter()
.map(|(tx, _)| tx.txid())
.collect::<BTreeSet<_>>(),
env.client
.get_raw_mempool()?
.into_iter()
.collect::<BTreeSet<_>>(),
"first mempool emission should include all txs",
);

// perform reorgs at different heights, these reorgs will not comfirm transactions in the
// mempool
for reorg_count in 1..TIP_DIFF {
println!("REORG COUNT: {}", reorg_count);
env.reorg_empty_blocks(reorg_count)?;

// we recalculate this at every loop as reorgs may evict transactions from mempool
let tx_introductions = env
// This is a map of mempool txids to tip height where the tx was introduced to the mempool
// we recalculate this at every loop as reorgs may evict transactions from mempool. We use
// the introduction height to determine whether we expect a tx to appear in a mempool
// emission.
// TODO: How can have have reorg logic in `TestEnv` NOT blacklast old blocks first?
let tx_introductions = dbg!(env
.client
.get_raw_mempool_verbose()?
.into_iter()
.map(|(txid, entry)| (txid, entry.height as usize))
.collect::<BTreeMap<_, _>>();
.collect::<BTreeMap<_, _>>());

// `next_header` emits the replacement block of the reorg
if let Some((height, _)) = emitter.next_header()? {
// the mempool emission (that follows the first block emission after reorg) should return
// the entire mempool contents
println!("\t- replacement height: {}", height);

// the mempool emission (that follows the first block emission after reorg) should only
// include mempool txs introduced at reorg height or greater
let mempool = emitter
.mempool()?
.into_iter()
.map(|(tx, _)| tx.txid())
.collect::<BTreeSet<_>>();
let exp_mempool = tx_introductions.keys().copied().collect::<BTreeSet<_>>();
let exp_mempool = tx_introductions
.iter()
.filter(|(_, &intro_h)| intro_h >= (height as usize))
.map(|(&txid, _)| txid)
.collect::<BTreeSet<_>>();
assert_eq!(
mempool, exp_mempool,
"the first mempool emission after reorg should include all mempool txs"
"the first mempool emission after reorg should only include mempool txs introduced at reorg height or greater"
);

let mempool = emitter
Expand All @@ -738,6 +764,9 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
.collect::<Vec<_>>(),
);
}

// sync emitter to tip
while emitter.next_header()?.is_some() {}
}

Ok(())
Expand Down

0 comments on commit 57590e0

Please sign in to comment.