diff --git a/Cargo.toml b/Cargo.toml index e625d581f..b190ba88f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ members = [ "example-crates/wallet_electrum", "example-crates/wallet_esplora_blocking", "example-crates/wallet_esplora_async", + "example-crates/wallet_rpc", "nursery/tmp_plan", "nursery/coin_select" ] diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index 2177a88c4..6601a112a 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -23,7 +23,9 @@ pub use bdk_chain::keychain::Balance; use bdk_chain::{ indexed_tx_graph, keychain::{self, KeychainTxOutIndex}, - local_chain::{self, CannotConnectError, CheckPoint, CheckPointIter, LocalChain}, + local_chain::{ + self, ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain, + }, tx_graph::{CanonicalTx, TxGraph}, Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeHeightAnchor, FullTxOut, IndexedTxGraph, Persist, PersistBackend, @@ -31,8 +33,8 @@ use bdk_chain::{ use bitcoin::secp256k1::{All, Secp256k1}; use bitcoin::sighash::{EcdsaSighashType, TapSighashType}; use bitcoin::{ - absolute, Address, Network, OutPoint, Script, ScriptBuf, Sequence, Transaction, TxOut, Txid, - Weight, Witness, + absolute, Address, Block, Network, OutPoint, Script, ScriptBuf, Sequence, Transaction, TxOut, + Txid, Weight, Witness, }; use bitcoin::{consensus::encode::serialize, BlockHash}; use bitcoin::{constants::genesis_block, psbt}; @@ -438,6 +440,55 @@ pub enum InsertTxError { }, } +impl fmt::Display for InsertTxError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + InsertTxError::ConfirmationHeightCannotBeGreaterThanTip { + tip_height, + tx_height, + } => { + write!(f, "cannot insert tx with confirmation height ({}) higher than internal tip height ({})", tx_height, tip_height) + } + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for InsertTxError {} + +/// An error that may occur when applying a block to [`Wallet`]. +#[derive(Debug)] +pub enum ApplyBlockError { + /// Occurs when the update chain cannot connect with original chain. + CannotConnect(CannotConnectError), + /// Occurs when the `connected_to` hash does not match the hash derived from `block`. + UnexpectedConnectedToHash { + /// Block hash of `connected_to`. + connected_to_hash: BlockHash, + /// Expected block hash of `connected_to`, as derived from `block`. + expected_hash: BlockHash, + }, +} + +impl fmt::Display for ApplyBlockError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ApplyBlockError::CannotConnect(err) => err.fmt(f), + ApplyBlockError::UnexpectedConnectedToHash { + expected_hash: block_hash, + connected_to_hash: checkpoint_hash, + } => write!( + f, + "`connected_to` hash {} differs from the expected hash {} (which is derived from `block`)", + checkpoint_hash, block_hash + ), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for ApplyBlockError {} + impl Wallet { /// Initialize an empty [`Wallet`]. pub fn new( @@ -2329,7 +2380,7 @@ impl Wallet { self.persist.commit().map(|c| c.is_some()) } - /// Returns the changes that will be staged with the next call to [`commit`]. + /// Returns the changes that will be committed with the next call to [`commit`]. /// /// [`commit`]: Self::commit pub fn staged(&self) -> &ChangeSet @@ -2353,6 +2404,86 @@ impl Wallet { pub fn local_chain(&self) -> &LocalChain { &self.chain } + + /// Introduces a `block` of `height` to the wallet, and tries to connect it to the + /// `prev_blockhash` of the block's header. + /// + /// This is a convenience method that is equivalent to calling [`apply_block_connected_to`] + /// with `prev_blockhash` and `height-1` as the `connected_to` parameter. + /// + /// [`apply_block_connected_to`]: Self::apply_block_connected_to + pub fn apply_block(&mut self, block: &Block, height: u32) -> Result<(), CannotConnectError> + where + D: PersistBackend, + { + let connected_to = match height.checked_sub(1) { + Some(prev_height) => BlockId { + height: prev_height, + hash: block.header.prev_blockhash, + }, + None => BlockId { + height, + hash: block.block_hash(), + }, + }; + self.apply_block_connected_to(block, height, connected_to) + .map_err(|err| match err { + ApplyHeaderError::InconsistentBlocks => { + unreachable!("connected_to is derived from the block so must be consistent") + } + ApplyHeaderError::CannotConnect(err) => err, + }) + } + + /// Applies relevant transactions from `block` of `height` to the wallet, and connects the + /// block to the internal chain. + /// + /// The `connected_to` parameter informs the wallet how this block connects to the internal + /// [`LocalChain`]. Relevant transactions are filtered from the `block` and inserted into the + /// internal [`TxGraph`]. + pub fn apply_block_connected_to( + &mut self, + block: &Block, + height: u32, + connected_to: BlockId, + ) -> Result<(), ApplyHeaderError> + where + D: PersistBackend, + { + let mut changeset = ChangeSet::default(); + changeset.append( + self.chain + .apply_header_connected_to(&block.header, height, connected_to)? + .into(), + ); + changeset.append( + self.indexed_graph + .apply_block_relevant(block, height) + .into(), + ); + self.persist.stage(changeset); + Ok(()) + } + + /// Apply relevant unconfirmed transactions to the wallet. + /// + /// Transactions that are not relevant are filtered out. + /// + /// This method takes in an iterator of `(tx, last_seen)` where `last_seen` is the timestamp of + /// when the transaction was last seen in the mempool. This is used for conflict resolution + /// when there is conflicting unconfirmed transactions. The transaction with the later + /// `last_seen` is prioritied. + pub fn apply_unconfirmed_txs<'t>( + &mut self, + unconfirmed_txs: impl IntoIterator, + ) where + D: PersistBackend, + { + let indexed_graph_changeset = self + .indexed_graph + .batch_insert_relevant_unconfirmed(unconfirmed_txs); + self.persist.stage(ChangeSet::from(indexed_graph_changeset)); + } } impl AsRef> for Wallet { diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index e790b8a8e..ce5e863bb 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -43,11 +43,13 @@ pub struct Emitter<'c, C> { } impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { - /// Construct a new [`Emitter`] with the given RPC `client`, `last_cp` and `start_height`. + /// Construct a new [`Emitter`]. /// - /// * `last_cp` is the check point used to find the latest block which is still part of the best - /// chain. - /// * `start_height` is the block height to start emitting blocks from. + /// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter + /// can start emission from a block that connects to the original chain. + /// + /// `start_height` starts emission from a given height (if there are no conflicts with the + /// original chain). pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self { Self { client, @@ -127,13 +129,58 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { } /// Emit the next block height and header (if any). - pub fn next_header(&mut self) -> Result, bitcoincore_rpc::Error> { - poll(self, |hash| self.client.get_block_header(hash)) + pub fn next_header(&mut self) -> Result>, bitcoincore_rpc::Error> { + Ok(poll(self, |hash| self.client.get_block_header(hash))? + .map(|(checkpoint, block)| BlockEvent { block, checkpoint })) } /// Emit the next block height and block (if any). - pub fn next_block(&mut self) -> Result, bitcoincore_rpc::Error> { - poll(self, |hash| self.client.get_block(hash)) + pub fn next_block(&mut self) -> Result>, bitcoincore_rpc::Error> { + Ok(poll(self, |hash| self.client.get_block(hash))? + .map(|(checkpoint, block)| BlockEvent { block, checkpoint })) + } +} + +/// A newly emitted block from [`Emitter`]. +#[derive(Debug)] +pub struct BlockEvent { + /// Either a full [`Block`] or [`Header`] of the new block. + pub block: B, + + /// The checkpoint of the new block. + /// + /// A [`CheckPoint`] is a node of a linked list of [`BlockId`]s. This checkpoint is linked to + /// all [`BlockId`]s originally passed in [`Emitter::new`] as well as emitted blocks since then. + /// These blocks are guaranteed to be of the same chain. + /// + /// This is important as BDK structures require block-to-apply to be connected with another + /// block in the original chain. + pub checkpoint: CheckPoint, +} + +impl BlockEvent { + /// The block height of this new block. + pub fn block_height(&self) -> u32 { + self.checkpoint.height() + } + + /// The block hash of this new block. + pub fn block_hash(&self) -> BlockHash { + self.checkpoint.hash() + } + + /// The [`BlockId`] of a previous block that this block connects to. + /// + /// This either returns a [`BlockId`] of a previously emitted block or from the chain we started + /// with (passed in as `last_cp` in [`Emitter::new`]). + /// + /// This value is derived from [`BlockEvent::checkpoint`]. + pub fn connected_to(&self) -> BlockId { + match self.checkpoint.prev() { + Some(prev_cp) => prev_cp.block_id(), + // there is no previous checkpoint, so just connect with itself + None => self.checkpoint.block_id(), + } } } @@ -203,7 +250,7 @@ where fn poll( emitter: &mut Emitter, get_item: F, -) -> Result, bitcoincore_rpc::Error> +) -> Result, bitcoincore_rpc::Error> where C: bitcoincore_rpc::RpcApi, F: Fn(&BlockHash) -> Result, @@ -215,13 +262,14 @@ where let hash = res.hash; let item = get_item(&hash)?; - emitter.last_cp = emitter + let new_cp = emitter .last_cp .clone() .push(BlockId { height, hash }) .expect("must push"); + emitter.last_cp = new_cp.clone(); emitter.last_block = Some(res); - return Ok(Some((height, item))); + return Ok(Some((new_cp, item))); } PollResponse::NoMoreBlocks => { emitter.last_block = None; diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 521124e5d..52d709301 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -157,28 +157,6 @@ impl TestEnv { } } -fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Update { - let this_id = BlockId { - height, - hash: block.block_hash(), - }; - let tip = if block.header.prev_blockhash == BlockHash::all_zeros() { - CheckPoint::new(this_id) - } else { - CheckPoint::new(BlockId { - height: height - 1, - hash: block.header.prev_blockhash, - }) - .extend(core::iter::once(this_id)) - .expect("must construct checkpoint") - }; - - local_chain::Update { - tip, - introduce_older_blocks: false, - } -} - /// Ensure that blocks are emitted in order even after reorg. /// /// 1. Mine 101 blocks. @@ -200,17 +178,21 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> { // see if the emitter outputs the right blocks println!("first sync:"); - while let Some((height, block)) = emitter.next_block()? { + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + let hash = emission.block_hash(); assert_eq!( - block.block_hash(), + emission.block_hash(), exp_hashes[height as usize], "emitted block hash is unexpected" ); - let chain_update = block_to_chain_update(&block, height); assert_eq!( - local_chain.apply_update(chain_update)?, - BTreeMap::from([(height, Some(block.block_hash()))]), + local_chain.apply_update(local_chain::Update { + tip: emission.checkpoint, + introduce_older_blocks: false, + })?, + BTreeMap::from([(height, Some(hash))]), "chain update changeset is unexpected", ); } @@ -237,27 +219,30 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> { // see if the emitter outputs the right blocks println!("after reorg:"); let mut exp_height = exp_hashes.len() - reorged_blocks.len(); - while let Some((height, block)) = emitter.next_block()? { + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + let hash = emission.block_hash(); assert_eq!( height, exp_height as u32, "emitted block has unexpected height" ); assert_eq!( - block.block_hash(), - exp_hashes[height as usize], + hash, exp_hashes[height as usize], "emitted block is unexpected" ); - let chain_update = block_to_chain_update(&block, height); assert_eq!( - local_chain.apply_update(chain_update)?, + local_chain.apply_update(local_chain::Update { + tip: emission.checkpoint, + introduce_older_blocks: false, + })?, if exp_height == exp_hashes.len() - reorged_blocks.len() { - core::iter::once((height, Some(block.block_hash()))) + core::iter::once((height, Some(hash))) .chain((height + 1..exp_hashes.len() as u32).map(|h| (h, None))) .collect::() } else { - BTreeMap::from([(height, Some(block.block_hash()))]) + BTreeMap::from([(height, Some(hash))]) }, "chain update changeset is unexpected", ); @@ -307,9 +292,13 @@ fn test_into_tx_graph() -> anyhow::Result<()> { let emitter = &mut Emitter::new(&env.client, chain.tip(), 0); - while let Some((height, block)) = emitter.next_block()? { - let _ = chain.apply_update(block_to_chain_update(&block, height))?; - let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height); + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + let _ = chain.apply_update(local_chain::Update { + tip: emission.checkpoint, + introduce_older_blocks: false, + })?; + let indexed_additions = indexed_tx_graph.apply_block_relevant(&emission.block, height); assert!(indexed_additions.is_empty()); } @@ -367,10 +356,13 @@ fn test_into_tx_graph() -> anyhow::Result<()> { // must receive mined block which will confirm the transactions. { - let (height, block) = emitter.next_block()?.expect("must get mined block"); - let _ = chain - .apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?; - let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height); + let emission = emitter.next_block()?.expect("must get mined block"); + let height = emission.block_height(); + let _ = chain.apply_update(local_chain::Update { + tip: emission.checkpoint, + introduce_older_blocks: false, + })?; + let indexed_additions = indexed_tx_graph.apply_block_relevant(&emission.block, height); assert!(indexed_additions.graph.txs.is_empty()); assert!(indexed_additions.graph.txouts.is_empty()); assert_eq!(indexed_additions.graph.anchors, exp_anchors); @@ -407,9 +399,12 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { for reorg_count in 1..=10 { let replaced_blocks = env.reorg_empty_blocks(reorg_count)?; - let (height, next_header) = emitter.next_header()?.expect("must emit block after reorg"); + let next_emission = emitter.next_header()?.expect("must emit block after reorg"); assert_eq!( - (height as usize, next_header.block_hash()), + ( + next_emission.block_height() as usize, + next_emission.block_hash() + ), replaced_blocks[0], "block emitted after reorg should be at the reorg height" ); @@ -439,8 +434,9 @@ fn sync_from_emitter( where C: bitcoincore_rpc::RpcApi, { - while let Some((height, block)) = emitter.next_block()? { - process_block(recv_chain, recv_graph, block, height)?; + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + process_block(recv_chain, recv_graph, emission.block, height)?; } Ok(()) } @@ -660,7 +656,8 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() // At this point, the emitter has seen all mempool transactions. It should only re-emit those // that have introduction heights less than the emitter's last-emitted block tip. - while let Some((height, _)) = emitter.next_header()? { + while let Some(emission) = emitter.next_header()? { + let height = emission.block_height(); // We call `mempool()` twice. // The second call (at height `h`) should skip the tx introduced at height `h`. for try_index in 0..2 { @@ -754,7 +751,8 @@ fn mempool_during_reorg() -> anyhow::Result<()> { .collect::>()); // `next_header` emits the replacement block of the reorg - if let Some((height, _)) = emitter.next_header()? { + if let Some(emission) = emitter.next_header()? { + let height = emission.block_height(); println!("\t- replacement height: {}", height); // the mempool emission (that follows the first block emission after reorg) should only @@ -835,12 +833,12 @@ fn no_agreement_point() -> anyhow::Result<()> { env.mine_blocks(PREMINE_COUNT, None)?; // emit block 99a - let (_, block_header_99a) = emitter.next_header()?.expect("block 99a header"); + let block_header_99a = emitter.next_header()?.expect("block 99a header").block; let block_hash_99a = block_header_99a.block_hash(); let block_hash_98a = block_header_99a.prev_blockhash; // emit block 100a - let (_, block_header_100a) = emitter.next_header()?.expect("block 100a header"); + let block_header_100a = emitter.next_header()?.expect("block 100a header").block; let block_hash_100a = block_header_100a.block_hash(); // get hash for block 101a @@ -855,7 +853,7 @@ fn no_agreement_point() -> anyhow::Result<()> { env.mine_blocks(3, None)?; // emit block header 99b - let (_, block_header_99b) = emitter.next_header()?.expect("block 99b header"); + let block_header_99b = emitter.next_header()?.expect("block 99b header").block; let block_hash_99b = block_header_99b.block_hash(); let block_hash_98b = block_header_99b.prev_blockhash; diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 777b5d978..c2b83600b 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -224,20 +224,26 @@ where /// Irrelevant transactions in `txs` will be ignored. pub fn apply_block_relevant( &mut self, - block: Block, + block: &Block, height: u32, ) -> ChangeSet { let block_id = BlockId { hash: block.block_hash(), height, }; - let txs = block.txdata.iter().enumerate().map(|(tx_pos, tx)| { - ( - tx, - core::iter::once(A::from_block_position(&block, block_id, tx_pos)), - ) - }); - self.batch_insert_relevant(txs) + let mut changeset = ChangeSet::::default(); + for (tx_pos, tx) in block.txdata.iter().enumerate() { + changeset.indexer.append(self.index.index_tx(tx)); + if self.index.is_tx_relevant(tx) { + let txid = tx.txid(); + let anchor = A::from_block_position(block, block_id, tx_pos); + changeset.graph.append(self.graph.insert_tx(tx.clone())); + changeset + .graph + .append(self.graph.insert_anchor(txid, anchor)); + } + } + changeset } /// Batch insert all transactions of the given `block` of `height`. diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index 32fd72852..9be62dee3 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -5,6 +5,7 @@ use core::convert::Infallible; use crate::collections::BTreeMap; use crate::{BlockId, ChainOracle}; use alloc::sync::Arc; +use bitcoin::block::Header; use bitcoin::BlockHash; /// The [`ChangeSet`] represents changes to [`LocalChain`]. @@ -39,6 +40,28 @@ impl CheckPoint { Self(Arc::new(CPInner { block, prev: None })) } + /// Construct a checkpoint from a list of [`BlockId`]s in ascending height order. + /// + /// # Errors + /// + /// This method will error if any of the follow occurs: + /// + /// - The `blocks` iterator is empty, in which case, the error will be `None`. + /// - The `blocks` iterator is not in ascending height order. + /// - The `blocks` iterator contains multiple [`BlockId`]s of the same height. + /// + /// The error type is the last successful checkpoint constructed (if any). + pub fn from_block_ids( + block_ids: impl IntoIterator, + ) -> Result> { + let mut blocks = block_ids.into_iter(); + let mut acc = CheckPoint::new(blocks.next().ok_or(None)?); + for id in blocks { + acc = acc.push(id).map_err(Some)?; + } + Ok(acc) + } + /// Construct a checkpoint from the given `header` and block `height`. /// /// If `header` is of the genesis block, the checkpoint won't have a [`prev`] node. Otherwise, @@ -347,6 +370,95 @@ impl LocalChain { Ok(changeset) } + /// Update the chain with a given [`Header`] at `height` which you claim is connected to a existing block in the chain. + /// + /// This is useful when you have a block header that you want to record as part of the chain but + /// don't necessarily know that the `prev_blockhash` is in the chain. + /// + /// This will usually insert two new [`BlockId`]s into the chain: the header's block and the + /// header's `prev_blockhash` block. `connected_to` must already be in the chain but is allowed + /// to be `prev_blockhash` (in which case only one new block id will be inserted). + /// To be successful, `connected_to` must be chosen carefully so that `LocalChain`'s [update + /// rules][`apply_update`] are satisfied. + /// + /// # Errors + /// + /// [`ApplyHeaderError::InconsistentBlocks`] occurs if the `connected_to` block and the + /// [`Header`] is inconsistent. For example, if the `connected_to` block is the same height as + /// `header` or `prev_blockhash`, but has a different block hash. Or if the `connected_to` + /// height is greater than the header's `height`. + /// + /// [`ApplyHeaderError::CannotConnect`] occurs if the internal call to [`apply_update`] fails. + /// + /// [`apply_update`]: Self::apply_update + pub fn apply_header_connected_to( + &mut self, + header: &Header, + height: u32, + connected_to: BlockId, + ) -> Result { + let this = BlockId { + height, + hash: header.block_hash(), + }; + let prev = height.checked_sub(1).map(|prev_height| BlockId { + height: prev_height, + hash: header.prev_blockhash, + }); + let conn = match connected_to { + // `connected_to` can be ignored if same as `this` or `prev` (duplicate) + conn if conn == this || Some(conn) == prev => None, + // this occurs if: + // - `connected_to` height is the same as `prev`, but different hash + // - `connected_to` height is the same as `this`, but different hash + // - `connected_to` height is greater than `this` (this is not allowed) + conn if conn.height >= height.saturating_sub(1) => { + return Err(ApplyHeaderError::InconsistentBlocks) + } + conn => Some(conn), + }; + + let update = Update { + tip: CheckPoint::from_block_ids([conn, prev, Some(this)].into_iter().flatten()) + .expect("block ids must be in order"), + introduce_older_blocks: false, + }; + + self.apply_update(update) + .map_err(ApplyHeaderError::CannotConnect) + } + + /// Update the chain with a given [`Header`] connecting it with the previous block. + /// + /// This is a convenience method to call [`apply_header_connected_to`] with the `connected_to` + /// parameter being `height-1:prev_blockhash`. If there is no previous block (i.e. genesis), we + /// use the current block as `connected_to`. + /// + /// [`apply_header_connected_to`]: LocalChain::apply_header_connected_to + pub fn apply_header( + &mut self, + header: &Header, + height: u32, + ) -> Result { + let connected_to = match height.checked_sub(1) { + Some(prev_height) => BlockId { + height: prev_height, + hash: header.prev_blockhash, + }, + None => BlockId { + height, + hash: header.block_hash(), + }, + }; + self.apply_header_connected_to(header, height, connected_to) + .map_err(|err| match err { + ApplyHeaderError::InconsistentBlocks => { + unreachable!("connected_to is derived from the block so is always consistent") + } + ApplyHeaderError::CannotConnect(err) => err, + }) + } + /// Apply the given `changeset`. pub fn apply_changeset(&mut self, changeset: &ChangeSet) -> Result<(), MissingGenesisError> { if let Some(start_height) = changeset.keys().next().cloned() { @@ -557,6 +669,30 @@ impl core::fmt::Display for CannotConnectError { #[cfg(feature = "std")] impl std::error::Error for CannotConnectError {} +/// The error type for [`LocalChain::apply_header_connected_to`]. +#[derive(Debug, Clone, PartialEq)] +pub enum ApplyHeaderError { + /// Occurs when `connected_to` block conflicts with either the current block or previous block. + InconsistentBlocks, + /// Occurs when the update cannot connect with the original chain. + CannotConnect(CannotConnectError), +} + +impl core::fmt::Display for ApplyHeaderError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + ApplyHeaderError::InconsistentBlocks => write!( + f, + "the `connected_to` block conflicts with either the current or previous block" + ), + ApplyHeaderError::CannotConnect(err) => core::fmt::Display::fmt(err, f), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for ApplyHeaderError {} + fn merge_chains( original_tip: CheckPoint, update_tip: CheckPoint, diff --git a/crates/chain/tests/test_local_chain.rs b/crates/chain/tests/test_local_chain.rs index 25cbbb08e..c1a1cd7f9 100644 --- a/crates/chain/tests/test_local_chain.rs +++ b/crates/chain/tests/test_local_chain.rs @@ -1,7 +1,11 @@ -use bdk_chain::local_chain::{ - AlterCheckPointError, CannotConnectError, ChangeSet, LocalChain, MissingGenesisError, Update, +use bdk_chain::{ + local_chain::{ + AlterCheckPointError, ApplyHeaderError, CannotConnectError, ChangeSet, CheckPoint, + LocalChain, MissingGenesisError, Update, + }, + BlockId, }; -use bitcoin::BlockHash; +use bitcoin::{block::Header, hashes::Hash, BlockHash}; #[macro_use] mod common; @@ -288,6 +292,27 @@ fn update_local_chain() { ], }, }, + // Allow update that is shorter than original chain + // | 0 | 1 | 2 | 3 | 4 | 5 + // chain | A C D E F + // update | A C D' + TestLocalChain { + name: "allow update that is shorter than original chain", + chain: local_chain![(0, h!("_")), (2, h!("C")), (3, h!("D")), (4, h!("E")), (5, h!("F"))], + update: chain_update![(0, h!("_")), (2, h!("C")), (3, h!("D'"))], + exp: ExpectedResult::Ok { + changeset: &[ + (3, Some(h!("D'"))), + (4, None), + (5, None), + ], + init_changeset: &[ + (0, Some(h!("_"))), + (2, Some(h!("C"))), + (3, Some(h!("D'"))), + ], + }, + }, ] .into_iter() .for_each(TestLocalChain::run); @@ -423,3 +448,234 @@ fn local_chain_disconnect_from() { ); } } + +#[test] +fn checkpoint_from_block_ids() { + struct TestCase<'a> { + name: &'a str, + blocks: &'a [(u32, BlockHash)], + exp_result: Result<(), Option<(u32, BlockHash)>>, + } + + let test_cases = [ + TestCase { + name: "in_order", + blocks: &[(0, h!("A")), (1, h!("B")), (3, h!("D"))], + exp_result: Ok(()), + }, + TestCase { + name: "with_duplicates", + blocks: &[(1, h!("B")), (2, h!("C")), (2, h!("C'"))], + exp_result: Err(Some((2, h!("C")))), + }, + TestCase { + name: "not_in_order", + blocks: &[(1, h!("B")), (3, h!("D")), (2, h!("C"))], + exp_result: Err(Some((3, h!("D")))), + }, + TestCase { + name: "empty", + blocks: &[], + exp_result: Err(None), + }, + TestCase { + name: "single", + blocks: &[(21, h!("million"))], + exp_result: Ok(()), + }, + ]; + + for (i, t) in test_cases.into_iter().enumerate() { + println!("running test case {}: '{}'", i, t.name); + let result = CheckPoint::from_block_ids( + t.blocks + .iter() + .map(|&(height, hash)| BlockId { height, hash }), + ); + match t.exp_result { + Ok(_) => { + assert!(result.is_ok(), "[{}:{}] should be Ok", i, t.name); + let result_vec = { + let mut v = result + .unwrap() + .into_iter() + .map(|cp| (cp.height(), cp.hash())) + .collect::>(); + v.reverse(); + v + }; + assert_eq!( + &result_vec, t.blocks, + "[{}:{}] not equal to original block ids", + i, t.name + ); + } + Err(exp_last) => { + assert!(result.is_err(), "[{}:{}] should be Err", i, t.name); + let err = result.unwrap_err(); + assert_eq!( + err.as_ref() + .map(|last_cp| (last_cp.height(), last_cp.hash())), + exp_last, + "[{}:{}] error's last cp height should be {:?}, got {:?}", + i, + t.name, + exp_last, + err + ); + } + } + } +} + +#[test] +fn local_chain_apply_header_connected_to() { + fn header_from_prev_blockhash(prev_blockhash: BlockHash) -> Header { + Header { + version: bitcoin::block::Version::default(), + prev_blockhash, + merkle_root: bitcoin::hash_types::TxMerkleNode::all_zeros(), + time: 0, + bits: bitcoin::CompactTarget::default(), + nonce: 0, + } + } + + struct TestCase { + name: &'static str, + chain: LocalChain, + header: Header, + height: u32, + connected_to: BlockId, + exp_result: Result)>, ApplyHeaderError>, + } + + let test_cases = [ + { + let header = header_from_prev_blockhash(h!("A")); + let hash = header.block_hash(); + let height = 2; + let connected_to = BlockId { height, hash }; + TestCase { + name: "connected_to_self_header_applied_to_self", + chain: local_chain![(0, h!("_")), (height, hash)], + header, + height, + connected_to, + exp_result: Ok(vec![]), + } + }, + { + let prev_hash = h!("A"); + let prev_height = 1; + let header = header_from_prev_blockhash(prev_hash); + let hash = header.block_hash(); + let height = prev_height + 1; + let connected_to = BlockId { + height: prev_height, + hash: prev_hash, + }; + TestCase { + name: "connected_to_prev_header_applied_to_self", + chain: local_chain![(0, h!("_")), (prev_height, prev_hash)], + header, + height, + connected_to, + exp_result: Ok(vec![(height, Some(hash))]), + } + }, + { + let header = header_from_prev_blockhash(BlockHash::all_zeros()); + let hash = header.block_hash(); + let height = 0; + let connected_to = BlockId { height, hash }; + TestCase { + name: "genesis_applied_to_self", + chain: local_chain![(0, hash)], + header, + height, + connected_to, + exp_result: Ok(vec![]), + } + }, + { + let header = header_from_prev_blockhash(h!("Z")); + let height = 10; + let hash = header.block_hash(); + let prev_height = height - 1; + let prev_hash = header.prev_blockhash; + TestCase { + name: "connect_at_connected_to", + chain: local_chain![(0, h!("_")), (2, h!("B")), (3, h!("C"))], + header, + height: 10, + connected_to: BlockId { + height: 3, + hash: h!("C"), + }, + exp_result: Ok(vec![(prev_height, Some(prev_hash)), (height, Some(hash))]), + } + }, + { + let prev_hash = h!("A"); + let prev_height = 1; + let header = header_from_prev_blockhash(prev_hash); + let connected_to = BlockId { + height: prev_height, + hash: h!("not_prev_hash"), + }; + TestCase { + name: "inconsistent_prev_hash", + chain: local_chain![(0, h!("_")), (prev_height, h!("not_prev_hash"))], + header, + height: prev_height + 1, + connected_to, + exp_result: Err(ApplyHeaderError::InconsistentBlocks), + } + }, + { + let prev_hash = h!("A"); + let prev_height = 1; + let header = header_from_prev_blockhash(prev_hash); + let height = prev_height + 1; + let connected_to = BlockId { + height, + hash: h!("not_current_hash"), + }; + TestCase { + name: "inconsistent_current_block", + chain: local_chain![(0, h!("_")), (height, h!("not_current_hash"))], + header, + height, + connected_to, + exp_result: Err(ApplyHeaderError::InconsistentBlocks), + } + }, + { + let header = header_from_prev_blockhash(h!("B")); + let height = 3; + let connected_to = BlockId { + height: 4, + hash: h!("D"), + }; + TestCase { + name: "connected_to_is_greater", + chain: local_chain![(0, h!("_")), (2, h!("B"))], + header, + height, + connected_to, + exp_result: Err(ApplyHeaderError::InconsistentBlocks), + } + }, + ]; + + for (i, t) in test_cases.into_iter().enumerate() { + println!("running test case {}: '{}'", i, t.name); + let mut chain = t.chain; + let result = chain.apply_header_connected_to(&t.header, t.height, t.connected_to); + let exp_result = t + .exp_result + .map(|cs| cs.iter().cloned().collect::()); + assert_eq!(result, exp_result, "[{}:{}] unexpected result", i, t.name); + } +} diff --git a/example-crates/example_bitcoind_rpc_polling/README.md b/example-crates/example_bitcoind_rpc_polling/README.md new file mode 100644 index 000000000..fef82ab1c --- /dev/null +++ b/example-crates/example_bitcoind_rpc_polling/README.md @@ -0,0 +1,68 @@ +# Example RPC CLI + +### Simple Regtest Test + +1. Start local regtest bitcoind. + ``` + mkdir -p /tmp/regtest/bitcoind + bitcoind -regtest -server -fallbackfee=0.0002 -rpcuser= -rpcpassword= -datadir=/tmp/regtest/bitcoind -daemon + ``` +2. Create a test bitcoind wallet and set bitcoind env. + ``` + bitcoin-cli -datadir=/tmp/regtest/bitcoind -regtest -rpcuser= -rpcpassword= -named createwallet wallet_name="test" + export RPC_URL=127.0.0.1:18443 + export RPC_USER= + export RPC_PASS= + ``` +3. Get test bitcoind wallet info. + ``` + bitcoin-cli -rpcwallet="test" -rpcuser= -rpcpassword= -datadir=/tmp/regtest/bitcoind -regtest getwalletinfo + ``` +4. Get new test bitcoind wallet address. + ``` + BITCOIND_ADDRESS=$(bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest -rpcuser= -rpcpassword= getnewaddress) + echo $BITCOIND_ADDRESS + ``` +5. Generate 101 blocks with reward to test bitcoind wallet address. + ``` + bitcoin-cli -datadir=/tmp/regtest/bitcoind -regtest -rpcuser= -rpcpassword= generatetoaddress 101 $BITCOIND_ADDRESS + ``` +6. Verify test bitcoind wallet balance. + ``` + bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest -rpcuser= -rpcpassword= getbalances + ``` +7. Set descriptor env and get address from RPC CLI wallet. + ``` + export DESCRIPTOR="wpkh(tprv8ZgxMBicQKsPfK9BTf82oQkHhawtZv19CorqQKPFeaHDMA4dXYX6eWsJGNJ7VTQXWmoHdrfjCYuDijcRmNFwSKcVhswzqs4fugE8turndGc/1/*)" + cargo run -- --network regtest address next + ``` +8. Send 5 test bitcoin to RPC CLI wallet. + ``` + bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest -rpcuser= -rpcpassword= sendtoaddress
5 + ``` +9. Sync blockchain with RPC CLI wallet. + ``` + cargo run -- --network regtest sync + + ``` +10. Get RPC CLI wallet unconfirmed balances. + ``` + cargo run -- --network regtest balance + ``` +11. Generate 1 block with reward to test bitcoind wallet address. + ``` + bitcoin-cli -datadir=/tmp/regtest/bitcoind -rpcuser= -rpcpassword= -regtest generatetoaddress 10 $BITCOIND_ADDRESS + ``` +12. Sync the blockchain with RPC CLI wallet. + ``` + cargo run -- --network regtest sync + + ``` +13. Get RPC CLI wallet confirmed balances. + ``` + cargo run -- --network regtest balance + ``` +14. Get RPC CLI wallet transactions. + ``` + cargo run -- --network regtest txout list + ``` \ No newline at end of file diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index 449242e41..aff5fc99e 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -14,7 +14,7 @@ use bdk_bitcoind_rpc::{ use bdk_chain::{ bitcoin::{constants::genesis_block, Block, Transaction}, indexed_tx_graph, keychain, - local_chain::{self, CheckPoint, LocalChain}, + local_chain::{self, LocalChain}, ConfirmationTimeHeightAnchor, IndexedTxGraph, }; use example_cli::{ @@ -42,7 +42,7 @@ type ChangeSet = ( #[derive(Debug)] enum Emission { - Block { height: u32, block: Block }, + Block(bdk_bitcoind_rpc::BlockEvent), Mempool(Vec<(Transaction, u64)>), Tip(u32), } @@ -178,17 +178,20 @@ fn main() -> anyhow::Result<()> { let mut last_db_commit = Instant::now(); let mut last_print = Instant::now(); - while let Some((height, block)) = emitter.next_block()? { + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); let mut db = db.lock().unwrap(); - let chain_update = - CheckPoint::from_header(&block.header, height).into_update(false); let chain_changeset = chain - .apply_update(chain_update) + .apply_update(local_chain::Update { + tip: emission.checkpoint, + introduce_older_blocks: false, + }) .expect("must always apply as we receive blocks in order from emitter"); - let graph_changeset = graph.apply_block_relevant(block, height); + let graph_changeset = graph.apply_block_relevant(&emission.block, height); db.stage((chain_changeset, graph_changeset)); // commit staged db changes in intervals @@ -256,7 +259,8 @@ fn main() -> anyhow::Result<()> { loop { match emitter.next_block()? { - Some((height, block)) => { + Some(block_emission) => { + let height = block_emission.block_height(); if sigterm_flag.load(Ordering::Acquire) { break; } @@ -264,7 +268,7 @@ fn main() -> anyhow::Result<()> { block_count = rpc_client.get_block_count()? as u32; tx.send(Emission::Tip(block_count))?; } - tx.send(Emission::Block { height, block })?; + tx.send(Emission::Block(block_emission))?; } None => { if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) { @@ -293,13 +297,17 @@ fn main() -> anyhow::Result<()> { let mut chain = chain.lock().unwrap(); let changeset = match emission { - Emission::Block { height, block } => { - let chain_update = - CheckPoint::from_header(&block.header, height).into_update(false); + Emission::Block(block_emission) => { + let height = block_emission.block_height(); + let chain_update = local_chain::Update { + tip: block_emission.checkpoint, + introduce_older_blocks: false, + }; let chain_changeset = chain .apply_update(chain_update) .expect("must always apply as we receive blocks in order from emitter"); - let graph_changeset = graph.apply_block_relevant(block, height); + let graph_changeset = + graph.apply_block_relevant(&block_emission.block, height); (chain_changeset, graph_changeset) } Emission::Mempool(mempool_txs) => { diff --git a/example-crates/wallet_rpc/Cargo.toml b/example-crates/wallet_rpc/Cargo.toml new file mode 100644 index 000000000..174144e9b --- /dev/null +++ b/example-crates/wallet_rpc/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "wallet_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk = { path = "../../crates/bdk" } +bdk_file_store = { path = "../../crates/file_store" } +bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" } + +anyhow = "1" +clap = { version = "3.2.25", features = ["derive", "env"] } +ctrlc = "2.0.1" diff --git a/example-crates/wallet_rpc/README.md b/example-crates/wallet_rpc/README.md new file mode 100644 index 000000000..0a2cc2946 --- /dev/null +++ b/example-crates/wallet_rpc/README.md @@ -0,0 +1,45 @@ +# Wallet RPC Example + +``` +$ cargo run --bin wallet_rpc -- --help + +wallet_rpc 0.1.0 +Bitcoind RPC example usign `bdk::Wallet` + +USAGE: + wallet_rpc [OPTIONS] [CHANGE_DESCRIPTOR] + +ARGS: + Wallet descriptor [env: DESCRIPTOR=] + Wallet change descriptor [env: CHANGE_DESCRIPTOR=] + +OPTIONS: + --db-path + Where to store wallet data [env: BDK_DB_PATH=] [default: .bdk_wallet_rpc_example.db] + + -h, --help + Print help information + + --network + Bitcoin network to connect to [env: BITCOIN_NETWORK=] [default: testnet] + + --rpc-cookie + RPC auth cookie file [env: RPC_COOKIE=] + + --rpc-pass + RPC auth password [env: RPC_PASS=] + + --rpc-user + RPC auth username [env: RPC_USER=] + + --start-height + Earliest block height to start sync from [env: START_HEIGHT=] [default: 481824] + + --url + RPC URL [env: RPC_URL=] [default: 127.0.0.1:8332] + + -V, --version + Print version information + +``` + diff --git a/example-crates/wallet_rpc/src/main.rs b/example-crates/wallet_rpc/src/main.rs new file mode 100644 index 000000000..dc3b8bcdc --- /dev/null +++ b/example-crates/wallet_rpc/src/main.rs @@ -0,0 +1,182 @@ +use bdk::{ + bitcoin::{Block, Network, Transaction}, + wallet::Wallet, +}; +use bdk_bitcoind_rpc::{ + bitcoincore_rpc::{Auth, Client, RpcApi}, + Emitter, +}; +use bdk_file_store::Store; +use clap::{self, Parser}; +use std::{path::PathBuf, sync::mpsc::sync_channel, thread::spawn, time::Instant}; + +const DB_MAGIC: &str = "bdk-rpc-wallet-example"; + +/// Bitcoind RPC example usign `bdk::Wallet`. +/// +/// This syncs the chain block-by-block and prints the current balance, transaction count and UTXO +/// count. +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +#[clap(propagate_version = true)] +pub struct Args { + /// Wallet descriptor + #[clap(env = "DESCRIPTOR")] + pub descriptor: String, + /// Wallet change descriptor + #[clap(env = "CHANGE_DESCRIPTOR")] + pub change_descriptor: Option, + /// Earliest block height to start sync from + #[clap(env = "START_HEIGHT", long, default_value = "481824")] + pub start_height: u32, + /// Bitcoin network to connect to + #[clap(env = "BITCOIN_NETWORK", long, default_value = "testnet")] + pub network: Network, + /// Where to store wallet data + #[clap( + env = "BDK_DB_PATH", + long, + default_value = ".bdk_wallet_rpc_example.db" + )] + pub db_path: PathBuf, + + /// RPC URL + #[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")] + pub url: String, + /// RPC auth cookie file + #[clap(env = "RPC_COOKIE", long)] + pub rpc_cookie: Option, + /// RPC auth username + #[clap(env = "RPC_USER", long)] + pub rpc_user: Option, + /// RPC auth password + #[clap(env = "RPC_PASS", long)] + pub rpc_pass: Option, +} + +impl Args { + fn client(&self) -> anyhow::Result { + Ok(Client::new( + &self.url, + match (&self.rpc_cookie, &self.rpc_user, &self.rpc_pass) { + (None, None, None) => Auth::None, + (Some(path), _, _) => Auth::CookieFile(path.clone()), + (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + }, + )?) + } +} + +#[derive(Debug)] +enum Emission { + SigTerm, + Block(bdk_bitcoind_rpc::BlockEvent), + Mempool(Vec<(Transaction, u64)>), +} + +fn main() -> anyhow::Result<()> { + let args = Args::parse(); + + let rpc_client = args.client()?; + println!( + "Connected to Bitcoin Core RPC at {:?}", + rpc_client.get_blockchain_info().unwrap() + ); + + let start_load_wallet = Instant::now(); + let mut wallet = Wallet::new_or_load( + &args.descriptor, + args.change_descriptor.as_ref(), + Store::::open_or_create_new(DB_MAGIC.as_bytes(), args.db_path)?, + args.network, + )?; + println!( + "Loaded wallet in {}s", + start_load_wallet.elapsed().as_secs_f32() + ); + + let balance = wallet.get_balance(); + println!("Wallet balance before syncing: {} sats", balance.total()); + + let wallet_tip = wallet.latest_checkpoint(); + println!( + "Wallet tip: {} at height {}", + wallet_tip.hash(), + wallet_tip.height() + ); + + let (sender, receiver) = sync_channel::(21); + + let signal_sender = sender.clone(); + ctrlc::set_handler(move || { + signal_sender + .send(Emission::SigTerm) + .expect("failed to send sigterm") + }); + + let emitter_tip = wallet_tip.clone(); + spawn(move || -> Result<(), anyhow::Error> { + let mut emitter = Emitter::new(&rpc_client, emitter_tip, args.start_height); + while let Some(emission) = emitter.next_block()? { + sender.send(Emission::Block(emission))?; + } + sender.send(Emission::Mempool(emitter.mempool()?))?; + Ok(()) + }); + + let mut blocks_received = 0_usize; + for emission in receiver { + match emission { + Emission::SigTerm => { + println!("Sigterm received, exiting..."); + break; + } + Emission::Block(block_emission) => { + blocks_received += 1; + let height = block_emission.block_height(); + let hash = block_emission.block_hash(); + let connected_to = block_emission.connected_to(); + let start_apply_block = Instant::now(); + wallet.apply_block_connected_to(&block_emission.block, height, connected_to)?; + wallet.commit()?; + let elapsed = start_apply_block.elapsed().as_secs_f32(); + println!( + "Applied block {} at height {} in {}s", + hash, height, elapsed + ); + } + Emission::Mempool(mempool_emission) => { + let start_apply_mempool = Instant::now(); + wallet.apply_unconfirmed_txs(mempool_emission.iter().map(|(tx, time)| (tx, *time))); + wallet.commit()?; + println!( + "Applied unconfirmed transactions in {}s", + start_apply_mempool.elapsed().as_secs_f32() + ); + break; + } + } + } + let wallet_tip_end = wallet.latest_checkpoint(); + let balance = wallet.get_balance(); + println!( + "Synced {} blocks in {}s", + blocks_received, + start_load_wallet.elapsed().as_secs_f32(), + ); + println!( + "Wallet tip is '{}:{}'", + wallet_tip_end.height(), + wallet_tip_end.hash() + ); + println!("Wallet balance is {} sats", balance.total()); + println!( + "Wallet has {} transactions and {} utxos", + wallet.transactions().count(), + wallet.list_unspent().count() + ); + + Ok(()) +}