diff --git a/crates/esplora/Cargo.toml b/crates/esplora/Cargo.toml
index 1cd0d1c0d..109a7399b 100644
--- a/crates/esplora/Cargo.toml
+++ b/crates/esplora/Cargo.toml
@@ -14,6 +14,8 @@ readme = "README.md"
 [dependencies]
 bdk_chain = { path = "../chain", version = "0.3.1", features = ["serde", "miniscript"] }
 esplora-client = { version = "0.3", default-features = false }
+async-trait = "0.1.66"
+futures = "0.3.26"
 
 [features]
 default = ["async", "blocking"]
diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs
index 18ab22b60..9ef2b1bda 100644
--- a/crates/esplora/src/lib.rs
+++ b/crates/esplora/src/lib.rs
@@ -3,6 +3,7 @@
 //! The star of the show is the [`EsploraExt::scan`] method which scans for relevant
 //! blockchain data (via esplora) and outputs a [`KeychainScan`].
 
+use async_trait::async_trait;
 use bdk_chain::{
     bitcoin::{BlockHash, OutPoint, Script, Txid},
     chain_graph::ChainGraph,
@@ -10,6 +11,7 @@ use bdk_chain::{
     sparse_chain, BlockId, ConfirmationTime,
 };
 use esplora_client::{OutputStatus, TxStatus};
+use futures::stream::{FuturesOrdered, TryStreamExt};
 use std::collections::BTreeMap;
 
 pub use esplora_client;
@@ -20,6 +22,8 @@ use esplora_client::Error;
 /// Refer to [crate-level documentation] for more.
 ///
 /// [crate-level documentation]: crate
+
+#[cfg(feature = "blocking")]
 pub trait EsploraExt {
     /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
     ///
@@ -305,16 +309,286 @@ fn map_confirmation_time(tx_status: &TxStatus, height_at_start: u32) -> Confirma
 }
 
 #[cfg(feature = "async")]
-impl EsploraExt for esplora_client::AsyncClient {
-    fn scan<K: Ord + Clone>(
+#[async_trait(?Send)]
+pub trait EsploraAsyncExt {
+    /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
+    ///
+    /// - `local_chain`: the most recent block hashes present locally
+    /// - `keychain_spks`: keychains that we want to scan transactions for
+    /// - `txids`: transactions that we want updated [`ChainPosition`]s for
+    /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
+    ///     want to included in the update
+    ///
+    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
+    /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
+    /// parallel.
+    ///
+    /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
+    #[allow(clippy::result_large_err)] // FIXME
+    async fn scan<K: Ord + Clone>(
         &self,
-        _local_chain: &BTreeMap<u32, BlockHash>,
-        _keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
-        _txids: impl IntoIterator<Item = Txid>,
-        _outpoints: impl IntoIterator<Item = OutPoint>,
-        _stop_gap: usize,
-        _parallel_requests: usize,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        parallel_requests: usize,
+    ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
+
+    /// Convenience method to call [`scan`] without requiring a keychain.
+    ///
+    /// [`scan`]: EsploraAsyncExt::scan
+    #[allow(clippy::result_large_err)] // FIXME
+    async fn scan_without_keychain(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        misc_spks: impl IntoIterator<Item = Script>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        parallel_requests: usize,
+    ) -> Result<ChainGraph<ConfirmationTime>, Error> {
+        let wallet_scan = self
+            .scan(
+                local_chain,
+                [(
+                    (),
+                    misc_spks
+                        .into_iter()
+                        .enumerate()
+                        .map(|(i, spk)| (i as u32, spk)),
+                )]
+                .into(),
+                txids,
+                outpoints,
+                usize::MAX,
+                parallel_requests,
+            )
+            .await?;
+
+        Ok(wallet_scan.update)
+    }
+}
+
+#[cfg(feature = "async")]
+#[async_trait(?Send)]
+impl EsploraAsyncExt for esplora_client::AsyncClient {
+    async fn scan<K: Ord + Clone>(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        parallel_requests: usize,
     ) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
-        todo!()
+        let parallel_requests = parallel_requests.max(1);
+        let mut scan = KeychainScan::default();
+        let update = &mut scan.update;
+        let last_active_indices = &mut scan.last_active_indices;
+
+        for (&height, &original_hash) in local_chain.iter().rev() {
+            let update_block_id = BlockId {
+                height,
+                hash: self.get_block_hash(height).await?,
+            };
+            let _ = update
+                .insert_checkpoint(update_block_id)
+                .expect("cannot repeat height here");
+            if update_block_id.hash == original_hash {
+                break;
+            }
+        }
+        let tip_at_start = BlockId {
+            height: self.get_height().await?,
+            hash: self.get_tip_hash().await?,
+        };
+        if let Err(failure) = update.insert_checkpoint(tip_at_start) {
+            match failure {
+                sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
+                    // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
+                    return EsploraAsyncExt::scan(
+                        self,
+                        local_chain,
+                        keychain_spks,
+                        txids,
+                        outpoints,
+                        stop_gap,
+                        parallel_requests,
+                    )
+                    .await;
+                }
+            }
+        }
+
+        for (keychain, spks) in keychain_spks {
+            let mut spks = spks.into_iter();
+            let mut last_active_index = None;
+            let mut empty_scripts = 0;
+            type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+
+            loop {
+                let futures: FuturesOrdered<_> = (0..parallel_requests)
+                    .filter_map(|_| {
+                        let (index, script) = spks.next()?;
+                        let client = self.clone();
+                        Some(async move {
+                            let mut related_txs = client.scripthash_txs(&script, None).await?;
+
+                            let n_confirmed =
+                                related_txs.iter().filter(|tx| tx.status.confirmed).count();
+                            // esplora pages on 25 confirmed transactions. If there's 25 or more we
+                            // keep requesting to see if there's more.
+                            if n_confirmed >= 25 {
+                                loop {
+                                    let new_related_txs = client
+                                        .scripthash_txs(
+                                            &script,
+                                            Some(related_txs.last().unwrap().txid),
+                                        )
+                                        .await?;
+                                    let n = new_related_txs.len();
+                                    related_txs.extend(new_related_txs);
+                                    // we've reached the end
+                                    if n < 25 {
+                                        break;
+                                    }
+                                }
+                            }
+
+                            Result::<_, esplora_client::Error>::Ok((index, related_txs))
+                        })
+                    })
+                    .collect();
+
+                let n_futures = futures.len();
+
+                let idx_with_tx: Vec<IndexWithTxs> = futures.try_collect().await?;
+
+                for (index, related_txs) in idx_with_tx {
+                    if related_txs.is_empty() {
+                        empty_scripts += 1;
+                    } else {
+                        last_active_index = Some(index);
+                        empty_scripts = 0;
+                    }
+                    for tx in related_txs {
+                        let confirmation_time =
+                            map_confirmation_time(&tx.status, tip_at_start.height);
+
+                        if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
+                            use bdk_chain::{
+                                chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
+                            };
+                            match failure {
+                                InsertTxError::Chain(TxTooHigh { .. }) => {
+                                    unreachable!("chain position already checked earlier")
+                                }
+                                InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                                | InsertTxError::UnresolvableConflict(_) => {
+                                    /* implies reorg during scan. We deal with that below */
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if n_futures == 0 || empty_scripts >= stop_gap {
+                    break;
+                }
+            }
+
+            if let Some(last_active_index) = last_active_index {
+                last_active_indices.insert(keychain, last_active_index);
+            }
+        }
+
+        for txid in txids.into_iter() {
+            let (tx, tx_status) =
+                match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) {
+                    (Some(tx), Some(tx_status)) => (tx, tx_status),
+                    _ => continue,
+                };
+
+            let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
+
+            if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+                use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+                match failure {
+                    InsertTxError::Chain(TxTooHigh { .. }) => {
+                        unreachable!("chain position already checked earlier")
+                    }
+                    InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                    | InsertTxError::UnresolvableConflict(_) => {
+                        /* implies reorg during scan. We deal with that below */
+                    }
+                }
+            }
+        }
+
+        for op in outpoints.into_iter() {
+            let mut op_txs = Vec::with_capacity(2);
+            if let (Some(tx), Some(tx_status)) = (
+                self.get_tx(&op.txid).await?,
+                self.get_tx_status(&op.txid).await?,
+            ) {
+                op_txs.push((tx, tx_status));
+                if let Some(OutputStatus {
+                    txid: Some(txid),
+                    status: Some(spend_status),
+                    ..
+                }) = self.get_output_status(&op.txid, op.vout as _).await?
+                {
+                    if let Some(spend_tx) = self.get_tx(&txid).await? {
+                        op_txs.push((spend_tx, spend_status));
+                    }
+                }
+            }
+
+            for (tx, status) in op_txs {
+                let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
+
+                if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+                    use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+                    match failure {
+                        InsertTxError::Chain(TxTooHigh { .. }) => {
+                            unreachable!("chain position already checked earlier")
+                        }
+                        InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                        | InsertTxError::UnresolvableConflict(_) => {
+                            /* implies reorg during scan. We deal with that below */
+                        }
+                    }
+                }
+            }
+        }
+
+        let reorg_occurred = {
+            if let Some(checkpoint) = update.chain().latest_checkpoint() {
+                self.get_block_hash(checkpoint.height).await? != checkpoint.hash
+            } else {
+                false
+            }
+        };
+
+        if reorg_occurred {
+            // A reorg occurred so lets find out where all the txids we found are in the chain now.
+            // XXX: collect required because of weird type naming issues
+            let txids_found = update
+                .chain()
+                .txids()
+                .map(|(_, txid)| *txid)
+                .collect::<Vec<_>>();
+            scan.update = EsploraAsyncExt::scan_without_keychain(
+                self,
+                local_chain,
+                [],
+                txids_found,
+                [],
+                parallel_requests,
+            )
+            .await?;
+        }
+
+        Ok(scan)
     }
 }