From e0512acf9476fa3fa7da9cc28a222432f974aad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sat, 30 Dec 2023 20:48:20 +0800 Subject: [PATCH] feat(bitcoind_rpc)!: emissions include checkpoint and connected_to data Previously, emissions are purely blocks + the block height. This means emitted blocks can only connect to previous-adjacent blocks. Hence, sync must start from genesis and include every block. --- crates/bitcoind_rpc/src/lib.rs | 70 ++++++++++--- crates/bitcoind_rpc/tests/test_emitter.rs | 98 +++++++++---------- .../example_bitcoind_rpc_polling/src/main.rs | 34 ++++--- 3 files changed, 128 insertions(+), 74 deletions(-) diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index e790b8a8e..ce5e863bb 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -43,11 +43,13 @@ pub struct Emitter<'c, C> { } impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { - /// Construct a new [`Emitter`] with the given RPC `client`, `last_cp` and `start_height`. + /// Construct a new [`Emitter`]. /// - /// * `last_cp` is the check point used to find the latest block which is still part of the best - /// chain. - /// * `start_height` is the block height to start emitting blocks from. + /// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter + /// can start emission from a block that connects to the original chain. + /// + /// `start_height` starts emission from a given height (if there are no conflicts with the + /// original chain). pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self { Self { client, @@ -127,13 +129,58 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { } /// Emit the next block height and header (if any). - pub fn next_header(&mut self) -> Result, bitcoincore_rpc::Error> { - poll(self, |hash| self.client.get_block_header(hash)) + pub fn next_header(&mut self) -> Result>, bitcoincore_rpc::Error> { + Ok(poll(self, |hash| self.client.get_block_header(hash))? + .map(|(checkpoint, block)| BlockEvent { block, checkpoint })) } /// Emit the next block height and block (if any). - pub fn next_block(&mut self) -> Result, bitcoincore_rpc::Error> { - poll(self, |hash| self.client.get_block(hash)) + pub fn next_block(&mut self) -> Result>, bitcoincore_rpc::Error> { + Ok(poll(self, |hash| self.client.get_block(hash))? + .map(|(checkpoint, block)| BlockEvent { block, checkpoint })) + } +} + +/// A newly emitted block from [`Emitter`]. +#[derive(Debug)] +pub struct BlockEvent { + /// Either a full [`Block`] or [`Header`] of the new block. + pub block: B, + + /// The checkpoint of the new block. + /// + /// A [`CheckPoint`] is a node of a linked list of [`BlockId`]s. This checkpoint is linked to + /// all [`BlockId`]s originally passed in [`Emitter::new`] as well as emitted blocks since then. + /// These blocks are guaranteed to be of the same chain. + /// + /// This is important as BDK structures require block-to-apply to be connected with another + /// block in the original chain. + pub checkpoint: CheckPoint, +} + +impl BlockEvent { + /// The block height of this new block. + pub fn block_height(&self) -> u32 { + self.checkpoint.height() + } + + /// The block hash of this new block. + pub fn block_hash(&self) -> BlockHash { + self.checkpoint.hash() + } + + /// The [`BlockId`] of a previous block that this block connects to. + /// + /// This either returns a [`BlockId`] of a previously emitted block or from the chain we started + /// with (passed in as `last_cp` in [`Emitter::new`]). + /// + /// This value is derived from [`BlockEvent::checkpoint`]. + pub fn connected_to(&self) -> BlockId { + match self.checkpoint.prev() { + Some(prev_cp) => prev_cp.block_id(), + // there is no previous checkpoint, so just connect with itself + None => self.checkpoint.block_id(), + } } } @@ -203,7 +250,7 @@ where fn poll( emitter: &mut Emitter, get_item: F, -) -> Result, bitcoincore_rpc::Error> +) -> Result, bitcoincore_rpc::Error> where C: bitcoincore_rpc::RpcApi, F: Fn(&BlockHash) -> Result, @@ -215,13 +262,14 @@ where let hash = res.hash; let item = get_item(&hash)?; - emitter.last_cp = emitter + let new_cp = emitter .last_cp .clone() .push(BlockId { height, hash }) .expect("must push"); + emitter.last_cp = new_cp.clone(); emitter.last_block = Some(res); - return Ok(Some((height, item))); + return Ok(Some((new_cp, item))); } PollResponse::NoMoreBlocks => { emitter.last_block = None; diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 521124e5d..384df92d0 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -157,28 +157,6 @@ impl TestEnv { } } -fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Update { - let this_id = BlockId { - height, - hash: block.block_hash(), - }; - let tip = if block.header.prev_blockhash == BlockHash::all_zeros() { - CheckPoint::new(this_id) - } else { - CheckPoint::new(BlockId { - height: height - 1, - hash: block.header.prev_blockhash, - }) - .extend(core::iter::once(this_id)) - .expect("must construct checkpoint") - }; - - local_chain::Update { - tip, - introduce_older_blocks: false, - } -} - /// Ensure that blocks are emitted in order even after reorg. /// /// 1. Mine 101 blocks. @@ -200,17 +178,21 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> { // see if the emitter outputs the right blocks println!("first sync:"); - while let Some((height, block)) = emitter.next_block()? { + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + let hash = emission.block_hash(); assert_eq!( - block.block_hash(), + emission.block_hash(), exp_hashes[height as usize], "emitted block hash is unexpected" ); - let chain_update = block_to_chain_update(&block, height); assert_eq!( - local_chain.apply_update(chain_update)?, - BTreeMap::from([(height, Some(block.block_hash()))]), + local_chain.apply_update(local_chain::Update { + tip: emission.checkpoint, + introduce_older_blocks: false, + })?, + BTreeMap::from([(height, Some(hash))]), "chain update changeset is unexpected", ); } @@ -237,27 +219,30 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> { // see if the emitter outputs the right blocks println!("after reorg:"); let mut exp_height = exp_hashes.len() - reorged_blocks.len(); - while let Some((height, block)) = emitter.next_block()? { + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + let hash = emission.block_hash(); assert_eq!( height, exp_height as u32, "emitted block has unexpected height" ); assert_eq!( - block.block_hash(), - exp_hashes[height as usize], + hash, exp_hashes[height as usize], "emitted block is unexpected" ); - let chain_update = block_to_chain_update(&block, height); assert_eq!( - local_chain.apply_update(chain_update)?, + local_chain.apply_update(local_chain::Update { + tip: emission.checkpoint, + introduce_older_blocks: false, + })?, if exp_height == exp_hashes.len() - reorged_blocks.len() { - core::iter::once((height, Some(block.block_hash()))) + core::iter::once((height, Some(hash))) .chain((height + 1..exp_hashes.len() as u32).map(|h| (h, None))) .collect::() } else { - BTreeMap::from([(height, Some(block.block_hash()))]) + BTreeMap::from([(height, Some(hash))]) }, "chain update changeset is unexpected", ); @@ -307,9 +292,13 @@ fn test_into_tx_graph() -> anyhow::Result<()> { let emitter = &mut Emitter::new(&env.client, chain.tip(), 0); - while let Some((height, block)) = emitter.next_block()? { - let _ = chain.apply_update(block_to_chain_update(&block, height))?; - let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height); + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + let _ = chain.apply_update(local_chain::Update { + tip: emission.checkpoint, + introduce_older_blocks: false, + })?; + let indexed_additions = indexed_tx_graph.apply_block_relevant(emission.block, height); assert!(indexed_additions.is_empty()); } @@ -367,10 +356,13 @@ fn test_into_tx_graph() -> anyhow::Result<()> { // must receive mined block which will confirm the transactions. { - let (height, block) = emitter.next_block()?.expect("must get mined block"); - let _ = chain - .apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?; - let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height); + let emission = emitter.next_block()?.expect("must get mined block"); + let height = emission.block_height(); + let _ = chain.apply_update(local_chain::Update { + tip: emission.checkpoint, + introduce_older_blocks: false, + })?; + let indexed_additions = indexed_tx_graph.apply_block_relevant(emission.block, height); assert!(indexed_additions.graph.txs.is_empty()); assert!(indexed_additions.graph.txouts.is_empty()); assert_eq!(indexed_additions.graph.anchors, exp_anchors); @@ -407,9 +399,12 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { for reorg_count in 1..=10 { let replaced_blocks = env.reorg_empty_blocks(reorg_count)?; - let (height, next_header) = emitter.next_header()?.expect("must emit block after reorg"); + let next_emission = emitter.next_header()?.expect("must emit block after reorg"); assert_eq!( - (height as usize, next_header.block_hash()), + ( + next_emission.block_height() as usize, + next_emission.block_hash() + ), replaced_blocks[0], "block emitted after reorg should be at the reorg height" ); @@ -439,8 +434,9 @@ fn sync_from_emitter( where C: bitcoincore_rpc::RpcApi, { - while let Some((height, block)) = emitter.next_block()? { - process_block(recv_chain, recv_graph, block, height)?; + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + process_block(recv_chain, recv_graph, emission.block, height)?; } Ok(()) } @@ -660,7 +656,8 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() // At this point, the emitter has seen all mempool transactions. It should only re-emit those // that have introduction heights less than the emitter's last-emitted block tip. - while let Some((height, _)) = emitter.next_header()? { + while let Some(emission) = emitter.next_header()? { + let height = emission.block_height(); // We call `mempool()` twice. // The second call (at height `h`) should skip the tx introduced at height `h`. for try_index in 0..2 { @@ -754,7 +751,8 @@ fn mempool_during_reorg() -> anyhow::Result<()> { .collect::>()); // `next_header` emits the replacement block of the reorg - if let Some((height, _)) = emitter.next_header()? { + if let Some(emission) = emitter.next_header()? { + let height = emission.block_height(); println!("\t- replacement height: {}", height); // the mempool emission (that follows the first block emission after reorg) should only @@ -835,12 +833,12 @@ fn no_agreement_point() -> anyhow::Result<()> { env.mine_blocks(PREMINE_COUNT, None)?; // emit block 99a - let (_, block_header_99a) = emitter.next_header()?.expect("block 99a header"); + let block_header_99a = emitter.next_header()?.expect("block 99a header").block; let block_hash_99a = block_header_99a.block_hash(); let block_hash_98a = block_header_99a.prev_blockhash; // emit block 100a - let (_, block_header_100a) = emitter.next_header()?.expect("block 100a header"); + let block_header_100a = emitter.next_header()?.expect("block 100a header").block; let block_hash_100a = block_header_100a.block_hash(); // get hash for block 101a @@ -855,7 +853,7 @@ fn no_agreement_point() -> anyhow::Result<()> { env.mine_blocks(3, None)?; // emit block header 99b - let (_, block_header_99b) = emitter.next_header()?.expect("block 99b header"); + let block_header_99b = emitter.next_header()?.expect("block 99b header").block; let block_hash_99b = block_header_99b.block_hash(); let block_hash_98b = block_header_99b.prev_blockhash; diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index 449242e41..648962c28 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -14,7 +14,7 @@ use bdk_bitcoind_rpc::{ use bdk_chain::{ bitcoin::{constants::genesis_block, Block, Transaction}, indexed_tx_graph, keychain, - local_chain::{self, CheckPoint, LocalChain}, + local_chain::{self, LocalChain}, ConfirmationTimeHeightAnchor, IndexedTxGraph, }; use example_cli::{ @@ -42,7 +42,7 @@ type ChangeSet = ( #[derive(Debug)] enum Emission { - Block { height: u32, block: Block }, + Block(bdk_bitcoind_rpc::BlockEvent), Mempool(Vec<(Transaction, u64)>), Tip(u32), } @@ -178,17 +178,20 @@ fn main() -> anyhow::Result<()> { let mut last_db_commit = Instant::now(); let mut last_print = Instant::now(); - while let Some((height, block)) = emitter.next_block()? { + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); let mut db = db.lock().unwrap(); - let chain_update = - CheckPoint::from_header(&block.header, height).into_update(false); let chain_changeset = chain - .apply_update(chain_update) + .apply_update(local_chain::Update { + tip: emission.checkpoint, + introduce_older_blocks: false, + }) .expect("must always apply as we receive blocks in order from emitter"); - let graph_changeset = graph.apply_block_relevant(block, height); + let graph_changeset = graph.apply_block_relevant(emission.block, height); db.stage((chain_changeset, graph_changeset)); // commit staged db changes in intervals @@ -256,7 +259,8 @@ fn main() -> anyhow::Result<()> { loop { match emitter.next_block()? { - Some((height, block)) => { + Some(block_emission) => { + let height = block_emission.block_height(); if sigterm_flag.load(Ordering::Acquire) { break; } @@ -264,7 +268,7 @@ fn main() -> anyhow::Result<()> { block_count = rpc_client.get_block_count()? as u32; tx.send(Emission::Tip(block_count))?; } - tx.send(Emission::Block { height, block })?; + tx.send(Emission::Block(block_emission))?; } None => { if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) { @@ -293,13 +297,17 @@ fn main() -> anyhow::Result<()> { let mut chain = chain.lock().unwrap(); let changeset = match emission { - Emission::Block { height, block } => { - let chain_update = - CheckPoint::from_header(&block.header, height).into_update(false); + Emission::Block(block_emission) => { + let height = block_emission.block_height(); + let chain_update = local_chain::Update { + tip: block_emission.checkpoint, + introduce_older_blocks: false, + }; let chain_changeset = chain .apply_update(chain_update) .expect("must always apply as we receive blocks in order from emitter"); - let graph_changeset = graph.apply_block_relevant(block, height); + let graph_changeset = + graph.apply_block_relevant(block_emission.block, height); (chain_changeset, graph_changeset) } Emission::Mempool(mempool_txs) => {