diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index a4b28c8e8..8ed646c81 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -33,7 +33,7 @@ pub struct Emitter<'c, C> { /// The last emitted block during our last mempool emission. This is used to determine whether /// there has been a reorg since our last mempool emission. - last_mempool_tip: Option<(u32, BlockHash)>, + last_mempool_tip: Option, } impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { @@ -65,12 +65,17 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { pub fn mempool(&mut self) -> Result, bitcoincore_rpc::Error> { let client = self.client; - let prev_mempool_tip = match self.last_mempool_tip { - // use 'avoid-re-emission' logic if there is no reorg - Some((height, hash)) if self.emitted_blocks.get(&height) == Some(&hash) => height, - _ => 0, - }; - + // This is the emitted tip height during the last mempool emission. + let prev_mempool_tip = self + .last_mempool_tip + // We use `start_height - 1` as we cannot guarantee that the block at + // `start_height` has been emitted. + .unwrap_or(self.start_height.saturating_sub(1)); + + // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep + // track of the latest mempool tx's timestamp to determine whether we have seen a tx + // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will + // be the new latest timestamp. let prev_mempool_time = self.last_mempool_time; let mut latest_time = prev_mempool_time; @@ -109,11 +114,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { .collect::, _>>()?; self.last_mempool_time = latest_time; - self.last_mempool_tip = self - .emitted_blocks - .iter() - .last() - .map(|(&height, &hash)| (height, hash)); + self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height); Ok(txs_to_emit) } @@ -209,7 +210,18 @@ where continue; } PollResponse::AgreementFound(res) => { - emitter.emitted_blocks.split_off(&(res.height as u32 + 1)); + let agreement_h = res.height as u32; + + // get rid of evicted blocks + emitter.emitted_blocks.split_off(&(agreement_h + 1)); + + // The tip during the last mempool emission needs to in the best chain, we reduce + // it if it is not. + if let Some(h) = emitter.last_mempool_tip.as_mut() { + if *h > agreement_h { + *h = agreement_h; + } + } emitter.last_block = Some(res); continue; } diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 601fb5616..5d57bde18 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -368,7 +368,8 @@ fn test_into_tx_graph() -> anyhow::Result<()> { // must receive mined block which will confirm the transactions. { let (height, block) = emitter.next_block()?.expect("must get mined block"); - let _ = chain.apply_update(block_to_chain_update(&block, height))?; + let _ = chain + .apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?; let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height); assert!(indexed_additions.graph.txs.is_empty()); assert!(indexed_additions.graph.txouts.is_empty()); @@ -685,34 +686,59 @@ fn mempool_during_reorg() -> anyhow::Result<()> { env.send(&addr, Amount::from_sat(2100))?; } - // perform reorgs at different heights - for reorg_count in 1..TIP_DIFF { - // sync emitter to tip - while emitter.next_header()?.is_some() {} + // sync emitter to tip, first mempool emission should include all txs (as we haven't emitted + // from the mempool yet) + while emitter.next_header()?.is_some() {} + assert_eq!( + emitter + .mempool()? + .into_iter() + .map(|(tx, _)| tx.txid()) + .collect::>(), + env.client + .get_raw_mempool()? + .into_iter() + .collect::>(), + "first mempool emission should include all txs", + ); + // perform reorgs at different heights, these reorgs will not comfirm transactions in the + // mempool + for reorg_count in 1..TIP_DIFF { println!("REORG COUNT: {}", reorg_count); env.reorg_empty_blocks(reorg_count)?; - // we recalculate this at every loop as reorgs may evict transactions from mempool - let tx_introductions = env + // This is a map of mempool txids to tip height where the tx was introduced to the mempool + // we recalculate this at every loop as reorgs may evict transactions from mempool. We use + // the introduction height to determine whether we expect a tx to appear in a mempool + // emission. + // TODO: How can have have reorg logic in `TestEnv` NOT blacklast old blocks first? + let tx_introductions = dbg!(env .client .get_raw_mempool_verbose()? .into_iter() .map(|(txid, entry)| (txid, entry.height as usize)) - .collect::>(); + .collect::>()); + // `next_header` emits the replacement block of the reorg if let Some((height, _)) = emitter.next_header()? { - // the mempool emission (that follows the first block emission after reorg) should return - // the entire mempool contents + println!("\t- replacement height: {}", height); + + // the mempool emission (that follows the first block emission after reorg) should only + // include mempool txs introduced at reorg height or greater let mempool = emitter .mempool()? .into_iter() .map(|(tx, _)| tx.txid()) .collect::>(); - let exp_mempool = tx_introductions.keys().copied().collect::>(); + let exp_mempool = tx_introductions + .iter() + .filter(|(_, &intro_h)| intro_h >= (height as usize)) + .map(|(&txid, _)| txid) + .collect::>(); assert_eq!( mempool, exp_mempool, - "the first mempool emission after reorg should include all mempool txs" + "the first mempool emission after reorg should only include mempool txs introduced at reorg height or greater" ); let mempool = emitter @@ -738,6 +764,9 @@ fn mempool_during_reorg() -> anyhow::Result<()> { .collect::>(), ); } + + // sync emitter to tip + while emitter.next_header()?.is_some() {} } Ok(())