diff --git a/.github/workflows/cont_integration.yml b/.github/workflows/cont_integration.yml index f02dcb834..2ec2e0a3a 100644 --- a/.github/workflows/cont_integration.yml +++ b/.github/workflows/cont_integration.yml @@ -32,6 +32,7 @@ jobs: run: | cargo update -p log --precise "0.4.18" cargo update -p tempfile --precise "3.6.0" + cargo update -p minreq --precise "2.8.1" cargo update -p rustls:0.21.6 --precise "0.21.1" cargo update -p tokio:1.32.0 --precise "1.29.1" cargo update -p flate2:1.0.27 --precise "1.0.26" diff --git a/Cargo.toml b/Cargo.toml index 9fafb8b78..b235da3e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,10 @@ members = [ "crates/file_store", "crates/electrum", "crates/esplora", + "crates/bitcoind_rpc", "example-crates/example_cli", "example-crates/example_electrum", + "example-crates/example_rpc", "example-crates/wallet_electrum", "example-crates/wallet_esplora_blocking", "example-crates/wallet_esplora_async", diff --git a/README.md b/README.md index ae230abbd..a0af8dd7f 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,8 @@ To build with the MSRV you will need to pin dependencies as follows: cargo update -p log --precise "0.4.18" # tempfile 3.7.0 has MSRV 1.63.0+ cargo update -p tempfile --precise "3.6.0" +# minreq 2.9.0 prevents pinning rustls to 0.21.1 +cargo update -p minreq --precise "2.8.1" # rustls 0.21.2 has MSRV 1.60.0+ cargo update -p rustls:0.21.6 --precise "0.21.1" # tokio 1.30 has MSRV 1.63.0+ @@ -79,7 +81,7 @@ cargo update -p flate2:1.0.27 --precise "1.0.26" cargo update -p reqwest --precise "0.11.18" # h2 0.3.21 has MSRV 1.63.0+ cargo update -p h2 --precise "0.3.20" -# rustls-webpki has MSRV 1.60.0+ +# rustls-webpki 0.100.2 has MSRV 1.60.0+ cargo update -p rustls-webpki --precise "0.100.1" ``` diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index 1ca78a775..ac3c808ee 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -22,10 +22,10 @@ use alloc::{ pub use bdk_chain::keychain::Balance; use bdk_chain::{ indexed_tx_graph, - keychain::{KeychainTxOutIndex, WalletChangeSet, WalletUpdate}, + keychain::{self, KeychainTxOutIndex}, local_chain::{self, CannotConnectError, CheckPoint, CheckPointIter, LocalChain}, tx_graph::{CanonicalTx, TxGraph}, - Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeAnchor, FullTxOut, + Anchor, Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeAnchor, FullTxOut, IndexedTxGraph, Persist, PersistBackend, }; use bitcoin::consensus::encode::serialize; @@ -94,6 +94,90 @@ pub struct Wallet { secp: SecpCtx, } +/// A structure to update [`Wallet`]. +/// +/// It updates [`bdk_chain::keychain::KeychainTxOutIndex`], [`bdk_chain::TxGraph`] and [`local_chain::LocalChain`] atomically. +#[derive(Debug, Clone)] +pub struct WalletUpdate { + /// Contains the last active derivation indices per keychain (`K`), which is used to update the + /// [`KeychainTxOutIndex`]. + pub last_active_indices: BTreeMap, + + /// Update for the [`TxGraph`]. + pub graph: TxGraph, + + /// Update for the [`LocalChain`]. + /// + /// [`LocalChain`]: local_chain::LocalChain + pub chain: Option, +} + +impl Default for WalletUpdate { + fn default() -> Self { + Self { + last_active_indices: BTreeMap::new(), + graph: TxGraph::default(), + chain: None, + } + } +} + +/// A structure that records the corresponding changes as result of applying an [`WalletUpdate`]. +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct WalletChangeSet { + /// Changes to the [`LocalChain`]. + /// + /// [`LocalChain`]: local_chain::LocalChain + pub chain: local_chain::ChangeSet, + + /// ChangeSet to [`IndexedTxGraph`]. + /// + /// [`IndexedTxGraph`]: bdk_chain::indexed_tx_graph::IndexedTxGraph + #[serde(bound( + deserialize = "K: Ord + serde::Deserialize<'de>, A: Ord + serde::Deserialize<'de>", + serialize = "K: Ord + serde::Serialize, A: Ord + serde::Serialize", + ))] + pub index_tx_graph: indexed_tx_graph::ChangeSet>, +} + +impl Default for WalletChangeSet { + fn default() -> Self { + Self { + chain: Default::default(), + index_tx_graph: Default::default(), + } + } +} + +impl Append for WalletChangeSet { + fn append(&mut self, other: Self) { + Append::append(&mut self.chain, other.chain); + Append::append(&mut self.index_tx_graph, other.index_tx_graph); + } + + fn is_empty(&self) -> bool { + self.chain.is_empty() && self.index_tx_graph.is_empty() + } +} + +impl From for WalletChangeSet { + fn from(chain: local_chain::ChangeSet) -> Self { + Self { + chain, + ..Default::default() + } + } +} + +impl From>> for WalletChangeSet { + fn from(index_tx_graph: indexed_tx_graph::ChangeSet>) -> Self { + Self { + index_tx_graph, + ..Default::default() + } + } +} + /// The update to a [`Wallet`] used in [`Wallet::apply_update`]. This is usually returned from blockchain data sources. pub type Update = WalletUpdate; @@ -1719,7 +1803,11 @@ impl Wallet { where D: PersistBackend, { - let mut changeset = ChangeSet::from(self.chain.apply_update(update.chain)?); + let mut changeset = match update.chain { + Some(chain_update) => ChangeSet::from(self.chain.apply_update(chain_update)?), + None => ChangeSet::default(), + }; + let (_, index_changeset) = self .indexed_graph .index diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml new file mode 100644 index 000000000..2faf16eb1 --- /dev/null +++ b/crates/bitcoind_rpc/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "bdk_bitcoind_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../chain", version = "0.5.0", features = ["serde", "miniscript"] } +bitcoincore-rpc = { version = "0.17.0" } + +[dev-dependencies] +bitcoind = { version = "0.32.0", features = ["25_0"] } +anyhow = { version = "1" } diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs new file mode 100644 index 000000000..c2e29ca78 --- /dev/null +++ b/crates/bitcoind_rpc/src/lib.rs @@ -0,0 +1,441 @@ +//! This crate is used for updating [`bdk_chain`] structures with data from the `bitcoind` RPC +//! interface (excluding the RPC wallet API). +//! +//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`]. +//! +//! To only get block updates (exlude mempool transactions), the caller can use +//! [`Emitter::emit_block`] until it returns `Ok(None)` (which means the chain tip is reached). A +//! separate method, [`Emitter::emit_mempool`] can be used to emit the whole mempool. Another +//! method, [`Emitter::emit_update`] is avaliable, which emits block updates until the block tip is +//! reached, then the next update will be the mempool. +//! +//! # [`IntoIterator`] implementation +//! +//! [`Emitter`] implements [`IntoIterator`] which transforms itself into [`UpdateIter`]. The +//! iterator is implemented in a way that even after a call to [`Iterator::next`] returns [`None`], +//! subsequent calls may resume returning [`Some`]. +//! +//! The iterator initially returns blocks in increasing height order. After the chain tip is +//! reached, the next update is the mempool. After the mempool update is released, the first +//! succeeding call to [`Iterator::next`] will return [`None`]. +//! +//! This logic is useful if the caller wishes to "update once". +//! +//! ```rust,no_run +//! use bdk_bitcoind_rpc::{EmittedUpdate, Emitter}; +//! # let client: bdk_bitcoind_rpc::bitcoincore_rpc::Client = todo!(); +//! +//! for r in Emitter::new(&client, 709_632, None) { +//! let update = r.expect("todo: deal with the error properly"); +//! +//! match update.checkpoint() { +//! Some(cp) => println!("block {}:{}", cp.height(), cp.hash()), +//! None => println!("mempool!"), +//! } +//! } +//! ``` +//! +//! Alternatively, if the caller wishes to keep [`Emitter`] in a dedicated update-thread, the caller +//! can continue to poll [`Iterator::next`] with a delay. + +#![warn(missing_docs)] + +use bdk_chain::{ + bitcoin::{Block, Transaction}, + indexed_tx_graph::TxItem, + local_chain::{self, CheckPoint}, + BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, +}; +pub use bitcoincore_rpc; +use bitcoincore_rpc::{json::GetBlockResult, RpcApi}; +use std::fmt::Debug; + +/// An update emitted from [`Emitter`]. This can either be of a block or a subset of +/// mempool transactions. +#[derive(Debug, Clone)] +pub enum EmittedUpdate { + /// An emitted block. + Block(EmittedBlock), + /// An emitted subset of mempool transactions. + /// + /// [`Emitter`] attempts to avoid re-emitting transactions. + Mempool(EmittedMempool), +} + +impl EmittedUpdate { + /// Returns whether the update is of a subset of the mempool. + pub fn is_mempool(&self) -> bool { + matches!(self, Self::Mempool { .. }) + } + + /// Returns whether the update is of a block. + pub fn is_block(&self) -> bool { + matches!(self, Self::Block { .. }) + } + + /// Get the emission's checkpoint. + /// + /// The emission will only have a checkpoint if it is the [`EmittedUpdate::Block`] variant. + pub fn checkpoint(&self) -> Option { + match self { + EmittedUpdate::Block(e) => Some(e.checkpoint()), + EmittedUpdate::Mempool(_) => None, + } + } + + /// Convenience method to get [`local_chain::Update`]. + pub fn chain_update(&self) -> Option { + Some(local_chain::Update { + tip: self.checkpoint()?, + introduce_older_blocks: false, + }) + } + + /// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`]. + /// + /// The `anchor_map` parameter takes in a closure that creates anchors of a specific type. + /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create + /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively. + /// + /// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs + pub fn indexed_tx_graph_update(&self, anchor_map: M) -> Vec>> + where + M: Fn(&CheckPoint, &Block, usize) -> A, + A: Clone + Ord + PartialEq, + { + match self { + EmittedUpdate::Block(e) => e.indexed_tx_graph_update(anchor_map).collect(), + EmittedUpdate::Mempool(e) => e.indexed_tx_graph_update().collect(), + } + } +} + +/// An emitted block. +#[derive(Debug, Clone)] +pub struct EmittedBlock { + /// The checkpoint constructed from the block's height/hash and connected to the previous block. + pub cp: CheckPoint, + /// The actual block of the chain. + pub block: Block, +} + +impl EmittedBlock { + /// Get the emission's checkpoint. + pub fn checkpoint(&self) -> CheckPoint { + self.cp.clone() + } + + /// Convenience method to get [`local_chain::Update`]. + pub fn chain_update(&self) -> local_chain::Update { + local_chain::Update { + tip: self.cp.clone(), + introduce_older_blocks: false, + } + } + + /// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`]. + /// + /// Refer to [`EmittedUpdate::indexed_tx_graph_update`] for more. + /// + /// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs + pub fn indexed_tx_graph_update( + &self, + anchor_map: M, + ) -> impl Iterator>> + where + M: Fn(&CheckPoint, &Block, usize) -> A, + A: Clone + Ord + PartialEq, + { + self.block + .txdata + .iter() + .enumerate() + .map(move |(i, tx)| (tx, Some(anchor_map(&self.cp, &self.block, i)), None)) + } +} + +/// An emitted subset of mempool transactions. +#[derive(Debug, Clone)] +pub struct EmittedMempool { + /// Subset of mempool transactions as tuples of `(tx, seen_at)`. + /// + /// `seen_at` is the unix timestamp of when the transaction was first seen in the mempool. + pub txs: Vec<(Transaction, u64)>, +} + +impl EmittedMempool { + /// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`]. + /// + /// Refer to [`EmittedUpdate::indexed_tx_graph_update`] for more. + /// + /// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs + pub fn indexed_tx_graph_update(&self) -> impl Iterator>> + where + A: Clone + Ord + PartialEq, + { + self.txs + .iter() + .map(|(tx, seen_at)| (tx, None, Some(*seen_at))) + } +} + +/// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationHeightAnchor`]. +/// +/// This is to be used as an input to [`EmittedUpdate::indexed_tx_graph_update`]. +pub fn confirmation_height_anchor( + cp: &CheckPoint, + _block: &Block, + _tx_pos: usize, +) -> ConfirmationHeightAnchor { + let anchor_block = cp.block_id(); + ConfirmationHeightAnchor { + anchor_block, + confirmation_height: anchor_block.height, + } +} + +/// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationTimeAnchor`]. +/// +/// This is to be used as an input to [`EmittedUpdate::indexed_tx_graph_update`]. +pub fn confirmation_time_anchor( + cp: &CheckPoint, + block: &Block, + _tx_pos: usize, +) -> ConfirmationTimeAnchor { + let anchor_block = cp.block_id(); + ConfirmationTimeAnchor { + anchor_block, + confirmation_height: anchor_block.height, + confirmation_time: block.header.time as _, + } +} + +/// A structure that emits updates for [`bdk_chain`] structures, sourcing blockchain data from +/// [`bitcoincore_rpc::Client`]. +/// +/// Refer to [module-level documentation] for more. +/// +/// [module-level documentation]: crate +pub struct Emitter<'c, C> { + client: &'c C, + fallback_height: u32, + + last_cp: Option, + last_info: Option, +} + +impl<'c, C: RpcApi> IntoIterator for Emitter<'c, C> { + type Item = as Iterator>::Item; + type IntoIter = UpdateIter<'c, C>; + + fn into_iter(self) -> Self::IntoIter { + UpdateIter { + emitter: self, + last_emission_was_mempool: false, + } + } +} + +impl<'c, C: RpcApi> Emitter<'c, C> { + /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`]. + /// + /// * `fallback_height` is the block height to start from if `last_cp` is not provided, or a + /// point of agreement is not found. + /// * `last_cp` is the last known checkpoint to build updates on (if any). + pub fn new(client: &'c C, fallback_height: u32, last_cp: Option) -> Self { + Self { + client, + fallback_height, + last_cp, + last_info: None, + } + } + + /// Emits the whole mempool contents. + pub fn emit_mempool(&self) -> Result { + let txs = self + .client + .get_raw_mempool()? + .into_iter() + .map( + |txid| -> Result<(Transaction, u64), bitcoincore_rpc::Error> { + let first_seen = self + .client + .get_mempool_entry(&txid) + .map(|entry| entry.time)?; + let tx = self.client.get_raw_transaction(&txid, None)?; + Ok((tx, first_seen)) + }, + ) + .collect::, _>>()?; + Ok(EmittedMempool { txs }) + } + + /// Emits the next block (if any). + pub fn emit_block(&mut self) -> Result, bitcoincore_rpc::Error> { + enum PollResponse { + /// A new block that is in chain is found. Congratulations! + Block { + cp: CheckPoint, + info: GetBlockResult, + }, + /// This either signals that we have reached the tip, or that the blocks ahead are not + /// in the best chain. In either case, we need to find the agreement point again. + NoMoreBlocks, + /// We have exhausted the local checkpoint history and there is no agreement point. We + /// should emit from the fallback height for the next round. + AgreementPointNotFound, + /// We have found an agreement point! Do not emit this one, emit the one higher. + AgreementPointFound { + cp: CheckPoint, + info: GetBlockResult, + }, + } + + fn poll(emitter: &mut Emitter) -> Result + where + C: RpcApi, + { + let client = emitter.client; + + match (&mut emitter.last_cp, &mut emitter.last_info) { + (None, None) => { + let info = client + .get_block_info(&client.get_block_hash(emitter.fallback_height as _)?)?; + let cp = CheckPoint::new(BlockId { + height: info.height as _, + hash: info.hash, + }); + Ok(PollResponse::Block { cp, info }) + } + (Some(last_cp), None) => { + for cp in last_cp.iter() { + let cp_block = cp.block_id(); + let info = client.get_block_info(&cp_block.hash)?; + if info.confirmations < 0 { + // block is not in the main chain + continue; + } + // agreement point found + return Ok(PollResponse::AgreementPointFound { cp, info }); + } + // no agreement point found + Ok(PollResponse::AgreementPointNotFound) + } + (Some(last_cp), Some(last_info)) => { + let next_hash = match last_info.nextblockhash { + None => return Ok(PollResponse::NoMoreBlocks), + Some(next_hash) => next_hash, + }; + let info = client.get_block_info(&next_hash)?; + if info.confirmations < 0 { + return Ok(PollResponse::NoMoreBlocks); + } + let cp = last_cp + .clone() + .push(BlockId { + height: info.height as _, + hash: info.hash, + }) + .expect("must extend from checkpoint"); + Ok(PollResponse::Block { cp, info }) + } + (None, Some(last_info)) => unreachable!( + "info cannot exist without checkpoint: info={:#?}", + last_info + ), + } + } + + loop { + match poll(self)? { + PollResponse::Block { cp, info } => { + let block = self.client.get_block(&info.hash)?; + self.last_cp = Some(cp.clone()); + self.last_info = Some(info); + return Ok(Some(EmittedBlock { cp, block })); + } + PollResponse::NoMoreBlocks => { + // we have reached the tip, try find agreement point in next round + self.last_info = None; + return Ok(None); + } + PollResponse::AgreementPointNotFound => { + self.last_cp = None; + self.last_info = None; + continue; + } + PollResponse::AgreementPointFound { cp, info } => { + self.last_cp = Some(cp); + self.last_info = Some(info); + continue; + } + } + } + } + + /// Continuously poll [`bitcoincore_rpc::Client`] until an update is found. + pub fn emit_update(&mut self) -> Result { + match self.emit_block()? { + Some(emitted_block) => Ok(EmittedUpdate::Block(emitted_block)), + None => self.emit_mempool().map(EmittedUpdate::Mempool), + } + } +} + +/// Extends [`bitcoincore_rpc::Error`]. +pub trait BitcoindRpcErrorExt { + /// Returns whether the error is a "not found" error. + /// + /// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as + /// [`Iterator::Item`]. + fn is_not_found_error(&self) -> bool; +} + +impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { + fn is_not_found_error(&self) -> bool { + if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self + { + rpc_err.code == -5 + } else { + false + } + } +} + +/// An [`Iterator`] that wraps an [`Emitter`], and emits [`Result`]s of [`EmittedUpdate`]. +/// +/// ```rust,no_run +/// use bdk_bitcoind_rpc::{EmittedUpdate, Emitter, UpdateIter}; +/// use core::iter::{IntoIterator, Iterator}; +/// # let client: bdk_bitcoind_rpc::bitcoincore_rpc::Client = todo!(); +/// +/// let mut update_iter = Emitter::new(&client, 706_932, None).into_iter(); +/// let update = update_iter.next().expect("must get next update"); +/// println!("got update: {:?}", update); +/// ``` +/// +/// Refer to [module-level documentation] for more. +/// +/// [module-level documentation]: crate +pub struct UpdateIter<'c, C> { + emitter: Emitter<'c, C>, + last_emission_was_mempool: bool, +} + +impl<'c, C: RpcApi> Iterator for UpdateIter<'c, C> { + type Item = Result; + + fn next(&mut self) -> Option { + if self.last_emission_was_mempool { + self.last_emission_was_mempool = false; + None + } else { + let update = self.emitter.emit_update(); + if matches!(update, Ok(EmittedUpdate::Mempool(_))) { + self.last_emission_was_mempool = true; + } + Some(update) + } + } +} diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs new file mode 100644 index 000000000..529191ff2 --- /dev/null +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -0,0 +1,373 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use bdk_bitcoind_rpc::Emitter; +use bdk_chain::{ + bitcoin::{Address, Amount, BlockHash, Txid}, + local_chain::LocalChain, + Append, BlockId, ConfirmationHeightAnchor, IndexedTxGraph, SpkTxOutIndex, +}; +use bitcoincore_rpc::RpcApi; + +struct TestEnv { + #[allow(dead_code)] + daemon: bitcoind::BitcoinD, + client: bitcoincore_rpc::Client, +} + +impl TestEnv { + fn new() -> anyhow::Result { + let daemon = match std::env::var_os("TEST_BITCOIND") { + Some(bitcoind_path) => bitcoind::BitcoinD::new(bitcoind_path), + None => bitcoind::BitcoinD::from_downloaded(), + }?; + let client = bitcoincore_rpc::Client::new( + &daemon.rpc_url(), + bitcoincore_rpc::Auth::CookieFile(daemon.params.cookie_file.clone()), + )?; + Ok(Self { daemon, client }) + } + + fn mine_blocks( + &self, + count: usize, + address: Option
, + ) -> anyhow::Result> { + let coinbase_address = match address { + Some(address) => address, + None => self.client.get_new_address(None, None)?.assume_checked(), + }; + let block_hashes = self + .client + .generate_to_address(count as _, &coinbase_address)?; + Ok(block_hashes) + } + + fn reorg(&self, count: usize) -> anyhow::Result> { + let start_height = self.client.get_block_count()?; + + let mut hash = self.client.get_best_block_hash()?; + for _ in 0..count { + let prev_hash = self.client.get_block_info(&hash)?.previousblockhash; + self.client.invalidate_block(&hash)?; + match prev_hash { + Some(prev_hash) => hash = prev_hash, + None => break, + } + } + + let res = self.mine_blocks(count, None); + assert_eq!( + self.client.get_block_count()?, + start_height, + "reorg should not result in height change" + ); + res + } +} + +/// Ensure that blocks are emitted in order even after reorg. +/// +/// 1. Mine 101 blocks. +/// 2. Emit blocks from [`Emitter`] and update the [`LocalChain`]. +/// 3. Reorg highest 6 blocks. +/// 4. Emit blocks from [`Emitter`] and re-update the [`LocalChain`]. +#[test] +pub fn test_sync_local_chain() -> anyhow::Result<()> { + let env = TestEnv::new()?; + let mut local_chain = LocalChain::default(); + let mut emitter = Emitter::new(&env.client, 0, local_chain.tip()); + + // mine some blocks and returned the actual block hashes + let exp_hashes = { + let mut hashes = vec![env.client.get_block_hash(0)?]; // include genesis block + hashes.extend(env.mine_blocks(101, None)?); + hashes + }; + + // see if the emitter outputs the right blocks + loop { + let cp = match emitter.emit_block()? { + Some(b) => b.checkpoint(), + None => break, + }; + assert_eq!( + cp.hash(), + exp_hashes[cp.height() as usize], + "emitted block hash is unexpected" + ); + + let chain_update = bdk_chain::local_chain::Update { + tip: cp.clone(), + introduce_older_blocks: false, + }; + assert_eq!( + local_chain.apply_update(chain_update)?, + BTreeMap::from([(cp.height(), Some(cp.hash()))]), + "chain update changeset is unexpected", + ); + } + + assert_eq!( + local_chain.blocks(), + &exp_hashes + .iter() + .enumerate() + .map(|(i, hash)| (i as u32, *hash)) + .collect(), + "final local_chain state is unexpected", + ); + + // create new emitter (just for testing sake) + drop(emitter); + let mut emitter = Emitter::new(&env.client, 0, local_chain.tip()); + + // perform reorg + let reorged_blocks = env.reorg(6)?; + let exp_hashes = exp_hashes + .iter() + .take(exp_hashes.len() - reorged_blocks.len()) + .chain(&reorged_blocks) + .cloned() + .collect::>(); + + // see if the emitter outputs the right blocks + let mut exp_height = exp_hashes.len() - reorged_blocks.len(); + loop { + let cp = match emitter.emit_block()? { + Some(b) => b.checkpoint(), + None => break, + }; + assert_eq!( + cp.height(), + exp_height as u32, + "emitted block has unexpected height" + ); + + assert_eq!( + cp.hash(), + exp_hashes[cp.height() as usize], + "emitted block is unexpected" + ); + + let chain_update = bdk_chain::local_chain::Update { + tip: cp.clone(), + introduce_older_blocks: false, + }; + assert_eq!( + local_chain.apply_update(chain_update)?, + if exp_height == exp_hashes.len() - reorged_blocks.len() { + core::iter::once((cp.height(), Some(cp.hash()))) + .chain((cp.height() + 1..exp_hashes.len() as u32).map(|h| (h, None))) + .collect::() + } else { + BTreeMap::from([(cp.height(), Some(cp.hash()))]) + }, + "chain update changeset is unexpected", + ); + + exp_height += 1; + } + + assert_eq!( + local_chain.blocks(), + &exp_hashes + .iter() + .enumerate() + .map(|(i, hash)| (i as u32, *hash)) + .collect(), + "final local_chain state is unexpected after reorg", + ); + + Ok(()) +} + +/// Ensure that [`EmittedUpdate::into_tx_graph_update`] behaves appropriately for both mempool and +/// block updates. +/// +/// [`EmittedUpdate::into_tx_graph_update`]: bdk_bitcoind_rpc::EmittedUpdate::into_tx_graph_update +#[test] +fn test_into_tx_graph() -> anyhow::Result<()> { + let env = TestEnv::new()?; + + println!("getting new addresses!"); + let addr_0 = env.client.get_new_address(None, None)?.assume_checked(); + let addr_1 = env.client.get_new_address(None, None)?.assume_checked(); + let addr_2 = env.client.get_new_address(None, None)?.assume_checked(); + println!("got new addresses!"); + + println!("mining block!"); + env.mine_blocks(101, None)?; + println!("mined blocks!"); + + let mut chain = LocalChain::default(); + let mut indexed_tx_graph = IndexedTxGraph::::new({ + let mut index = SpkTxOutIndex::::default(); + index.insert_spk(0, addr_0.script_pubkey()); + index.insert_spk(1, addr_1.script_pubkey()); + index.insert_spk(2, addr_2.script_pubkey()); + index + }); + + for r in Emitter::new(&env.client, 0, chain.tip()) { + let update = r?; + + if let Some(chain_update) = update.chain_update() { + let _ = chain.apply_update(chain_update)?; + } + + let tx_graph_update = + update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor); + + let indexed_additions = indexed_tx_graph.insert_relevant_txs(tx_graph_update); + assert!(indexed_additions.is_empty()); + } + + // send 3 txs to a tracked address, these txs will be in the mempool + let exp_txids = { + let mut txids = BTreeSet::new(); + for _ in 0..3 { + txids.insert(env.client.send_to_address( + &addr_0, + Amount::from_sat(10_000), + None, + None, + None, + None, + None, + None, + )?); + } + txids + }; + + // expect the next update to be a mempool update (with 3 relevant tx) + { + let update = Emitter::new(&env.client, 0, chain.tip()).emit_update()?; + assert!(update.is_mempool()); + + let tx_graph_update = + update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor); + + let indexed_additions = indexed_tx_graph.insert_relevant_txs(tx_graph_update); + assert_eq!( + indexed_additions + .graph + .txs + .iter() + .map(|tx| tx.txid()) + .collect::>(), + exp_txids, + "changeset should have the 3 mempool transactions", + ); + assert!(indexed_additions.graph.anchors.is_empty()); + } + + // mine a block that confirms the 3 txs + let exp_block_hash = env.mine_blocks(1, None)?[0]; + let exp_block_height = env.client.get_block_info(&exp_block_hash)?.height as u32; + let exp_anchors = exp_txids + .iter() + .map({ + let anchor = ConfirmationHeightAnchor { + anchor_block: BlockId { + height: exp_block_height, + hash: exp_block_hash, + }, + confirmation_height: exp_block_height, + }; + move |&txid| (anchor, txid) + }) + .collect::>(); + + { + let update = Emitter::new(&env.client, 0, chain.tip()).emit_update()?; + assert!(update.is_block()); + + if let Some(chain_update) = update.chain_update() { + let _ = chain.apply_update(chain_update)?; + } + + let tx_graph_update = + update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor); + + let indexed_additions = indexed_tx_graph.insert_relevant_txs(tx_graph_update); + assert!(indexed_additions.graph.txs.is_empty()); + assert!(indexed_additions.graph.txouts.is_empty()); + assert_eq!(indexed_additions.graph.anchors, exp_anchors); + } + + Ok(()) +} + +#[test] +fn test_status_of_reorg_tx() -> anyhow::Result<()> { + let env = TestEnv::new()?; + let address = env.client.get_new_address(None, None)?.assume_checked(); + + env.mine_blocks(101, None)?; + + let mut chain = LocalChain::default(); + let mut indexed_tx_graph = IndexedTxGraph::::new({ + let mut index = SpkTxOutIndex::::default(); + index.insert_spk(0, address.script_pubkey()); + index + }); + + let txid = env.client.send_to_address( + &address, + Amount::from_sat(10_000), + None, + None, + None, + None, + None, + None, + )?; + + env.mine_blocks(1, None)?; + + for r in Emitter::new(&env.client, 0, chain.tip()) { + let update = r?; + + if let Some(chain_update) = update.chain_update() { + let _ = chain.apply_update(chain_update)?; + } + + let tx_graph_update = + update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor); + + let _ = indexed_tx_graph.insert_relevant_txs(tx_graph_update); + } + + let chain_tip = chain.tip().unwrap().block_id(); + let tx_chain_position = indexed_tx_graph + .graph() + .get_chain_position(&chain, chain_tip, txid) + .unwrap(); + assert!(tx_chain_position.is_confirmed()); + + env.reorg(2)?; + + for r in Emitter::new(&env.client, 0, chain.tip()) { + let update = r?; + + if let Some(chain_update) = update.chain_update() { + let _ = chain.apply_update(chain_update)?; + } + + let tx_graph_update = + update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor); + + let indexed_additions = indexed_tx_graph.insert_relevant_txs(tx_graph_update); + assert!(indexed_additions.is_empty()); + } + + let chain_tip = chain.tip().unwrap().block_id(); + let tx_chain_position = indexed_tx_graph + .graph() + .get_chain_position(&chain, chain_tip, txid) + .unwrap(); + assert!(!tx_chain_position.is_confirmed()); + + Ok(()) +} diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 6dc2e9943..169c4ad36 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -135,8 +135,7 @@ where /// timestamp of when the transactions are last seen. pub fn insert_relevant_txs<'t>( &mut self, - txs: impl IntoIterator)>, - seen_at: Option, + txs: impl IntoIterator>>, ) -> ChangeSet { // The algorithm below allows for non-topologically ordered transactions by using two loops. // This is achieved by: @@ -146,17 +145,19 @@ where // returns true or not. (in a second loop). let mut changeset = ChangeSet::::default(); let mut transactions = Vec::new(); - for (tx, anchors) in txs.into_iter() { + for (tx, anchors, seen_at) in txs.into_iter() { changeset.indexer.append(self.index.index_tx(tx)); - transactions.push((tx, anchors)); + transactions.push((tx, anchors, seen_at)); } changeset.append( transactions .into_iter() - .filter_map(|(tx, anchors)| match self.index.is_tx_relevant(tx) { - true => Some(self.insert_tx(tx, anchors, seen_at)), - false => None, - }) + .filter_map( + |(tx, anchors, seen_at)| match self.index.is_tx_relevant(tx) { + true => Some(self.insert_tx(tx, anchors, seen_at)), + false => None, + }, + ) .fold(Default::default(), |mut acc, other| { acc.append(other); acc @@ -166,6 +167,9 @@ where } } +/// Represents a single transaction update. +pub type TxItem<'t, A> = (&'t Transaction, A, Option); + /// A structure that represents changes to an [`IndexedTxGraph`]. #[derive(Clone, Debug, PartialEq)] #[cfg_attr( @@ -233,7 +237,18 @@ pub trait Indexer { /// Scan and index the given `outpoint` and `txout`. fn index_txout(&mut self, outpoint: OutPoint, txout: &TxOut) -> Self::ChangeSet; - /// Scan and index the given transaction. + /// Scans a transaction for relevant outpoints, which are stored and indexed internally. + /// + /// If the matched script pubkey is part of the lookahead, the last stored index is updated for + /// the script pubkey's keychain and the [`ChangeSet`] returned will reflect the + /// change. + /// + /// Typically, this method is used in two situations: + /// + /// 1. After loading transaction data from the disk, you may scan over all the txouts to restore all + /// your txouts. + /// 2. When getting new data from the chain, you usually scan it before incorporating it into + /// your chain state. fn index_tx(&mut self, tx: &Transaction) -> Self::ChangeSet; /// Apply changeset to itself. diff --git a/crates/chain/src/keychain.rs b/crates/chain/src/keychain.rs index 64d68d81e..63972a0ad 100644 --- a/crates/chain/src/keychain.rs +++ b/crates/chain/src/keychain.rs @@ -10,9 +10,7 @@ //! //! [`SpkTxOutIndex`]: crate::SpkTxOutIndex -use crate::{ - collections::BTreeMap, indexed_tx_graph, local_chain, tx_graph::TxGraph, Anchor, Append, -}; +use crate::{collections::BTreeMap, Append}; #[cfg(feature = "miniscript")] mod txout_index; @@ -82,98 +80,6 @@ impl AsRef> for ChangeSet { } } -/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`] atomically. -/// -/// [`LocalChain`]: local_chain::LocalChain -#[derive(Debug, Clone)] -pub struct WalletUpdate { - /// Contains the last active derivation indices per keychain (`K`), which is used to update the - /// [`KeychainTxOutIndex`]. - pub last_active_indices: BTreeMap, - - /// Update for the [`TxGraph`]. - pub graph: TxGraph, - - /// Update for the [`LocalChain`]. - /// - /// [`LocalChain`]: local_chain::LocalChain - pub chain: local_chain::Update, -} - -impl WalletUpdate { - /// Construct a [`WalletUpdate`] with a given [`local_chain::Update`]. - pub fn new(chain_update: local_chain::Update) -> Self { - Self { - last_active_indices: BTreeMap::new(), - graph: TxGraph::default(), - chain: chain_update, - } - } -} - -/// A structure that records the corresponding changes as result of applying an [`WalletUpdate`]. -#[derive(Debug, Clone, PartialEq)] -#[cfg_attr( - feature = "serde", - derive(serde::Deserialize, serde::Serialize), - serde( - crate = "serde_crate", - bound( - deserialize = "K: Ord + serde::Deserialize<'de>, A: Ord + serde::Deserialize<'de>", - serialize = "K: Ord + serde::Serialize, A: Ord + serde::Serialize", - ) - ) -)] -pub struct WalletChangeSet { - /// Changes to the [`LocalChain`]. - /// - /// [`LocalChain`]: local_chain::LocalChain - pub chain: local_chain::ChangeSet, - - /// ChangeSet to [`IndexedTxGraph`]. - /// - /// [`IndexedTxGraph`]: crate::indexed_tx_graph::IndexedTxGraph - pub index_tx_graph: indexed_tx_graph::ChangeSet>, -} - -impl Default for WalletChangeSet { - fn default() -> Self { - Self { - chain: Default::default(), - index_tx_graph: Default::default(), - } - } -} - -impl Append for WalletChangeSet { - fn append(&mut self, other: Self) { - Append::append(&mut self.chain, other.chain); - Append::append(&mut self.index_tx_graph, other.index_tx_graph); - } - - fn is_empty(&self) -> bool { - self.chain.is_empty() && self.index_tx_graph.is_empty() - } -} - -impl From for WalletChangeSet { - fn from(chain: local_chain::ChangeSet) -> Self { - Self { - chain, - ..Default::default() - } - } -} - -impl From>> for WalletChangeSet { - fn from(index_tx_graph: indexed_tx_graph::ChangeSet>) -> Self { - Self { - index_tx_graph, - ..Default::default() - } - } -} - /// Balance, differentiated into various categories. #[derive(Debug, PartialEq, Eq, Clone, Default)] #[cfg_attr( diff --git a/crates/chain/src/keychain/txout_index.rs b/crates/chain/src/keychain/txout_index.rs index 9b38a7ade..5996d4d46 100644 --- a/crates/chain/src/keychain/txout_index.rs +++ b/crates/chain/src/keychain/txout_index.rs @@ -3,7 +3,7 @@ use crate::{ indexed_tx_graph::Indexer, miniscript::{Descriptor, DescriptorPublicKey}, spk_iter::BIP32_MAX_INDEX, - ForEachTxOut, SpkIterator, SpkTxOutIndex, + SpkIterator, SpkTxOutIndex, }; use alloc::vec::Vec; use bitcoin::{OutPoint, Script, TxOut}; @@ -91,11 +91,19 @@ impl Indexer for KeychainTxOutIndex { type ChangeSet = super::ChangeSet; fn index_txout(&mut self, outpoint: OutPoint, txout: &TxOut) -> Self::ChangeSet { - self.scan_txout(outpoint, txout) + let mut changeset = super::ChangeSet::::default(); + for (keychain, index) in self.inner.index_txout(outpoint, txout) { + changeset.append(self.reveal_to_target(&keychain, index).1); + } + changeset } fn index_tx(&mut self, tx: &bitcoin::Transaction) -> Self::ChangeSet { - self.scan(tx) + let mut changeset = super::ChangeSet::::default(); + for (op, txout) in tx.output.iter().enumerate() { + changeset.append(self.index_txout(OutPoint::new(tx.txid(), op as u32), txout)); + } + changeset } fn initial_changeset(&self) -> Self::ChangeSet { @@ -112,38 +120,6 @@ impl Indexer for KeychainTxOutIndex { } impl KeychainTxOutIndex { - /// Scans an object for relevant outpoints, which are stored and indexed internally. - /// - /// If the matched script pubkey is part of the lookahead, the last stored index is updated for - /// the script pubkey's keychain and the [`super::ChangeSet`] returned will reflect the - /// change. - /// - /// Typically, this method is used in two situations: - /// - /// 1. After loading transaction data from the disk, you may scan over all the txouts to restore all - /// your txouts. - /// 2. When getting new data from the chain, you usually scan it before incorporating it into - /// your chain state (i.e., `SparseChain`, `ChainGraph`). - /// - /// See [`ForEachTxout`] for the types that support this. - /// - /// [`ForEachTxout`]: crate::ForEachTxOut - pub fn scan(&mut self, txouts: &impl ForEachTxOut) -> super::ChangeSet { - let mut changeset = super::ChangeSet::::default(); - txouts.for_each_txout(|(op, txout)| changeset.append(self.scan_txout(op, txout))); - changeset - } - - /// Scan a single outpoint for a matching script pubkey. - /// - /// If it matches, this will store and index it. - pub fn scan_txout(&mut self, op: OutPoint, txout: &TxOut) -> super::ChangeSet { - match self.inner.scan_txout(op, txout).cloned() { - Some((keychain, index)) => self.reveal_to_target(&keychain, index).1, - None => super::ChangeSet::default(), - } - } - /// Return a reference to the internal [`SpkTxOutIndex`]. pub fn inner(&self) -> &SpkTxOutIndex<(K, u32)> { &self.inner @@ -200,14 +176,11 @@ impl KeychainTxOutIndex { /// Set the lookahead count for `keychain`. /// /// The lookahead is the number of scripts to cache ahead of the last stored script index. This - /// is useful during a scan via [`scan`] or [`scan_txout`]. + /// is useful during a scan via [`Indexer::index_tx`] or [`Indexer::index_txout`]. /// /// # Panics /// /// This will panic if the `keychain` does not exist. - /// - /// [`scan`]: Self::scan - /// [`scan_txout`]: Self::scan_txout pub fn set_lookahead(&mut self, keychain: &K, lookahead: u32) { self.lookahead.insert(keychain.clone(), lookahead); self.replenish_lookahead(keychain); diff --git a/crates/chain/src/lib.rs b/crates/chain/src/lib.rs index ed167ebf6..f38b7ee53 100644 --- a/crates/chain/src/lib.rs +++ b/crates/chain/src/lib.rs @@ -100,3 +100,11 @@ pub mod collections { /// How many confirmations are needed f or a coinbase output to be spent. pub const COINBASE_MATURITY: u32 = 100; + +impl From> + for (local_chain::ChangeSet, indexed_tx_graph::ChangeSet) +{ + fn from(indexed_changeset: indexed_tx_graph::ChangeSet) -> Self { + (local_chain::ChangeSet::default(), indexed_changeset) + } +} diff --git a/crates/chain/src/spk_txout_index.rs b/crates/chain/src/spk_txout_index.rs index db749f44c..6c69daf31 100644 --- a/crates/chain/src/spk_txout_index.rs +++ b/crates/chain/src/spk_txout_index.rs @@ -3,15 +3,15 @@ use core::ops::RangeBounds; use crate::{ collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap}, indexed_tx_graph::Indexer, - ForEachTxOut, }; use bitcoin::{self, OutPoint, Script, ScriptBuf, Transaction, TxOut, Txid}; /// An index storing [`TxOut`]s that have a script pubkey that matches those in a list. /// /// The basic idea is that you insert script pubkeys you care about into the index with -/// [`insert_spk`] and then when you call [`scan`], the index will look at any txouts you pass in and -/// store and index any txouts matching one of its script pubkeys. +/// [`insert_spk`] and then when you call [`Indexer::index_tx`] or [`Indexer::index_txout`], the +/// index will look at any txouts you pass in and store and index any txouts matching one of its +/// script pubkeys. /// /// Each script pubkey is associated with an application-defined index script index `I`, which must be /// [`Ord`]. Usually, this is used to associate the derivation index of the script pubkey or even a @@ -25,7 +25,6 @@ use bitcoin::{self, OutPoint, Script, ScriptBuf, Transaction, TxOut, Txid}; /// [`TxOut`]: bitcoin::TxOut /// [`insert_spk`]: Self::insert_spk /// [`Ord`]: core::cmp::Ord -/// [`scan`]: Self::scan /// [`TxGraph`]: crate::tx_graph::TxGraph #[derive(Clone, Debug)] pub struct SpkTxOutIndex { @@ -54,19 +53,35 @@ impl Default for SpkTxOutIndex { } impl Indexer for SpkTxOutIndex { - type ChangeSet = (); + type ChangeSet = BTreeSet; fn index_txout(&mut self, outpoint: OutPoint, txout: &TxOut) -> Self::ChangeSet { - self.scan_txout(outpoint, txout); - Default::default() + let spk_i = self.spk_indices.get(&txout.script_pubkey); + let mut scanned_indices = BTreeSet::new(); + if let Some(spk_i) = spk_i { + self.txouts.insert(outpoint, (spk_i.clone(), txout.clone())); + self.spk_txouts.insert((spk_i.clone(), outpoint)); + self.unused.remove(spk_i); + scanned_indices.insert(spk_i.clone()); + } + scanned_indices } fn index_tx(&mut self, tx: &Transaction) -> Self::ChangeSet { - self.scan(tx); - Default::default() + let mut scanned_indices = BTreeSet::new(); + + for (i, txout) in tx.output.iter().enumerate() { + let op = OutPoint::new(tx.txid(), i as u32); + let mut txout_indices = self.index_txout(op, txout); + scanned_indices.append(&mut txout_indices); + } + + scanned_indices } - fn initial_changeset(&self) -> Self::ChangeSet {} + fn initial_changeset(&self) -> Self::ChangeSet { + self.spks.keys().cloned().collect() + } fn apply_changeset(&mut self, _changeset: Self::ChangeSet) { // This applies nothing. @@ -77,51 +92,7 @@ impl Indexer for SpkTxOutIndex { } } -/// This macro is used instead of a member function of `SpkTxOutIndex`, which would result in a -/// compiler error[E0521]: "borrowed data escapes out of closure" when we attempt to take a -/// reference out of the `ForEachTxOut` closure during scanning. -macro_rules! scan_txout { - ($self:ident, $op:expr, $txout:expr) => {{ - let spk_i = $self.spk_indices.get(&$txout.script_pubkey); - if let Some(spk_i) = spk_i { - $self.txouts.insert($op, (spk_i.clone(), $txout.clone())); - $self.spk_txouts.insert((spk_i.clone(), $op)); - $self.unused.remove(&spk_i); - } - spk_i - }}; -} - impl SpkTxOutIndex { - /// Scans an object containing many txouts. - /// - /// Typically, this is used in two situations: - /// - /// 1. After loading transaction data from the disk, you may scan over all the txouts to restore all - /// your txouts. - /// 2. When getting new data from the chain, you usually scan it before incorporating it into your chain state. - /// - /// See [`ForEachTxout`] for the types that support this. - /// - /// [`ForEachTxout`]: crate::ForEachTxOut - pub fn scan(&mut self, txouts: &impl ForEachTxOut) -> BTreeSet { - let mut scanned_indices = BTreeSet::new(); - - txouts.for_each_txout(|(op, txout)| { - if let Some(spk_i) = scan_txout!(self, op, txout) { - scanned_indices.insert(spk_i.clone()); - } - }); - - scanned_indices - } - - /// Scan a single `TxOut` for a matching script pubkey and returns the index that matches the - /// script pubkey (if any). - pub fn scan_txout(&mut self, op: OutPoint, txout: &TxOut) -> Option<&I> { - scan_txout!(self, op, txout) - } - /// Get a reference to the set of indexed outpoints. pub fn outpoints(&self) -> &BTreeSet<(I, OutPoint)> { &self.spk_txouts diff --git a/crates/chain/src/tx_data_traits.rs b/crates/chain/src/tx_data_traits.rs index 811b1ff41..d0ed67d75 100644 --- a/crates/chain/src/tx_data_traits.rs +++ b/crates/chain/src/tx_data_traits.rs @@ -2,39 +2,6 @@ use crate::collections::BTreeMap; use crate::collections::BTreeSet; use crate::BlockId; use alloc::vec::Vec; -use bitcoin::{Block, OutPoint, Transaction, TxOut}; - -/// Trait to do something with every txout contained in a structure. -/// -/// We would prefer to just work with things that can give us an `Iterator` -/// here, but rust's type system makes it extremely hard to do this (without trait objects). -pub trait ForEachTxOut { - /// The provided closure `f` will be called with each `outpoint/txout` pair. - fn for_each_txout(&self, f: impl FnMut((OutPoint, &TxOut))); -} - -impl ForEachTxOut for Block { - fn for_each_txout(&self, mut f: impl FnMut((OutPoint, &TxOut))) { - for tx in self.txdata.iter() { - tx.for_each_txout(&mut f) - } - } -} - -impl ForEachTxOut for Transaction { - fn for_each_txout(&self, mut f: impl FnMut((OutPoint, &TxOut))) { - let txid = self.txid(); - for (i, txout) in self.output.iter().enumerate() { - f(( - OutPoint { - txid, - vout: i as u32, - }, - txout, - )) - } - } -} /// Trait that "anchors" blockchain data to a specific block of height and hash. /// diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index adb84ca22..a741ddb6d 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -52,7 +52,7 @@ use crate::{ collections::*, keychain::Balance, local_chain::LocalChain, Anchor, Append, BlockId, - ChainOracle, ChainPosition, ForEachTxOut, FullTxOut, + ChainOracle, ChainPosition, FullTxOut, }; use alloc::vec::Vec; use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid}; @@ -1072,18 +1072,6 @@ impl AsRef> for TxGraph { } } -impl ForEachTxOut for ChangeSet { - fn for_each_txout(&self, f: impl FnMut((OutPoint, &TxOut))) { - self.txouts().for_each(f) - } -} - -impl ForEachTxOut for TxGraph { - fn for_each_txout(&self, f: impl FnMut((OutPoint, &TxOut))) { - self.all_txouts().for_each(f) - } -} - /// An iterator that traverses transaction descendants. /// /// This `struct` is created by the [`walk_descendants`] method of [`TxGraph`]. diff --git a/crates/chain/tests/test_indexed_tx_graph.rs b/crates/chain/tests/test_indexed_tx_graph.rs index 84506ec11..312096979 100644 --- a/crates/chain/tests/test_indexed_tx_graph.rs +++ b/crates/chain/tests/test_indexed_tx_graph.rs @@ -74,7 +74,7 @@ fn insert_relevant_txs() { }; assert_eq!( - graph.insert_relevant_txs(txs.iter().map(|tx| (tx, None)), None), + graph.insert_relevant_txs(txs.iter().map(|tx| (tx, None, None))), changeset, ); @@ -211,8 +211,8 @@ fn test_list_owned_txouts() { // Insert transactions into graph with respective anchors // For unconfirmed txs we pass in `None`. - let _ = graph.insert_relevant_txs( - [&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| { + let _ = + graph.insert_relevant_txs([&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| { let height = i as u32; ( *tx, @@ -225,12 +225,11 @@ fn test_list_owned_txouts() { anchor_block, confirmation_height: anchor_block.height, }), + None, ) - }), - None, - ); + })); - let _ = graph.insert_relevant_txs([&tx4, &tx5].iter().map(|tx| (*tx, None)), Some(100)); + let _ = graph.insert_relevant_txs([&tx4, &tx5].iter().map(|tx| (*tx, None, Some(100)))); // A helper lambda to extract and filter data from the graph. let fetch = diff --git a/crates/chain/tests/test_keychain_txout_index.rs b/crates/chain/tests/test_keychain_txout_index.rs index 96a1afd1a..f3886ab6f 100644 --- a/crates/chain/tests/test_keychain_txout_index.rs +++ b/crates/chain/tests/test_keychain_txout_index.rs @@ -4,6 +4,7 @@ mod common; use bdk_chain::{ collections::BTreeMap, + indexed_tx_graph::Indexer, keychain::{self, KeychainTxOutIndex}, Append, }; @@ -194,7 +195,7 @@ fn test_lookahead() { ], ..common::new_tx(external_index) }; - assert_eq!(txout_index.scan(&tx), keychain::ChangeSet::default()); + assert_eq!(txout_index.index_tx(&tx), keychain::ChangeSet::default()); assert_eq!( txout_index.last_revealed_index(&TestKeychain::External), Some(last_external_index) @@ -248,7 +249,7 @@ fn test_scan_with_lookahead() { value: 0, }; - let changeset = txout_index.scan_txout(op, &txout); + let changeset = txout_index.index_txout(op, &txout); assert_eq!( changeset.as_inner(), &[(TestKeychain::External, spk_i)].into() @@ -273,7 +274,7 @@ fn test_scan_with_lookahead() { script_pubkey: spk_41, value: 0, }; - let changeset = txout_index.scan_txout(op, &txout); + let changeset = txout_index.index_txout(op, &txout); assert!(changeset.is_empty()); } diff --git a/crates/chain/tests/test_spk_txout_index.rs b/crates/chain/tests/test_spk_txout_index.rs index 099b4ca88..e8b752146 100644 --- a/crates/chain/tests/test_spk_txout_index.rs +++ b/crates/chain/tests/test_spk_txout_index.rs @@ -1,4 +1,4 @@ -use bdk_chain::SpkTxOutIndex; +use bdk_chain::{indexed_tx_graph::Indexer, SpkTxOutIndex}; use bitcoin::{absolute, OutPoint, ScriptBuf, Transaction, TxIn, TxOut}; #[test] @@ -22,7 +22,7 @@ fn spk_txout_sent_and_received() { assert_eq!(index.sent_and_received(&tx1), (0, 42_000)); assert_eq!(index.net_value(&tx1), 42_000); - index.scan(&tx1); + index.index_tx(&tx1); assert_eq!( index.sent_and_received(&tx1), (0, 42_000), @@ -82,7 +82,7 @@ fn mark_used() { }], }; - spk_index.scan(&tx1); + spk_index.index_tx(&tx1); spk_index.unmark_used(&1); assert!( spk_index.is_used(&1), diff --git a/crates/electrum/src/electrum_ext.rs b/crates/electrum/src/electrum_ext.rs index c7859bdfe..b74358627 100644 --- a/crates/electrum/src/electrum_ext.rs +++ b/crates/electrum/src/electrum_ext.rs @@ -1,6 +1,5 @@ use bdk_chain::{ bitcoin::{OutPoint, ScriptBuf, Transaction, Txid}, - keychain::WalletUpdate, local_chain::{self, CheckPoint}, tx_graph::{self, TxGraph}, Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, @@ -15,90 +14,67 @@ use std::{ /// We assume that a block of this depth and deeper cannot be reorged. const ASSUME_FINAL_DEPTH: u32 = 8; -/// Represents an update fetched from an Electrum server, but excludes full transactions. +/// Represents a [`TxGraph`] update fetched from an Electrum server, but excludes full transactions. /// /// To provide a complete update to [`TxGraph`], you'll need to call [`Self::missing_full_txs`] to -/// determine the full transactions missing from [`TxGraph`]. Then call [`Self::finalize`] to fetch -/// the full transactions from Electrum and finalize the update. -#[derive(Debug, Clone)] -pub struct ElectrumUpdate { - /// Map of [`Txid`]s to associated [`Anchor`]s. - pub graph_update: HashMap>, - /// The latest chain tip, as seen by the Electrum server. - pub new_tip: local_chain::CheckPoint, - /// Last-used index update for [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex). - pub keychain_update: BTreeMap, -} - -impl ElectrumUpdate { - fn new(new_tip: local_chain::CheckPoint) -> Self { - Self { - new_tip, - graph_update: HashMap::new(), - keychain_update: BTreeMap::new(), - } - } +/// determine the full transactions missing from [`TxGraph`]. Then call [`Self::finalize`] to +/// fetch the full transactions from Electrum and finalize the update. +#[derive(Debug, Default, Clone)] +pub struct IncompleteTxGraph(HashMap>); +impl IncompleteTxGraph { /// Determine the full transactions that are missing from `graph`. /// - /// Refer to [`ElectrumUpdate`]. + /// Refer to [`IncompleteTxGraph`] for more. pub fn missing_full_txs(&self, graph: &TxGraph) -> Vec { - self.graph_update + self.0 .keys() .filter(move |&&txid| graph.as_ref().get_tx(txid).is_none()) .cloned() .collect() } - /// Finalizes update with `missing` txids to fetch from `client`. + /// Finalizes the [`TxGraph`] update by fetching `missing` txids from the `client`. /// - /// Refer to [`ElectrumUpdate`]. + /// Refer to [`IncompleteTxGraph`] for more. pub fn finalize( self, client: &Client, seen_at: Option, missing: Vec, - ) -> Result, Error> { + ) -> Result, Error> { let new_txs = client.batch_transaction_get(&missing)?; - let mut graph_update = TxGraph::::new(new_txs); - for (txid, anchors) in self.graph_update { + let mut graph = TxGraph::::new(new_txs); + for (txid, anchors) in self.0 { if let Some(seen_at) = seen_at { - let _ = graph_update.insert_seen_at(txid, seen_at); + let _ = graph.insert_seen_at(txid, seen_at); } for anchor in anchors { - let _ = graph_update.insert_anchor(txid, anchor); + let _ = graph.insert_anchor(txid, anchor); } } - Ok(WalletUpdate { - last_active_indices: self.keychain_update, - graph: graph_update, - chain: local_chain::Update { - tip: self.new_tip, - introduce_older_blocks: true, - }, - }) + Ok(graph) } } -impl ElectrumUpdate { - /// Finalizes the [`ElectrumUpdate`] with `new_txs` and anchors of type +impl IncompleteTxGraph { + /// Finalizes the [`IncompleteTxGraph`] with `new_txs` and anchors of type /// [`ConfirmationTimeAnchor`]. /// /// **Note:** The confirmation time might not be precisely correct if there has been a reorg. /// Electrum's API intends that we use the merkle proof API, we should change `bdk_electrum` to /// use it. - pub fn finalize_as_confirmation_time( + pub fn finalize_with_confirmation_time( self, client: &Client, seen_at: Option, missing: Vec, - ) -> Result, Error> { - let update = self.finalize(client, seen_at, missing)?; + ) -> Result, Error> { + let graph = self.finalize(client, seen_at, missing)?; let relevant_heights = { let mut visited_heights = HashSet::new(); - update - .graph + graph .all_anchors() .iter() .map(|(a, _)| a.confirmation_height_upper_bound()) @@ -118,7 +94,7 @@ impl ElectrumUpdate { .collect::>(); let graph_changeset = { - let old_changeset = TxGraph::default().apply_update(update.graph.clone()); + let old_changeset = TxGraph::default().apply_update(graph); tx_graph::ChangeSet { txs: old_changeset.txs, txouts: old_changeset.txouts, @@ -140,21 +116,16 @@ impl ElectrumUpdate { } }; - Ok(WalletUpdate { - last_active_indices: update.last_active_indices, - graph: { - let mut graph = TxGraph::default(); - graph.apply_changeset(graph_changeset); - graph - }, - chain: update.chain, - }) + let mut new_graph = TxGraph::default(); + new_graph.apply_changeset(graph_changeset); + Ok(new_graph) } } /// Trait to extend [`Client`] functionality. pub trait ElectrumExt { - /// Scan the blockchain (via electrum) for the data specified and returns a [`ElectrumUpdate`]. + /// Scan the blockchain (via electrum) for the data specified and returns updates for + /// [`bdk_chain`] data structures. /// /// - `prev_tip`: the most recent blockchain tip present locally /// - `keychain_spks`: keychains that we want to scan transactions for @@ -165,6 +136,7 @@ pub trait ElectrumExt { /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated /// transactions. `batch_size` specifies the max number of script pubkeys to request for in a /// single batch request. + #[allow(clippy::type_complexity)] fn scan( &self, prev_tip: Option, @@ -173,7 +145,7 @@ pub trait ElectrumExt { outpoints: impl IntoIterator, stop_gap: usize, batch_size: usize, - ) -> Result, Error>; + ) -> Result<(local_chain::Update, IncompleteTxGraph, BTreeMap), Error>; /// Convenience method to call [`scan`] without requiring a keychain. /// @@ -185,20 +157,22 @@ pub trait ElectrumExt { txids: impl IntoIterator, outpoints: impl IntoIterator, batch_size: usize, - ) -> Result, Error> { + ) -> Result<(local_chain::Update, IncompleteTxGraph), Error> { let spk_iter = misc_spks .into_iter() .enumerate() .map(|(i, spk)| (i as u32, spk)); - self.scan( + let (chain, graph, _) = self.scan( prev_tip, [((), spk_iter)].into(), txids, outpoints, usize::MAX, batch_size, - ) + )?; + + Ok((chain, graph)) } } @@ -211,7 +185,14 @@ impl ElectrumExt for Client { outpoints: impl IntoIterator, stop_gap: usize, batch_size: usize, - ) -> Result, Error> { + ) -> Result< + ( + local_chain::Update, + IncompleteTxGraph, + BTreeMap, + ), + Error, + > { let mut request_spks = keychain_spks .into_iter() .map(|(k, s)| (k, s.into_iter())) @@ -223,9 +204,8 @@ impl ElectrumExt for Client { let update = loop { let (tip, _) = construct_update_tip(self, prev_tip.clone())?; - let mut update = ElectrumUpdate::::new(tip.clone()); - let cps = update - .new_tip + let mut graph_update = IncompleteTxGraph::::default(); + let cps = tip .iter() .take(10) .map(|cp| (cp.height(), cp)) @@ -236,7 +216,7 @@ impl ElectrumExt for Client { scanned_spks.append(&mut populate_with_spks( self, &cps, - &mut update, + &mut graph_update, &mut scanned_spks .iter() .map(|(i, (spk, _))| (i.clone(), spk.clone())), @@ -249,7 +229,7 @@ impl ElectrumExt for Client { populate_with_spks( self, &cps, - &mut update, + &mut graph_update, keychain_spks, stop_gap, batch_size, @@ -260,10 +240,14 @@ impl ElectrumExt for Client { } } - populate_with_txids(self, &cps, &mut update, &mut txids.iter().cloned())?; + populate_with_txids(self, &cps, &mut graph_update, &mut txids.iter().cloned())?; - let _txs = - populate_with_outpoints(self, &cps, &mut update, &mut outpoints.iter().cloned())?; + let _txs = populate_with_outpoints( + self, + &cps, + &mut graph_update, + &mut outpoints.iter().cloned(), + )?; // check for reorgs during scan process let server_blockhash = self.block_header(tip.height() as usize)?.block_hash(); @@ -271,7 +255,12 @@ impl ElectrumExt for Client { continue; // reorg } - update.keychain_update = request_spks + let chain_update = local_chain::Update { + tip, + introduce_older_blocks: true, + }; + + let keychain_update = request_spks .into_keys() .filter_map(|k| { scanned_spks @@ -281,7 +270,8 @@ impl ElectrumExt for Client { .map(|((_, i), _)| (k, *i)) }) .collect::>(); - break update; + + break (chain_update, graph_update, keychain_update); }; Ok(update) @@ -405,10 +395,10 @@ fn determine_tx_anchor( } } -fn populate_with_outpoints( +fn populate_with_outpoints( client: &Client, cps: &BTreeMap, - update: &mut ElectrumUpdate, + graph_update: &mut IncompleteTxGraph, outpoints: &mut impl Iterator, ) -> Result, Error> { let mut full_txs = HashMap::new(); @@ -457,8 +447,7 @@ fn populate_with_outpoints( }; let anchor = determine_tx_anchor(cps, res.height, res.tx_hash); - - let tx_entry = update.graph_update.entry(res.tx_hash).or_default(); + let tx_entry = graph_update.0.entry(res.tx_hash).or_default(); if let Some(anchor) = anchor { tx_entry.insert(anchor); } @@ -467,10 +456,10 @@ fn populate_with_outpoints( Ok(full_txs) } -fn populate_with_txids( +fn populate_with_txids( client: &Client, cps: &BTreeMap, - update: &mut ElectrumUpdate, + graph_update: &mut IncompleteTxGraph, txids: &mut impl Iterator, ) -> Result<(), Error> { for txid in txids { @@ -495,7 +484,7 @@ fn populate_with_txids( None => continue, }; - let tx_entry = update.graph_update.entry(txid).or_default(); + let tx_entry = graph_update.0.entry(txid).or_default(); if let Some(anchor) = anchor { tx_entry.insert(anchor); } @@ -503,10 +492,10 @@ fn populate_with_txids( Ok(()) } -fn populate_with_spks( +fn populate_with_spks( client: &Client, cps: &BTreeMap, - update: &mut ElectrumUpdate, + graph_update: &mut IncompleteTxGraph, spks: &mut impl Iterator, stop_gap: usize, batch_size: usize, @@ -539,7 +528,7 @@ fn populate_with_spks( } for tx in spk_history { - let tx_entry = update.graph_update.entry(tx.tx_hash).or_default(); + let tx_entry = graph_update.0.entry(tx.tx_hash).or_default(); if let Some(anchor) = determine_tx_anchor(cps, tx.height, tx.tx_hash) { tx_entry.insert(anchor); } diff --git a/crates/electrum/src/lib.rs b/crates/electrum/src/lib.rs index 716c4d3f7..097726268 100644 --- a/crates/electrum/src/lib.rs +++ b/crates/electrum/src/lib.rs @@ -1,14 +1,16 @@ //! This crate is used for updating structures of the [`bdk_chain`] crate with data from electrum. //! //! The star of the show is the [`ElectrumExt::scan`] method, which scans for relevant blockchain -//! data (via electrum) and outputs an [`ElectrumUpdate`]. +//! data (via electrum) and outputs updates for [`bdk_chain`] structures as a tuple of form: //! -//! An [`ElectrumUpdate`] only includes `txid`s and no full transactions. The caller is responsible -//! for obtaining full transactions before applying. This can be done with +//! ([`bdk_chain::local_chain::Update`], [`IncompleteTxGraph`], `keychain_update`) +//! +//! An [`IncompleteTxGraph`] only includes `txid`s and no full transactions. The caller is +//! responsible for obtaining full transactions before applying. This can be done with //! these steps: //! //! 1. Determine which full transactions are missing. The method [`missing_full_txs`] of -//! [`ElectrumUpdate`] can be used. +//! [`IncompleteTxGraph`] can be used. //! //! 2. Obtaining the full transactions. To do this via electrum, the method //! [`batch_transaction_get`] can be used. @@ -16,7 +18,7 @@ //! Refer to [`bdk_electrum_example`] for a complete example. //! //! [`ElectrumClient::scan`]: electrum_client::ElectrumClient::scan -//! [`missing_full_txs`]: ElectrumUpdate::missing_full_txs +//! [`missing_full_txs`]: IncompleteTxGraph::missing_full_txs //! [`batch_transaction_get`]: electrum_client::ElectrumApi::batch_transaction_get //! [`bdk_electrum_example`]: https://github.com/LLFourn/bdk_core_staging/tree/master/bdk_electrum_example diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index 2a5c1310c..e8f1730ba 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -7,13 +7,13 @@ use std::{ use bdk_chain::{ bitcoin::{Address, Network, OutPoint, ScriptBuf, Txid}, indexed_tx_graph::{self, IndexedTxGraph}, - keychain::WalletChangeSet, - local_chain::LocalChain, + keychain, + local_chain::{self, LocalChain}, Append, ConfirmationHeightAnchor, }; use bdk_electrum::{ electrum_client::{self, ElectrumApi}, - ElectrumExt, ElectrumUpdate, + ElectrumExt, }; use example_cli::{ anyhow::{self, Context}, @@ -60,7 +60,10 @@ pub struct ScanOptions { pub batch_size: usize, } -type ChangeSet = WalletChangeSet; +type ChangeSet = ( + local_chain::ChangeSet, + indexed_tx_graph::ChangeSet>, +); fn main() -> anyhow::Result<()> { let (args, keymap, index, db, init_changeset) = @@ -68,11 +71,11 @@ fn main() -> anyhow::Result<()> { let graph = Mutex::new({ let mut graph = IndexedTxGraph::new(index); - graph.apply_changeset(init_changeset.index_tx_graph); + graph.apply_changeset(init_changeset.1); graph }); - let chain = Mutex::new(LocalChain::from_changeset(init_changeset.chain)); + let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0)); let electrum_url = match args.network { Network::Bitcoin => "ssl://electrum.blockstream.info:50002", @@ -248,20 +251,18 @@ fn main() -> anyhow::Result<()> { // drop lock on graph and chain drop((graph, chain)); - let update = client + let (chain_update, graph_update) = client .scan_without_keychain(tip, spks, txids, outpoints, scan_options.batch_size) .context("scanning the blockchain")?; - ElectrumUpdate { - graph_update: update.graph_update, - new_tip: update.new_tip, - keychain_update: BTreeMap::new(), - } + (chain_update, graph_update, BTreeMap::new()) } }; + let (chain_update, incomplete_graph_update, keychain_update) = response; + let missing_txids = { let graph = &*graph.lock().unwrap(); - response.missing_full_txs(graph.graph()) + incomplete_graph_update.missing_full_txs(graph.graph()) }; let now = std::time::UNIX_EPOCH @@ -269,32 +270,27 @@ fn main() -> anyhow::Result<()> { .expect("must get time") .as_secs(); - let final_update = response.finalize(&client, Some(now), missing_txids)?; + let graph_update = incomplete_graph_update.finalize(&client, Some(now), missing_txids)?; let db_changeset = { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let chain = chain.apply_update(final_update.chain)?; + let chain = chain.apply_update(chain_update)?; let index_tx_graph = { let mut changeset = indexed_tx_graph::ChangeSet::::default(); - let (_, indexer) = graph - .index - .reveal_to_target_multi(&final_update.last_active_indices); + let (_, indexer) = graph.index.reveal_to_target_multi(&keychain_update); changeset.append(indexed_tx_graph::ChangeSet { indexer, ..Default::default() }); - changeset.append(graph.apply_update(final_update.graph)); + changeset.append(graph.apply_update(graph_update)); changeset }; - ChangeSet { - index_tx_graph, - chain, - } + (chain, index_tx_graph) }; let mut db = db.lock().unwrap(); diff --git a/example-crates/example_rpc/Cargo.toml b/example-crates/example_rpc/Cargo.toml new file mode 100644 index 000000000..c107c49b6 --- /dev/null +++ b/example-crates/example_rpc/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "example_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../../crates/chain", features = ["serde"] } +bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" } +example_cli = { path = "../example_cli" } +ctrlc = { version = "^2" } diff --git a/example-crates/example_rpc/src/main.rs b/example-crates/example_rpc/src/main.rs new file mode 100644 index 000000000..3a80e131c --- /dev/null +++ b/example-crates/example_rpc/src/main.rs @@ -0,0 +1,304 @@ +use std::{ + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::sync_channel, + Arc, Mutex, + }, + time::{Duration, Instant, SystemTime}, +}; + +use bdk_bitcoind_rpc::{ + bitcoincore_rpc::{Auth, Client, RpcApi}, + EmittedUpdate, Emitter, +}; +use bdk_chain::{ + bitcoin::{address, Address, Transaction}, + indexed_tx_graph, keychain, + local_chain::{self, LocalChain}, + Append, BlockId, ConfirmationTimeAnchor, IndexedTxGraph, +}; +use example_cli::{ + anyhow, + clap::{self, Args, Subcommand}, + CoinSelectionAlgo, Keychain, +}; + +const DB_MAGIC: &[u8] = b"bdk_example_rpc"; +const DB_PATH: &str = ".bdk_example_rpc.db"; +const CHANNEL_BOUND: usize = 10; +const LIVE_POLL_DUR_SECS: Duration = Duration::from_secs(15); + +type ChangeSet = ( + local_chain::ChangeSet, + indexed_tx_graph::ChangeSet>, +); + +#[derive(Args, Debug, Clone)] +struct RpcArgs { + /// RPC URL + #[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")] + url: String, + /// RPC auth cookie file + #[clap(env = "RPC_COOKIE", long)] + rpc_cookie: Option, + /// RPC auth username + #[clap(env = "RPC_USER", long)] + rpc_user: Option, + /// RPC auth password + #[clap(env = "RPC_PASS", long)] + rpc_password: Option, +} + +impl From for Auth { + fn from(args: RpcArgs) -> Self { + match (args.rpc_cookie, args.rpc_user, args.rpc_password) { + (None, None, None) => Self::None, + (Some(path), _, _) => Self::CookieFile(path), + (_, Some(user), Some(pass)) => Self::UserPass(user, pass), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + } + } +} + +#[derive(Subcommand, Debug, Clone)] +enum RpcCommands { + /// Syncs local state with remote state via RPC (starting from last point of agreement) and + /// stores/indexes relevant transactions + Sync { + /// Starting block height to fallback to if no point of agreement if found + #[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")] + fallback_height: u32, + /// The unused-scripts lookahead will be kept at this size + #[clap(long, default_value = "10")] + lookahead: u32, + /// Whether to be live! + #[clap(long, default_value = "false")] + live: bool, + #[clap(flatten)] + rpc_args: RpcArgs, + }, + /// Create and broadcast a transaction. + Tx { + value: u64, + address: Address, + #[clap(short, default_value = "bnb")] + coin_select: CoinSelectionAlgo, + #[clap(flatten)] + rpc_args: RpcArgs, + }, +} + +impl RpcCommands { + fn rpc_args(&self) -> &RpcArgs { + match self { + RpcCommands::Sync { rpc_args, .. } => rpc_args, + RpcCommands::Tx { rpc_args, .. } => rpc_args, + } + } +} + +fn main() -> anyhow::Result<()> { + let sigterm_flag = start_ctrlc_handler(); + + let (args, keymap, index, db, init_changeset) = + example_cli::init::(DB_MAGIC, DB_PATH)?; + + let graph = Mutex::new({ + let mut graph = IndexedTxGraph::new(index); + graph.apply_changeset(init_changeset.1); + graph + }); + + let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0)); + + let rpc_cmd = match args.command { + example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd, + general_cmd => { + let res = example_cli::handle_commands( + &graph, + &db, + &chain, + &keymap, + args.network, + |_| Err(anyhow::anyhow!("use `tx` instead")), + general_cmd, + ); + db.lock().unwrap().commit()?; + return res; + } + }; + + let rpc_client = { + let a = rpc_cmd.rpc_args(); + Client::new( + &a.url, + match (&a.rpc_cookie, &a.rpc_user, &a.rpc_password) { + (None, None, None) => Auth::None, + (Some(path), _, _) => Auth::CookieFile(path.clone()), + (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + }, + )? + }; + + match rpc_cmd { + RpcCommands::Sync { + fallback_height, + lookahead, + live, + .. + } => { + graph.lock().unwrap().index.set_lookahead_for_all(lookahead); + + let (chan, recv) = sync_channel::<(EmittedUpdate, u32)>(CHANNEL_BOUND); + let prev_cp = chain.lock().unwrap().tip(); + + let join_handle = std::thread::spawn(move || -> anyhow::Result<()> { + let mut tip_height = Option::::None; + + let mut emitter = Emitter::new(&rpc_client, fallback_height, prev_cp); + loop { + let item = emitter.emit_update()?; + let is_mempool = item.is_mempool(); + + if tip_height.is_none() || is_mempool { + tip_height = Some(rpc_client.get_block_count()? as u32); + } + chan.send((item, tip_height.expect("must have tip height")))?; + + if !is_mempool { + // break if sigterm is detected + if sigterm_flag.load(Ordering::Acquire) { + break; + } + continue; + } + + // everything after this point is a mempool update + // mempool update is emitted after we reach the chain tip + // if we are are in "sync-once" mode, we break here + // otherwise, we sleep or wait for sigterm + if !live || await_flag(&sigterm_flag, LIVE_POLL_DUR_SECS) { + break; + } + } + + Ok(()) + }); + + let mut start = Instant::now(); + + for (item, tip_height) in recv { + let chain_update = item.chain_update(); + let tip = item.checkpoint(); + + let db_changeset = { + let mut indexed_changeset = indexed_tx_graph::ChangeSet::default(); + let mut chain = chain.lock().unwrap(); + let mut graph = graph.lock().unwrap(); + + let graph_update = + item.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_time_anchor); + indexed_changeset.append(graph.insert_relevant_txs(graph_update)); + + let chain_changeset = match chain_update { + Some(update) => chain.apply_update(update)?, + None => local_chain::ChangeSet::default(), + }; + + (chain_changeset, indexed_changeset) + }; + + let mut db = db.lock().unwrap(); + db.stage(db_changeset); + + // print stuff every 3 seconds + if start.elapsed() >= Duration::from_secs(3) { + start = Instant::now(); + let balance = { + let chain = chain.lock().unwrap(); + let graph = graph.lock().unwrap(); + graph.graph().balance( + &*chain, + chain.tip().map_or(BlockId::default(), |cp| cp.block_id()), + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + ) + }; + println!( + "* scanned_to: {} / {} tip | total: {} sats", + match tip { + Some(cp) => cp.height().to_string(), + None => "mempool".to_string(), + }, + tip_height, + balance.confirmed + + balance.immature + + balance.trusted_pending + + balance.untrusted_pending + ); + } + } + + db.lock().unwrap().commit()?; + println!("commited to database!"); + + join_handle + .join() + .expect("failed to join chain source thread") + } + RpcCommands::Tx { + value, + address, + coin_select, + .. + } => { + let chain = chain.lock().unwrap(); + let broadcast = move |tx: &Transaction| -> anyhow::Result<()> { + rpc_client.send_raw_transaction(tx)?; + Ok(()) + }; + example_cli::run_send_cmd( + &graph, + &db, + &*chain, + &keymap, + coin_select, + address + .require_network(args.network) + .expect("address has the wrong network"), + value, + broadcast, + ) + } + } +} + +fn start_ctrlc_handler() -> Arc { + let flag = Arc::new(AtomicBool::new(false)); + let cloned_flag = flag.clone(); + + ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release)); + + flag +} + +fn await_flag(flag: &AtomicBool, duration: Duration) -> bool { + let start = SystemTime::now(); + loop { + if flag.load(Ordering::Acquire) { + return true; + } + if SystemTime::now() + .duration_since(start) + .expect("should succeed") + >= duration + { + return false; + } + std::thread::sleep(Duration::from_secs(1)); + } +} diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index d53317f8c..186d5906c 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -7,6 +7,7 @@ use std::io::Write; use std::str::FromStr; use bdk::bitcoin::Address; +use bdk::wallet::WalletUpdate; use bdk::SignOptions; use bdk::{bitcoin::Network, Wallet}; use bdk_electrum::electrum_client::{self, ElectrumApi}; @@ -52,14 +53,21 @@ fn main() -> Result<(), Box> { }) .collect(); - let electrum_update = client.scan(prev_tip, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?; + let (chain_update, incomplete_graph_update, keychain_update) = + client.scan(prev_tip, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?; println!(); - let missing = electrum_update.missing_full_txs(wallet.as_ref()); - let update = electrum_update.finalize_as_confirmation_time(&client, None, missing)?; + let missing = incomplete_graph_update.missing_full_txs(wallet.as_ref()); + let graph_update = + incomplete_graph_update.finalize_with_confirmation_time(&client, None, missing)?; - wallet.apply_update(update)?; + let wallet_update = WalletUpdate { + last_active_indices: keychain_update, + graph: graph_update, + chain: Some(chain_update), + }; + wallet.apply_update(wallet_update)?; wallet.commit()?; let balance = wallet.get_balance(); diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index 144e1edf5..435bea87e 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -2,8 +2,7 @@ use std::{io::Write, str::FromStr}; use bdk::{ bitcoin::{Address, Network}, - chain::keychain::WalletUpdate, - wallet::AddressIndex, + wallet::{AddressIndex, WalletUpdate}, SignOptions, Wallet, }; use bdk_esplora::{esplora_client, EsploraAsyncExt}; @@ -62,7 +61,7 @@ async fn main() -> Result<(), Box> { let update = WalletUpdate { last_active_indices, graph: update_graph, - ..WalletUpdate::new(chain_update) + chain: Some(chain_update), }; wallet.apply_update(update)?; wallet.commit()?; diff --git a/example-crates/wallet_esplora_blocking/src/main.rs b/example-crates/wallet_esplora_blocking/src/main.rs index 02d060430..2e5a850e3 100644 --- a/example-crates/wallet_esplora_blocking/src/main.rs +++ b/example-crates/wallet_esplora_blocking/src/main.rs @@ -7,8 +7,7 @@ use std::{io::Write, str::FromStr}; use bdk::{ bitcoin::{Address, Network}, - chain::keychain::WalletUpdate, - wallet::AddressIndex, + wallet::{AddressIndex, WalletUpdate}, SignOptions, Wallet, }; use bdk_esplora::{esplora_client, EsploraExt}; @@ -61,7 +60,7 @@ fn main() -> Result<(), Box> { let update = WalletUpdate { last_active_indices, graph: update_graph, - ..WalletUpdate::new(chain_update) + chain: Some(chain_update), }; wallet.apply_update(update)?;