diff --git a/Cargo.toml b/Cargo.toml index 78adeb450..2104196be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "example-crates/keychain_tracker_example_cli", "example-crates/wallet_electrum", "example-crates/wallet_esplora", + "example-crates/wallet_esplora_async", "nursery/tmp_plan", "nursery/coin_select" ] diff --git a/crates/esplora/Cargo.toml b/crates/esplora/Cargo.toml index 109a7399b..bacb2aad3 100644 --- a/crates/esplora/Cargo.toml +++ b/crates/esplora/Cargo.toml @@ -14,10 +14,11 @@ readme = "README.md" [dependencies] bdk_chain = { path = "../chain", version = "0.3.1", features = ["serde", "miniscript"] } esplora-client = { version = "0.3", default-features = false } -async-trait = "0.1.66" -futures = "0.3.26" +async-trait = { version = "0.1.66", optional = true } +futures = { version = "0.3.26", optional = true } [features] -default = ["async", "blocking"] -async = ["esplora-client/async"] +default = ["async-https", "blocking"] +async = ["async-trait", "futures", "esplora-client/async"] +async-https = ["async", "esplora-client/async-https"] blocking = ["esplora-client/blocking"] diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs new file mode 100644 index 000000000..7796050f9 --- /dev/null +++ b/crates/esplora/src/async_ext.rs @@ -0,0 +1,296 @@ +use std::collections::BTreeMap; + +use async_trait::async_trait; +use bdk_chain::{ + bitcoin::{BlockHash, OutPoint, Script, Txid}, + chain_graph::ChainGraph, + keychain::KeychainScan, + sparse_chain, BlockId, ConfirmationTime, +}; +use esplora_client::{Error, OutputStatus}; +use futures::stream::{FuturesOrdered, TryStreamExt}; + +use crate::map_confirmation_time; + +#[async_trait(?Send)] +pub trait EsploraAsyncExt { + /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`]. + /// + /// - `local_chain`: the most recent block hashes present locally + /// - `keychain_spks`: keychains that we want to scan transactions for + /// - `txids`: transactions that we want updated [`ChainPosition`]s for + /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we + /// want to included in the update + /// + /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated + /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in + /// parallel. + /// + /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition + #[allow(clippy::result_large_err)] // FIXME + async fn scan( + &self, + local_chain: &BTreeMap, + keychain_spks: BTreeMap>, + txids: impl IntoIterator, + outpoints: impl IntoIterator, + stop_gap: usize, + parallel_requests: usize, + ) -> Result, Error>; + + /// Convenience method to call [`scan`] without requiring a keychain. + /// + /// [`scan`]: EsploraAsyncExt::scan + #[allow(clippy::result_large_err)] // FIXME + async fn scan_without_keychain( + &self, + local_chain: &BTreeMap, + misc_spks: impl IntoIterator, + txids: impl IntoIterator, + outpoints: impl IntoIterator, + parallel_requests: usize, + ) -> Result, Error> { + let wallet_scan = self + .scan( + local_chain, + [( + (), + misc_spks + .into_iter() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), + )] + .into(), + txids, + outpoints, + usize::MAX, + parallel_requests, + ) + .await?; + + Ok(wallet_scan.update) + } +} + +#[async_trait(?Send)] +impl EsploraAsyncExt for esplora_client::AsyncClient { + async fn scan( + &self, + local_chain: &BTreeMap, + keychain_spks: BTreeMap>, + txids: impl IntoIterator, + outpoints: impl IntoIterator, + stop_gap: usize, + parallel_requests: usize, + ) -> Result, Error> { + let parallel_requests = parallel_requests.max(1); + let mut scan = KeychainScan::default(); + let update = &mut scan.update; + let last_active_indices = &mut scan.last_active_indices; + + for (&height, &original_hash) in local_chain.iter().rev() { + let update_block_id = BlockId { + height, + hash: self.get_block_hash(height).await?, + }; + let _ = update + .insert_checkpoint(update_block_id) + .expect("cannot repeat height here"); + if update_block_id.hash == original_hash { + break; + } + } + let tip_at_start = BlockId { + height: self.get_height().await?, + hash: self.get_tip_hash().await?, + }; + if let Err(failure) = update.insert_checkpoint(tip_at_start) { + match failure { + sparse_chain::InsertCheckpointError::HashNotMatching { .. } => { + // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call. + return EsploraAsyncExt::scan( + self, + local_chain, + keychain_spks, + txids, + outpoints, + stop_gap, + parallel_requests, + ) + .await; + } + } + } + + for (keychain, spks) in keychain_spks { + let mut spks = spks.into_iter(); + let mut last_active_index = None; + let mut empty_scripts = 0; + type IndexWithTxs = (u32, Vec); + + loop { + let futures: FuturesOrdered<_> = (0..parallel_requests) + .filter_map(|_| { + let (index, script) = spks.next()?; + let client = self.clone(); + Some(async move { + let mut related_txs = client.scripthash_txs(&script, None).await?; + + let n_confirmed = + related_txs.iter().filter(|tx| tx.status.confirmed).count(); + // esplora pages on 25 confirmed transactions. If there's 25 or more we + // keep requesting to see if there's more. + if n_confirmed >= 25 { + loop { + let new_related_txs = client + .scripthash_txs( + &script, + Some(related_txs.last().unwrap().txid), + ) + .await?; + let n = new_related_txs.len(); + related_txs.extend(new_related_txs); + // we've reached the end + if n < 25 { + break; + } + } + } + + Result::<_, esplora_client::Error>::Ok((index, related_txs)) + }) + }) + .collect(); + + let n_futures = futures.len(); + + let idx_with_tx: Vec = futures.try_collect().await?; + + for (index, related_txs) in idx_with_tx { + if related_txs.is_empty() { + empty_scripts += 1; + } else { + last_active_index = Some(index); + empty_scripts = 0; + } + for tx in related_txs { + let confirmation_time = + map_confirmation_time(&tx.status, tip_at_start.height); + + if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) { + use bdk_chain::{ + chain_graph::InsertTxError, sparse_chain::InsertTxError::*, + }; + match failure { + InsertTxError::Chain(TxTooHigh { .. }) => { + unreachable!("chain position already checked earlier") + } + InsertTxError::Chain(TxMovedUnexpectedly { .. }) + | InsertTxError::UnresolvableConflict(_) => { + /* implies reorg during scan. We deal with that below */ + } + } + } + } + } + + if n_futures == 0 || empty_scripts >= stop_gap { + break; + } + } + + if let Some(last_active_index) = last_active_index { + last_active_indices.insert(keychain, last_active_index); + } + } + + for txid in txids.into_iter() { + let (tx, tx_status) = + match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) { + (Some(tx), Some(tx_status)) => (tx, tx_status), + _ => continue, + }; + + let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height); + + if let Err(failure) = update.insert_tx(tx, confirmation_time) { + use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*}; + match failure { + InsertTxError::Chain(TxTooHigh { .. }) => { + unreachable!("chain position already checked earlier") + } + InsertTxError::Chain(TxMovedUnexpectedly { .. }) + | InsertTxError::UnresolvableConflict(_) => { + /* implies reorg during scan. We deal with that below */ + } + } + } + } + + for op in outpoints.into_iter() { + let mut op_txs = Vec::with_capacity(2); + if let (Some(tx), Some(tx_status)) = ( + self.get_tx(&op.txid).await?, + self.get_tx_status(&op.txid).await?, + ) { + op_txs.push((tx, tx_status)); + if let Some(OutputStatus { + txid: Some(txid), + status: Some(spend_status), + .. + }) = self.get_output_status(&op.txid, op.vout as _).await? + { + if let Some(spend_tx) = self.get_tx(&txid).await? { + op_txs.push((spend_tx, spend_status)); + } + } + } + + for (tx, status) in op_txs { + let confirmation_time = map_confirmation_time(&status, tip_at_start.height); + + if let Err(failure) = update.insert_tx(tx, confirmation_time) { + use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*}; + match failure { + InsertTxError::Chain(TxTooHigh { .. }) => { + unreachable!("chain position already checked earlier") + } + InsertTxError::Chain(TxMovedUnexpectedly { .. }) + | InsertTxError::UnresolvableConflict(_) => { + /* implies reorg during scan. We deal with that below */ + } + } + } + } + } + + let reorg_occurred = { + if let Some(checkpoint) = update.chain().latest_checkpoint() { + self.get_block_hash(checkpoint.height).await? != checkpoint.hash + } else { + false + } + }; + + if reorg_occurred { + // A reorg occurred so lets find out where all the txids we found are in the chain now. + // XXX: collect required because of weird type naming issues + let txids_found = update + .chain() + .txids() + .map(|(_, txid)| *txid) + .collect::>(); + scan.update = EsploraAsyncExt::scan_without_keychain( + self, + local_chain, + [], + txids_found, + [], + parallel_requests, + ) + .await?; + } + + Ok(scan) + } +} diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs new file mode 100644 index 000000000..3f461c03b --- /dev/null +++ b/crates/esplora/src/blocking_ext.rs @@ -0,0 +1,290 @@ +use std::collections::BTreeMap; + +use bdk_chain::{ + bitcoin::{BlockHash, OutPoint, Script, Txid}, + chain_graph::ChainGraph, + keychain::KeychainScan, + sparse_chain, BlockId, ConfirmationTime, +}; +use esplora_client::{Error, OutputStatus}; + +use crate::map_confirmation_time; + +/// Trait to extend [`esplora_client::BlockingClient`] functionality. +/// +/// Refer to [crate-level documentation] for more. +/// +/// [crate-level documentation]: crate +pub trait EsploraExt { + /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`]. + /// + /// - `local_chain`: the most recent block hashes present locally + /// - `keychain_spks`: keychains that we want to scan transactions for + /// - `txids`: transactions that we want updated [`ChainPosition`]s for + /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we + /// want to included in the update + /// + /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated + /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in + /// parallel. + /// + /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition + #[allow(clippy::result_large_err)] // FIXME + fn scan( + &self, + local_chain: &BTreeMap, + keychain_spks: BTreeMap>, + txids: impl IntoIterator, + outpoints: impl IntoIterator, + stop_gap: usize, + parallel_requests: usize, + ) -> Result, Error>; + + /// Convenience method to call [`scan`] without requiring a keychain. + /// + /// [`scan`]: EsploraExt::scan + #[allow(clippy::result_large_err)] // FIXME + fn scan_without_keychain( + &self, + local_chain: &BTreeMap, + misc_spks: impl IntoIterator, + txids: impl IntoIterator, + outpoints: impl IntoIterator, + parallel_requests: usize, + ) -> Result, Error> { + let wallet_scan = self.scan( + local_chain, + [( + (), + misc_spks + .into_iter() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), + )] + .into(), + txids, + outpoints, + usize::MAX, + parallel_requests, + )?; + + Ok(wallet_scan.update) + } +} + +impl EsploraExt for esplora_client::BlockingClient { + fn scan( + &self, + local_chain: &BTreeMap, + keychain_spks: BTreeMap>, + txids: impl IntoIterator, + outpoints: impl IntoIterator, + stop_gap: usize, + parallel_requests: usize, + ) -> Result, Error> { + let parallel_requests = parallel_requests.max(1); + let mut scan = KeychainScan::default(); + let update = &mut scan.update; + let last_active_indices = &mut scan.last_active_indices; + + for (&height, &original_hash) in local_chain.iter().rev() { + let update_block_id = BlockId { + height, + hash: self.get_block_hash(height)?, + }; + let _ = update + .insert_checkpoint(update_block_id) + .expect("cannot repeat height here"); + if update_block_id.hash == original_hash { + break; + } + } + let tip_at_start = BlockId { + height: self.get_height()?, + hash: self.get_tip_hash()?, + }; + if let Err(failure) = update.insert_checkpoint(tip_at_start) { + match failure { + sparse_chain::InsertCheckpointError::HashNotMatching { .. } => { + // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call. + return EsploraExt::scan( + self, + local_chain, + keychain_spks, + txids, + outpoints, + stop_gap, + parallel_requests, + ); + } + } + } + + for (keychain, spks) in keychain_spks { + let mut spks = spks.into_iter(); + let mut last_active_index = None; + let mut empty_scripts = 0; + type IndexWithTxs = (u32, Vec); + + loop { + let handles = (0..parallel_requests) + .filter_map( + |_| -> Option>> { + let (index, script) = spks.next()?; + let client = self.clone(); + Some(std::thread::spawn(move || { + let mut related_txs = client.scripthash_txs(&script, None)?; + + let n_confirmed = + related_txs.iter().filter(|tx| tx.status.confirmed).count(); + // esplora pages on 25 confirmed transactions. If there's 25 or more we + // keep requesting to see if there's more. + if n_confirmed >= 25 { + loop { + let new_related_txs = client.scripthash_txs( + &script, + Some(related_txs.last().unwrap().txid), + )?; + let n = new_related_txs.len(); + related_txs.extend(new_related_txs); + // we've reached the end + if n < 25 { + break; + } + } + } + + Result::<_, esplora_client::Error>::Ok((index, related_txs)) + })) + }, + ) + .collect::>(); + + let n_handles = handles.len(); + + for handle in handles { + let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap + if related_txs.is_empty() { + empty_scripts += 1; + } else { + last_active_index = Some(index); + empty_scripts = 0; + } + for tx in related_txs { + let confirmation_time = + map_confirmation_time(&tx.status, tip_at_start.height); + + if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) { + use bdk_chain::{ + chain_graph::InsertTxError, sparse_chain::InsertTxError::*, + }; + match failure { + InsertTxError::Chain(TxTooHigh { .. }) => { + unreachable!("chain position already checked earlier") + } + InsertTxError::Chain(TxMovedUnexpectedly { .. }) + | InsertTxError::UnresolvableConflict(_) => { + /* implies reorg during scan. We deal with that below */ + } + } + } + } + } + + if n_handles == 0 || empty_scripts >= stop_gap { + break; + } + } + + if let Some(last_active_index) = last_active_index { + last_active_indices.insert(keychain, last_active_index); + } + } + + for txid in txids.into_iter() { + let (tx, tx_status) = match (self.get_tx(&txid)?, self.get_tx_status(&txid)?) { + (Some(tx), Some(tx_status)) => (tx, tx_status), + _ => continue, + }; + + let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height); + + if let Err(failure) = update.insert_tx(tx, confirmation_time) { + use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*}; + match failure { + InsertTxError::Chain(TxTooHigh { .. }) => { + unreachable!("chain position already checked earlier") + } + InsertTxError::Chain(TxMovedUnexpectedly { .. }) + | InsertTxError::UnresolvableConflict(_) => { + /* implies reorg during scan. We deal with that below */ + } + } + } + } + + for op in outpoints.into_iter() { + let mut op_txs = Vec::with_capacity(2); + if let (Some(tx), Some(tx_status)) = + (self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?) + { + op_txs.push((tx, tx_status)); + if let Some(OutputStatus { + txid: Some(txid), + status: Some(spend_status), + .. + }) = self.get_output_status(&op.txid, op.vout as _)? + { + if let Some(spend_tx) = self.get_tx(&txid)? { + op_txs.push((spend_tx, spend_status)); + } + } + } + + for (tx, status) in op_txs { + let confirmation_time = map_confirmation_time(&status, tip_at_start.height); + + if let Err(failure) = update.insert_tx(tx, confirmation_time) { + use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*}; + match failure { + InsertTxError::Chain(TxTooHigh { .. }) => { + unreachable!("chain position already checked earlier") + } + InsertTxError::Chain(TxMovedUnexpectedly { .. }) + | InsertTxError::UnresolvableConflict(_) => { + /* implies reorg during scan. We deal with that below */ + } + } + } + } + } + + let reorg_occurred = { + if let Some(checkpoint) = update.chain().latest_checkpoint() { + self.get_block_hash(checkpoint.height)? != checkpoint.hash + } else { + false + } + }; + + if reorg_occurred { + // A reorg occurred so lets find out where all the txids we found are in the chain now. + // XXX: collect required because of weird type naming issues + let txids_found = update + .chain() + .txids() + .map(|(_, txid)| *txid) + .collect::>(); + scan.update = EsploraExt::scan_without_keychain( + self, + local_chain, + [], + txids_found, + [], + parallel_requests, + )?; + } + + Ok(scan) + } +} diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index 9ef2b1bda..5964dc5b3 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -1,305 +1,27 @@ //! This crate is used for updating structures of [`bdk_chain`] with data from an esplora server. //! //! The star of the show is the [`EsploraExt::scan`] method which scans for relevant -//! blockchain data (via esplora) and outputs a [`KeychainScan`]. +//! blockchain data (via esplora) and outputs a [`KeychainScan`](bdk_chain::keychain::KeychainScan). -use async_trait::async_trait; -use bdk_chain::{ - bitcoin::{BlockHash, OutPoint, Script, Txid}, - chain_graph::ChainGraph, - keychain::KeychainScan, - sparse_chain, BlockId, ConfirmationTime, -}; -use esplora_client::{OutputStatus, TxStatus}; -use futures::stream::{FuturesOrdered, TryStreamExt}; -use std::collections::BTreeMap; +use bdk_chain::ConfirmationTime; +use esplora_client::TxStatus; pub use esplora_client; -use esplora_client::Error; - -/// Trait to extend [`esplora_client::BlockingClient`] functionality. -/// -/// Refer to [crate-level documentation] for more. -/// -/// [crate-level documentation]: crate #[cfg(feature = "blocking")] -pub trait EsploraExt { - /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`]. - /// - /// - `local_chain`: the most recent block hashes present locally - /// - `keychain_spks`: keychains that we want to scan transactions for - /// - `txids`: transactions that we want updated [`ChainPosition`]s for - /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we - /// want to included in the update - /// - /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated - /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in - /// parallel. - /// - /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition - #[allow(clippy::result_large_err)] // FIXME - fn scan( - &self, - local_chain: &BTreeMap, - keychain_spks: BTreeMap>, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - stop_gap: usize, - parallel_requests: usize, - ) -> Result, Error>; - - /// Convenience method to call [`scan`] without requiring a keychain. - /// - /// [`scan`]: EsploraExt::scan - #[allow(clippy::result_large_err)] // FIXME - fn scan_without_keychain( - &self, - local_chain: &BTreeMap, - misc_spks: impl IntoIterator, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - parallel_requests: usize, - ) -> Result, Error> { - let wallet_scan = self.scan( - local_chain, - [( - (), - misc_spks - .into_iter() - .enumerate() - .map(|(i, spk)| (i as u32, spk)), - )] - .into(), - txids, - outpoints, - usize::MAX, - parallel_requests, - )?; - - Ok(wallet_scan.update) - } -} - +mod blocking_ext; #[cfg(feature = "blocking")] -impl EsploraExt for esplora_client::BlockingClient { - fn scan( - &self, - local_chain: &BTreeMap, - keychain_spks: BTreeMap>, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - stop_gap: usize, - parallel_requests: usize, - ) -> Result, Error> { - let parallel_requests = parallel_requests.max(1); - let mut scan = KeychainScan::default(); - let update = &mut scan.update; - let last_active_indices = &mut scan.last_active_indices; - - for (&height, &original_hash) in local_chain.iter().rev() { - let update_block_id = BlockId { - height, - hash: self.get_block_hash(height)?, - }; - let _ = update - .insert_checkpoint(update_block_id) - .expect("cannot repeat height here"); - if update_block_id.hash == original_hash { - break; - } - } - let tip_at_start = BlockId { - height: self.get_height()?, - hash: self.get_tip_hash()?, - }; - if let Err(failure) = update.insert_checkpoint(tip_at_start) { - match failure { - sparse_chain::InsertCheckpointError::HashNotMatching { .. } => { - // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call. - return EsploraExt::scan( - self, - local_chain, - keychain_spks, - txids, - outpoints, - stop_gap, - parallel_requests, - ); - } - } - } +pub use blocking_ext::*; - for (keychain, spks) in keychain_spks { - let mut spks = spks.into_iter(); - let mut last_active_index = None; - let mut empty_scripts = 0; - type IndexWithTxs = (u32, Vec); - - loop { - let handles = (0..parallel_requests) - .filter_map( - |_| -> Option>> { - let (index, script) = spks.next()?; - let client = self.clone(); - Some(std::thread::spawn(move || { - let mut related_txs = client.scripthash_txs(&script, None)?; - - let n_confirmed = - related_txs.iter().filter(|tx| tx.status.confirmed).count(); - // esplora pages on 25 confirmed transactions. If there's 25 or more we - // keep requesting to see if there's more. - if n_confirmed >= 25 { - loop { - let new_related_txs = client.scripthash_txs( - &script, - Some(related_txs.last().unwrap().txid), - )?; - let n = new_related_txs.len(); - related_txs.extend(new_related_txs); - // we've reached the end - if n < 25 { - break; - } - } - } - - Result::<_, esplora_client::Error>::Ok((index, related_txs)) - })) - }, - ) - .collect::>(); - - let n_handles = handles.len(); - - for handle in handles { - let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap - if related_txs.is_empty() { - empty_scripts += 1; - } else { - last_active_index = Some(index); - empty_scripts = 0; - } - for tx in related_txs { - let confirmation_time = - map_confirmation_time(&tx.status, tip_at_start.height); - - if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) { - use bdk_chain::{ - chain_graph::InsertTxError, sparse_chain::InsertTxError::*, - }; - match failure { - InsertTxError::Chain(TxTooHigh { .. }) => { - unreachable!("chain position already checked earlier") - } - InsertTxError::Chain(TxMovedUnexpectedly { .. }) - | InsertTxError::UnresolvableConflict(_) => { - /* implies reorg during scan. We deal with that below */ - } - } - } - } - } - - if n_handles == 0 || empty_scripts >= stop_gap { - break; - } - } - - if let Some(last_active_index) = last_active_index { - last_active_indices.insert(keychain, last_active_index); - } - } - - for txid in txids.into_iter() { - let (tx, tx_status) = match (self.get_tx(&txid)?, self.get_tx_status(&txid)?) { - (Some(tx), Some(tx_status)) => (tx, tx_status), - _ => continue, - }; - - let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height); - - if let Err(failure) = update.insert_tx(tx, confirmation_time) { - use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*}; - match failure { - InsertTxError::Chain(TxTooHigh { .. }) => { - unreachable!("chain position already checked earlier") - } - InsertTxError::Chain(TxMovedUnexpectedly { .. }) - | InsertTxError::UnresolvableConflict(_) => { - /* implies reorg during scan. We deal with that below */ - } - } - } - } - - for op in outpoints.into_iter() { - let mut op_txs = Vec::with_capacity(2); - if let (Some(tx), Some(tx_status)) = - (self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?) - { - op_txs.push((tx, tx_status)); - if let Some(OutputStatus { - txid: Some(txid), - status: Some(spend_status), - .. - }) = self.get_output_status(&op.txid, op.vout as _)? - { - if let Some(spend_tx) = self.get_tx(&txid)? { - op_txs.push((spend_tx, spend_status)); - } - } - } - - for (tx, status) in op_txs { - let confirmation_time = map_confirmation_time(&status, tip_at_start.height); - - if let Err(failure) = update.insert_tx(tx, confirmation_time) { - use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*}; - match failure { - InsertTxError::Chain(TxTooHigh { .. }) => { - unreachable!("chain position already checked earlier") - } - InsertTxError::Chain(TxMovedUnexpectedly { .. }) - | InsertTxError::UnresolvableConflict(_) => { - /* implies reorg during scan. We deal with that below */ - } - } - } - } - } - - let reorg_occurred = { - if let Some(checkpoint) = update.chain().latest_checkpoint() { - self.get_block_hash(checkpoint.height)? != checkpoint.hash - } else { - false - } - }; - - if reorg_occurred { - // A reorg occurred so lets find out where all the txids we found are in the chain now. - // XXX: collect required because of weird type naming issues - let txids_found = update - .chain() - .txids() - .map(|(_, txid)| *txid) - .collect::>(); - scan.update = EsploraExt::scan_without_keychain( - self, - local_chain, - [], - txids_found, - [], - parallel_requests, - )?; - } - - Ok(scan) - } -} +#[cfg(feature = "async")] +mod async_ext; +#[cfg(feature = "async")] +pub use async_ext::*; -fn map_confirmation_time(tx_status: &TxStatus, height_at_start: u32) -> ConfirmationTime { +pub(crate) fn map_confirmation_time( + tx_status: &TxStatus, + height_at_start: u32, +) -> ConfirmationTime { match (tx_status.block_time, tx_status.block_height) { (Some(time), Some(height)) if height <= height_at_start => { ConfirmationTime::Confirmed { height, time } @@ -307,288 +29,3 @@ fn map_confirmation_time(tx_status: &TxStatus, height_at_start: u32) -> Confirma _ => ConfirmationTime::Unconfirmed, } } - -#[cfg(feature = "async")] -#[async_trait(?Send)] -pub trait EsploraAsyncExt { - /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`]. - /// - /// - `local_chain`: the most recent block hashes present locally - /// - `keychain_spks`: keychains that we want to scan transactions for - /// - `txids`: transactions that we want updated [`ChainPosition`]s for - /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we - /// want to included in the update - /// - /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated - /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in - /// parallel. - /// - /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition - #[allow(clippy::result_large_err)] // FIXME - async fn scan( - &self, - local_chain: &BTreeMap, - keychain_spks: BTreeMap>, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - stop_gap: usize, - parallel_requests: usize, - ) -> Result, Error>; - - /// Convenience method to call [`scan`] without requiring a keychain. - /// - /// [`scan`]: EsploraAsyncExt::scan - #[allow(clippy::result_large_err)] // FIXME - async fn scan_without_keychain( - &self, - local_chain: &BTreeMap, - misc_spks: impl IntoIterator, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - parallel_requests: usize, - ) -> Result, Error> { - let wallet_scan = self - .scan( - local_chain, - [( - (), - misc_spks - .into_iter() - .enumerate() - .map(|(i, spk)| (i as u32, spk)), - )] - .into(), - txids, - outpoints, - usize::MAX, - parallel_requests, - ) - .await?; - - Ok(wallet_scan.update) - } -} - -#[cfg(feature = "async")] -#[async_trait(?Send)] -impl EsploraAsyncExt for esplora_client::AsyncClient { - async fn scan( - &self, - local_chain: &BTreeMap, - keychain_spks: BTreeMap>, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - stop_gap: usize, - parallel_requests: usize, - ) -> Result, Error> { - let parallel_requests = parallel_requests.max(1); - let mut scan = KeychainScan::default(); - let update = &mut scan.update; - let last_active_indices = &mut scan.last_active_indices; - - for (&height, &original_hash) in local_chain.iter().rev() { - let update_block_id = BlockId { - height, - hash: self.get_block_hash(height).await?, - }; - let _ = update - .insert_checkpoint(update_block_id) - .expect("cannot repeat height here"); - if update_block_id.hash == original_hash { - break; - } - } - let tip_at_start = BlockId { - height: self.get_height().await?, - hash: self.get_tip_hash().await?, - }; - if let Err(failure) = update.insert_checkpoint(tip_at_start) { - match failure { - sparse_chain::InsertCheckpointError::HashNotMatching { .. } => { - // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call. - return EsploraAsyncExt::scan( - self, - local_chain, - keychain_spks, - txids, - outpoints, - stop_gap, - parallel_requests, - ) - .await; - } - } - } - - for (keychain, spks) in keychain_spks { - let mut spks = spks.into_iter(); - let mut last_active_index = None; - let mut empty_scripts = 0; - type IndexWithTxs = (u32, Vec); - - loop { - let futures: FuturesOrdered<_> = (0..parallel_requests) - .filter_map(|_| { - let (index, script) = spks.next()?; - let client = self.clone(); - Some(async move { - let mut related_txs = client.scripthash_txs(&script, None).await?; - - let n_confirmed = - related_txs.iter().filter(|tx| tx.status.confirmed).count(); - // esplora pages on 25 confirmed transactions. If there's 25 or more we - // keep requesting to see if there's more. - if n_confirmed >= 25 { - loop { - let new_related_txs = client - .scripthash_txs( - &script, - Some(related_txs.last().unwrap().txid), - ) - .await?; - let n = new_related_txs.len(); - related_txs.extend(new_related_txs); - // we've reached the end - if n < 25 { - break; - } - } - } - - Result::<_, esplora_client::Error>::Ok((index, related_txs)) - }) - }) - .collect(); - - let n_futures = futures.len(); - - let idx_with_tx: Vec = futures.try_collect().await?; - - for (index, related_txs) in idx_with_tx { - if related_txs.is_empty() { - empty_scripts += 1; - } else { - last_active_index = Some(index); - empty_scripts = 0; - } - for tx in related_txs { - let confirmation_time = - map_confirmation_time(&tx.status, tip_at_start.height); - - if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) { - use bdk_chain::{ - chain_graph::InsertTxError, sparse_chain::InsertTxError::*, - }; - match failure { - InsertTxError::Chain(TxTooHigh { .. }) => { - unreachable!("chain position already checked earlier") - } - InsertTxError::Chain(TxMovedUnexpectedly { .. }) - | InsertTxError::UnresolvableConflict(_) => { - /* implies reorg during scan. We deal with that below */ - } - } - } - } - } - - if n_futures == 0 || empty_scripts >= stop_gap { - break; - } - } - - if let Some(last_active_index) = last_active_index { - last_active_indices.insert(keychain, last_active_index); - } - } - - for txid in txids.into_iter() { - let (tx, tx_status) = - match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) { - (Some(tx), Some(tx_status)) => (tx, tx_status), - _ => continue, - }; - - let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height); - - if let Err(failure) = update.insert_tx(tx, confirmation_time) { - use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*}; - match failure { - InsertTxError::Chain(TxTooHigh { .. }) => { - unreachable!("chain position already checked earlier") - } - InsertTxError::Chain(TxMovedUnexpectedly { .. }) - | InsertTxError::UnresolvableConflict(_) => { - /* implies reorg during scan. We deal with that below */ - } - } - } - } - - for op in outpoints.into_iter() { - let mut op_txs = Vec::with_capacity(2); - if let (Some(tx), Some(tx_status)) = ( - self.get_tx(&op.txid).await?, - self.get_tx_status(&op.txid).await?, - ) { - op_txs.push((tx, tx_status)); - if let Some(OutputStatus { - txid: Some(txid), - status: Some(spend_status), - .. - }) = self.get_output_status(&op.txid, op.vout as _).await? - { - if let Some(spend_tx) = self.get_tx(&txid).await? { - op_txs.push((spend_tx, spend_status)); - } - } - } - - for (tx, status) in op_txs { - let confirmation_time = map_confirmation_time(&status, tip_at_start.height); - - if let Err(failure) = update.insert_tx(tx, confirmation_time) { - use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*}; - match failure { - InsertTxError::Chain(TxTooHigh { .. }) => { - unreachable!("chain position already checked earlier") - } - InsertTxError::Chain(TxMovedUnexpectedly { .. }) - | InsertTxError::UnresolvableConflict(_) => { - /* implies reorg during scan. We deal with that below */ - } - } - } - } - } - - let reorg_occurred = { - if let Some(checkpoint) = update.chain().latest_checkpoint() { - self.get_block_hash(checkpoint.height).await? != checkpoint.hash - } else { - false - } - }; - - if reorg_occurred { - // A reorg occurred so lets find out where all the txids we found are in the chain now. - // XXX: collect required because of weird type naming issues - let txids_found = update - .chain() - .txids() - .map(|(_, txid)| *txid) - .collect::>(); - scan.update = EsploraAsyncExt::scan_without_keychain( - self, - local_chain, - [], - txids_found, - [], - parallel_requests, - ) - .await?; - } - - Ok(scan) - } -} diff --git a/example-crates/wallet_esplora/Cargo.toml b/example-crates/wallet_esplora/Cargo.toml index 917bea8f5..8e19cb7bd 100644 --- a/example-crates/wallet_esplora/Cargo.toml +++ b/example-crates/wallet_esplora/Cargo.toml @@ -8,5 +8,5 @@ publish = false [dependencies] bdk = { path = "../../crates/bdk" } -bdk_esplora = { path = "../../crates/esplora" } +bdk_esplora = { path = "../../crates/esplora", features = ["blocking"] } bdk_file_store = { path = "../../crates/file_store" } diff --git a/example-crates/wallet_esplora_async/Cargo.toml b/example-crates/wallet_esplora_async/Cargo.toml new file mode 100644 index 000000000..af368fc8d --- /dev/null +++ b/example-crates/wallet_esplora_async/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "wallet_esplora_async" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk = { path = "../../crates/bdk" } +bdk_esplora = { path = "../../crates/esplora", features = ["async-https"] } +bdk_file_store = { path = "../../crates/file_store" } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] } diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs new file mode 100644 index 000000000..b78b09dfa --- /dev/null +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -0,0 +1,99 @@ +use std::{io::Write, str::FromStr}; + +use bdk::{ + bitcoin::{Address, Network}, + wallet::AddressIndex, + SignOptions, Wallet, +}; +use bdk_esplora::{esplora_client, EsploraAsyncExt}; +use bdk_file_store::KeychainStore; + +const SEND_AMOUNT: u64 = 5000; +const STOP_GAP: usize = 50; +const PARALLEL_REQUESTS: usize = 5; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let db_path = std::env::temp_dir().join("bdk-esplora-example"); + let db = KeychainStore::new_from_path(db_path)?; + let external_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/0'/0'/0/*)"; + let internal_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/0'/0'/1/*)"; + + let mut wallet = Wallet::new( + external_descriptor, + Some(internal_descriptor), + db, + Network::Testnet, + )?; + + let address = wallet.get_address(AddressIndex::New); + println!("Generated Address: {}", address); + + let balance = wallet.get_balance(); + println!("Wallet balance before syncing: {} sats", balance.total()); + + print!("Syncing..."); + // Scanning the blockchain + let esplora_url = "https://mempool.space/testnet/api"; + let client = esplora_client::Builder::new(esplora_url).build_async()?; + let checkpoints = wallet.checkpoints(); + let spks = wallet + .spks_of_all_keychains() + .into_iter() + .map(|(k, spks)| { + let mut first = true; + ( + k, + spks.inspect(move |(spk_i, _)| { + if first { + first = false; + print!("\nScanning keychain [{:?}]:", k); + } + print!(" {}", spk_i); + let _ = std::io::stdout().flush(); + }), + ) + }) + .collect(); + let update = client + .scan( + checkpoints, + spks, + std::iter::empty(), + std::iter::empty(), + STOP_GAP, + PARALLEL_REQUESTS, + ) + .await?; + println!(); + wallet.apply_update(update)?; + wallet.commit()?; + + let balance = wallet.get_balance(); + println!("Wallet balance after syncing: {} sats", balance.total()); + + if balance.total() < SEND_AMOUNT { + println!( + "Please send at least {} sats to the receiving address", + SEND_AMOUNT + ); + std::process::exit(0); + } + + let faucet_address = Address::from_str("mkHS9ne12qx9pS9VojpwU5xtRd4T7X7ZUt")?; + + let mut tx_builder = wallet.build_tx(); + tx_builder + .add_recipient(faucet_address.script_pubkey(), SEND_AMOUNT) + .enable_rbf(); + + let (mut psbt, _) = tx_builder.finish()?; + let finalized = wallet.sign(&mut psbt, SignOptions::default())?; + assert!(finalized); + + let tx = psbt.extract_tx(); + client.broadcast(&tx).await?; + println!("Tx broadcasted! Txid: {}", tx.txid()); + + Ok(()) +}