diff --git a/crates/chain/src/spk_client.rs b/crates/chain/src/spk_client.rs index 3457dfef7..215273729 100644 --- a/crates/chain/src/spk_client.rs +++ b/crates/chain/src/spk_client.rs @@ -1,388 +1,566 @@ //! Helper types for spk-based blockchain clients. - use crate::{ - collections::BTreeMap, local_chain::CheckPoint, ConfirmationBlockTime, Indexed, TxGraph, + alloc::{boxed::Box, collections::VecDeque, vec::Vec}, + collections::BTreeMap, + local_chain::CheckPoint, + ConfirmationBlockTime, Indexed, TxGraph, }; -use alloc::boxed::Box; use bitcoin::{OutPoint, Script, ScriptBuf, Txid}; -use core::marker::PhantomData; -/// Data required to perform a spk-based blockchain client sync. -/// -/// A client sync fetches relevant chain data for a known list of scripts, transaction ids and -/// outpoints. The sync process also updates the chain from the given [`CheckPoint`]. -pub struct SyncRequest { - /// A checkpoint for the current chain [`LocalChain::tip`]. - /// The sync process will return a new chain update that extends this tip. - /// - /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip - pub chain_tip: CheckPoint, - /// Transactions that spend from or to these indexed script pubkeys. - pub spks: Box + Send>, - /// Transactions with these txids. - pub txids: Box + Send>, - /// Transactions with these outpoints or spent from these outpoints. - pub outpoints: Box + Send>, -} - -impl SyncRequest { - /// Construct a new [`SyncRequest`] from a given `cp` tip. - pub fn from_chain_tip(cp: CheckPoint) -> Self { +type InspectSync = dyn FnMut(SyncItem, SyncProgress) + Send + 'static; + +type InspectFullScan = dyn FnMut(K, u32, &Script) + Send + 'static; + +/// An item reported to the [`inspect`](SyncRequestBuilder::inspect) closure of [`SyncRequest`]. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum SyncItem<'i, I> { + /// Script pubkey sync item. + Spk(I, &'i Script), + /// Txid sync item. + Txid(Txid), + /// Outpoint sync item. + OutPoint(OutPoint), +} + +impl<'i, I: core::fmt::Debug + core::any::Any> core::fmt::Display for SyncItem<'i, I> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + SyncItem::Spk(i, spk) => { + if (i as &dyn core::any::Any).is::<()>() { + write!(f, "script '{}'", spk) + } else { + write!(f, "script {:?} '{}'", i, spk) + } + } + SyncItem::Txid(txid) => write!(f, "txid '{}'", txid), + SyncItem::OutPoint(op) => write!(f, "outpoint '{}'", op), + } + } +} + +/// The progress of [`SyncRequest`]. +#[derive(Debug, Clone)] +pub struct SyncProgress { + /// Script pubkeys consumed by the request. + pub spks_consumed: usize, + /// Script pubkeys remaining in the request. + pub spks_remaining: usize, + /// Txids consumed by the request. + pub txids_consumed: usize, + /// Txids remaining in the request. + pub txids_remaining: usize, + /// Outpoints consumed by the request. + pub outpoints_consumed: usize, + /// Outpoints remaining in the request. + pub outpoints_remaining: usize, +} + +impl SyncProgress { + /// Total items, consumed and remaining, of the request. + pub fn total(&self) -> usize { + self.total_spks() + self.total_txids() + self.total_outpoints() + } + + /// Total script pubkeys, consumed and remaining, of the request. + pub fn total_spks(&self) -> usize { + self.spks_consumed + self.spks_remaining + } + + /// Total txids, consumed and remaining, of the request. 
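+    ///
+    /// For example (the numbers below are purely illustrative), a progress snapshot with 3
+    /// txids consumed and 2 txids remaining reports a total of 5:
+    ///
+    /// ```rust
+    /// # use bdk_chain::spk_client::SyncProgress;
+    /// let progress = SyncProgress {
+    ///     spks_consumed: 0,
+    ///     spks_remaining: 0,
+    ///     txids_consumed: 3,
+    ///     txids_remaining: 2,
+    ///     outpoints_consumed: 0,
+    ///     outpoints_remaining: 0,
+    /// };
+    /// assert_eq!(progress.total_txids(), 5);
+    /// ```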
+ pub fn total_txids(&self) -> usize { + self.txids_consumed + self.txids_remaining + } + + /// Total outpoints, consumed and remaining, of the request. + pub fn total_outpoints(&self) -> usize { + self.outpoints_consumed + self.outpoints_remaining + } + + /// Total consumed items of the request. + pub fn consumed(&self) -> usize { + self.spks_consumed + self.txids_consumed + self.outpoints_consumed + } + + /// Total remaining items of the request. + pub fn remaining(&self) -> usize { + self.spks_remaining + self.txids_remaining + self.outpoints_remaining + } +} + +/// Builds a [`SyncRequest`]. +#[must_use] +pub struct SyncRequestBuilder { + inner: SyncRequest, +} + +impl Default for SyncRequestBuilder { + fn default() -> Self { Self { - chain_tip: cp, - spks: Box::new(core::iter::empty()), - txids: Box::new(core::iter::empty()), - outpoints: Box::new(core::iter::empty()), + inner: Default::default(), } } +} - /// Set the [`Script`]s that will be synced against. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn set_spks( - mut self, - spks: impl IntoIterator + Send + 'static>, +#[cfg(feature = "miniscript")] +impl SyncRequestBuilder<(K, u32)> { + /// Add [`Script`]s that are revealed by the `indexer` of the given `spk_range` that will be + /// synced against. + pub fn revealed_spks_from_indexer( + self, + indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex, + spk_range: impl core::ops::RangeBounds, ) -> Self { - self.spks = Box::new(spks.into_iter()); - self + use crate::alloc::borrow::ToOwned; + use crate::alloc::vec::Vec; + self.spks_with_labels( + indexer + .revealed_spks(spk_range) + .map(|(i, spk)| (i, spk.to_owned())) + .collect::>(), + ) } - /// Set the [`Txid`]s that will be synced against. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn set_txids( - mut self, - txids: impl IntoIterator + Send + 'static>, + /// Add [`Script`]s that are revealed by the `indexer` but currently unused. + pub fn unused_spks_from_indexer( + self, + indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex, ) -> Self { - self.txids = Box::new(txids.into_iter()); - self + use crate::alloc::borrow::ToOwned; + use crate::alloc::vec::Vec; + self.spks_with_labels( + indexer + .unused_spks() + .map(|(i, spk)| (i, spk.to_owned())) + .collect::>(), + ) } +} + +impl SyncRequestBuilder<()> { + /// Add [`Script`]s that will be synced against. + pub fn spks(self, spks: impl IntoIterator) -> Self { + self.spks_with_labels(spks.into_iter().map(|spk| ((), spk))) + } +} - /// Set the [`OutPoint`]s that will be synced against. +impl SyncRequestBuilder { + /// Set the initial chain tip for the sync request. /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn set_outpoints( - mut self, - outpoints: impl IntoIterator< - IntoIter = impl ExactSizeIterator + Send + 'static, - >, - ) -> Self { - self.outpoints = Box::new(outpoints.into_iter()); + /// This is used to update [`LocalChain`](crate::local_chain::LocalChain). + pub fn chain_tip(mut self, cp: CheckPoint) -> Self { + self.inner.chain_tip = Some(cp); self } - /// Chain on additional [`Script`]s that will be synced against. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn chain_spks( + /// Add [`Script`]s coupled with an associated label that will be synced against. 
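+    ///
+    /// The label type `I` is caller-defined, for example a keychain and derivation-index pair;
+    /// requests that do not need labels can add plain scripts via `spks` instead. A small
+    /// sketch using `u32` labels purely for illustration:
+    ///
+    /// ```rust
+    /// # use bdk_chain::{bitcoin::ScriptBuf, spk_client::SyncRequest};
+    /// let request = SyncRequest::builder()
+    ///     .spks_with_labels([(0u32, ScriptBuf::default()), (1u32, ScriptBuf::default())])
+    ///     .build();
+    /// ```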
+ pub fn spks_with_labels( mut self, - spks: impl IntoIterator< - IntoIter = impl ExactSizeIterator + Send + 'static, - Item = ScriptBuf, - >, + spks: impl IntoIterator, ) -> Self { - self.spks = Box::new(ExactSizeChain::new(self.spks, spks.into_iter())); + self.inner.spks.extend(spks); self } - /// Chain on additional [`Txid`]s that will be synced against. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn chain_txids( - mut self, - txids: impl IntoIterator< - IntoIter = impl ExactSizeIterator + Send + 'static, - Item = Txid, - >, - ) -> Self { - self.txids = Box::new(ExactSizeChain::new(self.txids, txids.into_iter())); + /// Add [`Txid`]s that will be synced against. + pub fn txids(mut self, txids: impl IntoIterator) -> Self { + self.inner.txids.extend(txids); self } - /// Chain on additional [`OutPoint`]s that will be synced against. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn chain_outpoints( - mut self, - outpoints: impl IntoIterator< - IntoIter = impl ExactSizeIterator + Send + 'static, - Item = OutPoint, - >, - ) -> Self { - self.outpoints = Box::new(ExactSizeChain::new(self.outpoints, outpoints.into_iter())); + /// Add [`OutPoint`]s that will be synced against. + pub fn outpoints(mut self, outpoints: impl IntoIterator) -> Self { + self.inner.outpoints.extend(outpoints); self } - /// Add a closure that will be called for [`Script`]s previously added to this request. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn inspect_spks( - mut self, - mut inspect: impl FnMut(&Script) + Send + Sync + 'static, - ) -> Self { - self.spks = Box::new(self.spks.inspect(move |spk| inspect(spk))); + /// Set the closure that will inspect every sync item visited. + pub fn inspect(mut self, inspect: F) -> Self + where + F: FnMut(SyncItem, SyncProgress) + Send + 'static, + { + self.inner.inspect = Box::new(inspect); self } - /// Add a closure that will be called for [`Txid`]s previously added to this request. + /// Build the [`SyncRequest`]. + pub fn build(self) -> SyncRequest { + self.inner + } +} + +/// Data required to perform a spk-based blockchain client sync. +/// +/// A client sync fetches relevant chain data for a known list of scripts, transaction ids and +/// outpoints. The sync process also updates the chain from the given +/// [`chain_tip`](SyncRequestBuilder::chain_tip) (if provided). +/// +/// ```rust +/// # use bdk_chain::{bitcoin::{hashes::Hash, ScriptBuf}, local_chain::LocalChain}; +/// # let (local_chain, _) = LocalChain::from_genesis_hash(Hash::all_zeros()); +/// # let scripts = [ScriptBuf::default(), ScriptBuf::default()]; +/// # use bdk_chain::spk_client::SyncRequest; +/// // Construct a sync request. +/// let sync_request = SyncRequest::builder() +/// // Provide chain tip of the local wallet. +/// .chain_tip(local_chain.tip()) +/// // Provide list of scripts to scan for transactions against. +/// .spks(scripts) +/// // This is called for every synced item. +/// .inspect(|item, progress| println!("{} (remaining: {})", item, progress.remaining())) +/// // Finish constructing the sync request. 
+/// .build(); +/// ``` +#[must_use] +pub struct SyncRequest { + chain_tip: Option, + spks: VecDeque<(I, ScriptBuf)>, + spks_consumed: usize, + txids: VecDeque, + txids_consumed: usize, + outpoints: VecDeque, + outpoints_consumed: usize, + inspect: Box>, +} + +impl Default for SyncRequest { + fn default() -> Self { + Self { + chain_tip: None, + spks: VecDeque::new(), + spks_consumed: 0, + txids: VecDeque::new(), + txids_consumed: 0, + outpoints: VecDeque::new(), + outpoints_consumed: 0, + inspect: Box::new(|_, _| {}), + } + } +} + +impl From> for SyncRequest { + fn from(builder: SyncRequestBuilder) -> Self { + builder.inner + } +} + +impl SyncRequest { + /// Start building a [`SyncRequest`]. + pub fn builder() -> SyncRequestBuilder { + SyncRequestBuilder { + inner: Default::default(), + } + } + + /// Get the [`SyncProgress`] of this request. + pub fn progress(&self) -> SyncProgress { + SyncProgress { + spks_consumed: self.spks_consumed, + spks_remaining: self.spks.len(), + txids_consumed: self.txids_consumed, + txids_remaining: self.txids.len(), + outpoints_consumed: self.outpoints_consumed, + outpoints_remaining: self.outpoints.len(), + } + } + + /// Get the chain tip [`CheckPoint`] of this request (if any). + pub fn chain_tip(&self) -> Option { + self.chain_tip.clone() + } + + /// Advances the sync request and returns the next [`ScriptBuf`]. /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn inspect_txids(mut self, mut inspect: impl FnMut(&Txid) + Send + Sync + 'static) -> Self { - self.txids = Box::new(self.txids.inspect(move |txid| inspect(txid))); - self + /// Returns [`None`] when there are no more scripts remaining in the request. + pub fn next_spk(&mut self) -> Option { + let (i, spk) = self.spks.pop_front()?; + self.spks_consumed += 1; + self._call_inspect(SyncItem::Spk(i, spk.as_script())); + Some(spk) } - /// Add a closure that will be called for [`OutPoint`]s previously added to this request. + /// Advances the sync request and returns the next [`Txid`]. /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn inspect_outpoints( - mut self, - mut inspect: impl FnMut(&OutPoint) + Send + Sync + 'static, - ) -> Self { - self.outpoints = Box::new(self.outpoints.inspect(move |op| inspect(op))); - self + /// Returns [`None`] when there are no more txids remaining in the request. + pub fn next_txid(&mut self) -> Option { + let txid = self.txids.pop_front()?; + self.txids_consumed += 1; + self._call_inspect(SyncItem::Txid(txid)); + Some(txid) } - /// Populate the request with revealed script pubkeys from `index` with the given `spk_range`. + /// Advances the sync request and returns the next [`OutPoint`]. /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[cfg(feature = "miniscript")] - #[must_use] - pub fn populate_with_revealed_spks( - self, - index: &crate::indexer::keychain_txout::KeychainTxOutIndex, - spk_range: impl core::ops::RangeBounds, - ) -> Self { - use alloc::borrow::ToOwned; - use alloc::vec::Vec; - self.chain_spks( - index - .revealed_spks(spk_range) - .map(|(_, spk)| spk.to_owned()) - .collect::>(), - ) + /// Returns [`None`] when there are no more outpoints in the request. + pub fn next_outpoint(&mut self) -> Option { + let outpoint = self.outpoints.pop_front()?; + self.outpoints_consumed += 1; + self._call_inspect(SyncItem::OutPoint(outpoint)); + Some(outpoint) + } + + /// Iterate over [`ScriptBuf`]s contained in this request. 
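+    ///
+    /// Each yielded script is also reported to the [`inspect`](SyncRequestBuilder::inspect)
+    /// closure and counted as consumed. A minimal sketch of how a client implementation might
+    /// drain the request (the empty request here is only for illustration):
+    ///
+    /// ```rust
+    /// # use bdk_chain::spk_client::SyncRequest;
+    /// let mut request = SyncRequest::<()>::builder().build();
+    /// for spk in request.iter_spks() {
+    ///     // Query the backend for transactions involving `spk` here.
+    ///     let _ = spk;
+    /// }
+    /// ```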
+ pub fn iter_spks(&mut self) -> impl ExactSizeIterator + '_ { + SyncIter::::new(self) + } + + /// Iterate over [`Txid`]s contained in this request. + pub fn iter_txids(&mut self) -> impl ExactSizeIterator + '_ { + SyncIter::::new(self) + } + + /// Iterate over [`OutPoint`]s contained in this request. + pub fn iter_outpoints(&mut self) -> impl ExactSizeIterator + '_ { + SyncIter::::new(self) + } + + fn _call_inspect(&mut self, item: SyncItem) { + let progress = self.progress(); + (*self.inspect)(item, progress); } } /// Data returned from a spk-based blockchain client sync. /// /// See also [`SyncRequest`]. +#[must_use] +#[derive(Debug)] pub struct SyncResult { /// The update to apply to the receiving [`TxGraph`]. pub graph_update: TxGraph, /// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain). - pub chain_update: CheckPoint, -} - -/// Data required to perform a spk-based blockchain client full scan. -/// -/// A client full scan iterates through all the scripts for the given keychains, fetching relevant -/// data until some stop gap number of scripts is found that have no data. This operation is -/// generally only used when importing or restoring previously used keychains in which the list of -/// used scripts is not known. The full scan process also updates the chain from the given [`CheckPoint`]. -pub struct FullScanRequest { - /// A checkpoint for the current [`LocalChain::tip`]. - /// The full scan process will return a new chain update that extends this tip. - /// - /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip - pub chain_tip: CheckPoint, - /// Iterators of script pubkeys indexed by the keychain index. - pub spks_by_keychain: BTreeMap> + Send>>, + pub chain_update: Option, } -impl FullScanRequest { - /// Construct a new [`FullScanRequest`] from a given `chain_tip`. - #[must_use] - pub fn from_chain_tip(chain_tip: CheckPoint) -> Self { +impl Default for SyncResult { + fn default() -> Self { Self { - chain_tip, - spks_by_keychain: BTreeMap::new(), + graph_update: Default::default(), + chain_update: Default::default(), } } +} - /// Construct a new [`FullScanRequest`] from a given `chain_tip` and `index`. - /// - /// Unbounded script pubkey iterators for each keychain (`K`) are extracted using - /// [`KeychainTxOutIndex::all_unbounded_spk_iters`] and is used to populate the - /// [`FullScanRequest`]. - /// - /// [`KeychainTxOutIndex::all_unbounded_spk_iters`]: crate::indexer::keychain_txout::KeychainTxOutIndex::all_unbounded_spk_iters - #[cfg(feature = "miniscript")] - #[must_use] - pub fn from_keychain_txout_index( - chain_tip: CheckPoint, - index: &crate::indexer::keychain_txout::KeychainTxOutIndex, - ) -> Self - where - K: core::fmt::Debug, - { - let mut req = Self::from_chain_tip(chain_tip); - for (keychain, spks) in index.all_unbounded_spk_iters() { - req = req.set_spks_for_keychain(keychain, spks); +/// Builds a [`FullScanRequest`]. +#[must_use] +pub struct FullScanRequestBuilder { + inner: FullScanRequest, +} + +impl Default for FullScanRequestBuilder { + fn default() -> Self { + Self { + inner: Default::default(), } - req } +} - /// Set the [`Script`]s for a given `keychain`. - /// - /// This consumes the [`FullScanRequest`] and returns the updated one. - #[must_use] - pub fn set_spks_for_keychain( +#[cfg(feature = "miniscript")] +impl FullScanRequestBuilder { + /// Add spk iterators for each keychain tracked in `indexer`. 
+ pub fn spks_from_indexer( mut self, - keychain: K, - spks: impl IntoIterator> + Send + 'static>, + indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex, ) -> Self { - self.spks_by_keychain - .insert(keychain, Box::new(spks.into_iter())); + for (keychain, spks) in indexer.all_unbounded_spk_iters() { + self = self.spks_for_keychain(keychain, spks); + } self } +} - /// Chain on additional [`Script`]s that will be synced against. +impl FullScanRequestBuilder { + /// Set the initial chain tip for the full scan request. /// - /// This consumes the [`FullScanRequest`] and returns the updated one. - #[must_use] - pub fn chain_spks_for_keychain( + /// This is used to update [`LocalChain`](crate::local_chain::LocalChain). + pub fn chain_tip(mut self, tip: CheckPoint) -> Self { + self.inner.chain_tip = Some(tip); + self + } + + /// Set the spk iterator for a given `keychain`. + pub fn spks_for_keychain( mut self, keychain: K, spks: impl IntoIterator> + Send + 'static>, ) -> Self { - match self.spks_by_keychain.remove(&keychain) { - // clippy here suggests to remove `into_iter` from `spks.into_iter()`, but doing so - // results in a compilation error - #[allow(clippy::useless_conversion)] - Some(keychain_spks) => self - .spks_by_keychain - .insert(keychain, Box::new(keychain_spks.chain(spks.into_iter()))), - None => self - .spks_by_keychain - .insert(keychain, Box::new(spks.into_iter())), - }; + self.inner + .spks_by_keychain + .insert(keychain, Box::new(spks.into_iter())); self } - /// Add a closure that will be called for every [`Script`] previously added to any keychain in - /// this request. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn inspect_spks_for_all_keychains( - mut self, - inspect: impl FnMut(K, u32, &Script) + Send + Sync + Clone + 'static, - ) -> Self + /// Set the closure that will inspect every sync item visited. + pub fn inspect(mut self, inspect: F) -> Self where - K: Send + 'static, + F: FnMut(K, u32, &Script) + Send + 'static, { - for (keychain, spks) in core::mem::take(&mut self.spks_by_keychain) { - let mut inspect = inspect.clone(); - self.spks_by_keychain.insert( - keychain.clone(), - Box::new(spks.inspect(move |(i, spk)| inspect(keychain.clone(), *i, spk))), - ); - } + self.inner.inspect = Box::new(inspect); self } - /// Add a closure that will be called for every [`Script`] previously added to a given - /// `keychain` in this request. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn inspect_spks_for_keychain( - mut self, - keychain: K, - mut inspect: impl FnMut(u32, &Script) + Send + Sync + 'static, - ) -> Self - where - K: Send + 'static, - { - if let Some(spks) = self.spks_by_keychain.remove(&keychain) { - self.spks_by_keychain.insert( - keychain, - Box::new(spks.inspect(move |(i, spk)| inspect(*i, spk))), - ); + /// Build the [`FullScanRequest`]. + pub fn build(self) -> FullScanRequest { + self.inner + } +} + +/// Data required to perform a spk-based blockchain client full scan. +/// +/// A client full scan iterates through all the scripts for the given keychains, fetching relevant +/// data until some stop gap number of scripts is found that have no data. This operation is +/// generally only used when importing or restoring previously used keychains in which the list of +/// used scripts is not known. The full scan process also updates the chain from the given +/// [`chain_tip`](FullScanRequestBuilder::chain_tip) (if provided). 
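+///
+/// ```rust
+/// # use bdk_chain::{bitcoin::{hashes::Hash, ScriptBuf}, local_chain::LocalChain};
+/// # let (local_chain, _) = LocalChain::from_genesis_hash(Hash::all_zeros());
+/// # use bdk_chain::spk_client::FullScanRequest;
+/// // Construct a full scan request. This sketch mirrors the `SyncRequest` example above; the
+/// // integer keychain `0` and the placeholder spk iterator stand in for a real keychain type
+/// // and its derived scripts.
+/// let full_scan_request = FullScanRequest::builder()
+///     // Provide chain tip of the local wallet.
+///     .chain_tip(local_chain.tip())
+///     // Provide an unbounded script iterator for keychain `0`.
+///     .spks_for_keychain(0, (0u32..).map(|i| (i, ScriptBuf::default())))
+///     // This is called for every script the full scan visits.
+///     .inspect(|keychain, index, script| println!("{}: {} '{}'", keychain, index, script))
+///     // Finish constructing the full scan request.
+///     .build();
+/// ```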
+#[must_use] +pub struct FullScanRequest { + chain_tip: Option, + spks_by_keychain: BTreeMap> + Send>>, + inspect: Box>, +} + +impl From> for FullScanRequest { + fn from(builder: FullScanRequestBuilder) -> Self { + builder.inner + } +} + +impl Default for FullScanRequest { + fn default() -> Self { + Self { + chain_tip: None, + spks_by_keychain: Default::default(), + inspect: Box::new(|_, _, _| {}), + } + } +} + +impl FullScanRequest { + /// Start building a [`FullScanRequest`]. + pub fn builder() -> FullScanRequestBuilder { + FullScanRequestBuilder { + inner: Self::default(), + } + } + + /// Get the chain tip [`CheckPoint`] of this request (if any). + pub fn chain_tip(&self) -> Option { + self.chain_tip.clone() + } + + /// List all keychains contained in this request. + pub fn keychains(&self) -> Vec { + self.spks_by_keychain.keys().cloned().collect() + } + + /// Advances the full scan request and returns the next indexed [`ScriptBuf`] of the given + /// `keychain`. + pub fn next_spk(&mut self, keychain: K) -> Option> { + self.iter_spks(keychain).next() + } + + /// Iterate over indexed [`ScriptBuf`]s contained in this request of the given `keychain`. + pub fn iter_spks(&mut self, keychain: K) -> impl Iterator> + '_ { + let spks = self.spks_by_keychain.get_mut(&keychain); + let inspect = &mut self.inspect; + KeychainSpkIter { + keychain, + spks, + inspect, } - self } } /// Data returned from a spk-based blockchain client full scan. /// /// See also [`FullScanRequest`]. +#[must_use] +#[derive(Debug)] pub struct FullScanResult { /// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain). pub graph_update: TxGraph, /// The update to apply to the receiving [`TxGraph`]. - pub chain_update: CheckPoint, + pub chain_update: Option, /// Last active indices for the corresponding keychains (`K`). pub last_active_indices: BTreeMap, } -/// A version of [`core::iter::Chain`] which can combine two [`ExactSizeIterator`]s to form a new -/// [`ExactSizeIterator`]. -/// -/// The danger of this is explained in [the `ExactSizeIterator` docs] -/// (https://doc.rust-lang.org/core/iter/trait.ExactSizeIterator.html#when-shouldnt-an-adapter-be-exactsizeiterator). -/// This does not apply here since it would be impossible to scan an item count that overflows -/// `usize` anyway. 
-struct ExactSizeChain { - a: Option, - b: Option, - i: PhantomData, -} - -impl ExactSizeChain { - fn new(a: A, b: B) -> Self { - ExactSizeChain { - a: Some(a), - b: Some(b), - i: PhantomData, +impl Default for FullScanResult { + fn default() -> Self { + Self { + graph_update: Default::default(), + chain_update: Default::default(), + last_active_indices: Default::default(), } } } -impl Iterator for ExactSizeChain -where - A: Iterator, - B: Iterator, -{ - type Item = I; +struct KeychainSpkIter<'r, K> { + keychain: K, + spks: Option<&'r mut Box> + Send>>, + inspect: &'r mut Box>, +} + +impl<'r, K: Ord + Clone> Iterator for KeychainSpkIter<'r, K> { + type Item = Indexed; fn next(&mut self) -> Option { - if let Some(a) = &mut self.a { - let item = a.next(); - if item.is_some() { - return item; - } - self.a = None; - } - if let Some(b) = &mut self.b { - let item = b.next(); - if item.is_some() { - return item; - } - self.b = None; + let (i, spk) = self.spks.as_mut()?.next()?; + (*self.inspect)(self.keychain.clone(), i, &spk); + Some((i, spk)) + } +} + +struct SyncIter<'r, I, Item> { + request: &'r mut SyncRequest, + marker: core::marker::PhantomData, +} + +impl<'r, I, Item> SyncIter<'r, I, Item> { + fn new(request: &'r mut SyncRequest) -> Self { + Self { + request, + marker: core::marker::PhantomData, } - None } } -impl ExactSizeIterator for ExactSizeChain -where - A: ExactSizeIterator, - B: ExactSizeIterator, -{ - fn len(&self) -> usize { - let a_len = self.a.as_ref().map(|a| a.len()).unwrap_or(0); - let b_len = self.b.as_ref().map(|a| a.len()).unwrap_or(0); - a_len + b_len +impl<'r, I, Item> ExactSizeIterator for SyncIter<'r, I, Item> where SyncIter<'r, I, Item>: Iterator {} + +impl<'r, I> Iterator for SyncIter<'r, I, ScriptBuf> { + type Item = ScriptBuf; + + fn next(&mut self) -> Option { + self.request.next_spk() + } + + fn size_hint(&self) -> (usize, Option) { + let consumed = self.request.spks_consumed; + (consumed, Some(consumed)) + } +} + +impl<'r, I> Iterator for SyncIter<'r, I, Txid> { + type Item = Txid; + + fn next(&mut self) -> Option { + self.request.next_txid() + } + + fn size_hint(&self) -> (usize, Option) { + let consumed = self.request.txids_consumed; + (consumed, Some(consumed)) + } +} + +impl<'r, I> Iterator for SyncIter<'r, I, OutPoint> { + type Item = OutPoint; + + fn next(&mut self) -> Option { + self.request.next_outpoint() + } + + fn size_hint(&self) -> (usize, Option) { + let consumed = self.request.outpoints_consumed; + (consumed, Some(consumed)) } } diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index 9dfbdab73..3584e8bb6 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -126,17 +126,22 @@ impl BdkElectrumClient { /// [`Wallet.calculate_fee_rate`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee_rate pub fn full_scan( &self, - request: FullScanRequest, + request: impl Into>, stop_gap: usize, batch_size: usize, fetch_prev_txouts: bool, ) -> Result, Error> { - let (tip, latest_blocks) = - fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?; - let mut graph_update = TxGraph::::default(); - let mut last_active_indices = BTreeMap::::new(); + let mut request: FullScanRequest = request.into(); + + let tip_and_latest_blocks = match request.chain_tip() { + Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?), + None => None, + }; - for (keychain, spks) in request.spks_by_keychain { + let 
mut graph_update = TxGraph::::default(); + let mut last_active_indices = BTreeMap::::default(); + for keychain in request.keychains() { + let spks = request.iter_spks(keychain.clone()); if let Some(last_active_index) = self.populate_with_spks(&mut graph_update, spks, stop_gap, batch_size)? { @@ -144,13 +149,20 @@ impl BdkElectrumClient { } } - let chain_update = chain_update(tip, &latest_blocks, graph_update.all_anchors())?; - // Fetch previous `TxOut`s for fee calculation if flag is enabled. if fetch_prev_txouts { self.fetch_prev_txout(&mut graph_update)?; } + let chain_update = match tip_and_latest_blocks { + Some((chain_tip, latest_blocks)) => Some(chain_update( + chain_tip, + &latest_blocks, + graph_update.all_anchors(), + )?), + _ => None, + }; + Ok(FullScanResult { graph_update, chain_update, @@ -180,35 +192,52 @@ impl BdkElectrumClient { /// [`CalculateFeeError::MissingTxOut`]: bdk_chain::tx_graph::CalculateFeeError::MissingTxOut /// [`Wallet.calculate_fee`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee /// [`Wallet.calculate_fee_rate`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee_rate - pub fn sync( + pub fn sync( &self, - request: SyncRequest, + request: impl Into>, batch_size: usize, fetch_prev_txouts: bool, ) -> Result { - let full_scan_req = FullScanRequest::from_chain_tip(request.chain_tip.clone()) - .set_spks_for_keychain((), request.spks.enumerate().map(|(i, spk)| (i as u32, spk))); - let mut full_scan_res = self.full_scan(full_scan_req, usize::MAX, batch_size, false)?; - let (tip, latest_blocks) = - fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?; - - self.populate_with_txids(&mut full_scan_res.graph_update, request.txids)?; - self.populate_with_outpoints(&mut full_scan_res.graph_update, request.outpoints)?; - - let chain_update = chain_update( - tip, - &latest_blocks, - full_scan_res.graph_update.all_anchors(), - )?; + let mut request: SyncRequest = request.into(); + + let tip_and_latest_blocks = match request.chain_tip() { + Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?), + None => None, + }; + + let full_scan_request = FullScanRequest::builder() + .spks_for_keychain( + (), + request + .iter_spks() + .enumerate() + .map(|(i, spk)| (i as u32, spk)) + .collect::>(), + ) + .build(); + let mut graph_update = self + .full_scan(full_scan_request, usize::MAX, batch_size, false)? + .graph_update; + self.populate_with_txids(&mut graph_update, request.iter_txids())?; + self.populate_with_outpoints(&mut graph_update, request.iter_outpoints())?; // Fetch previous `TxOut`s for fee calculation if flag is enabled. 
if fetch_prev_txouts { - self.fetch_prev_txout(&mut full_scan_res.graph_update)?; + self.fetch_prev_txout(&mut graph_update)?; } + let chain_update = match tip_and_latest_blocks { + Some((chain_tip, latest_blocks)) => Some(chain_update( + chain_tip, + &latest_blocks, + graph_update.all_anchors(), + )?), + None => None, + }; + Ok(SyncResult { + graph_update, chain_update, - graph_update: full_scan_res.graph_update, }) } diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index afe50be0a..63e91081b 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -39,7 +39,7 @@ where Spks::IntoIter: ExactSizeIterator + Send + 'static, { let mut update = client.sync( - SyncRequest::from_chain_tip(chain.tip()).chain_spks(spks), + SyncRequest::builder().chain_tip(chain.tip()).spks(spks), BATCH_SIZE, true, )?; @@ -51,9 +51,11 @@ where .as_secs(); let _ = update.graph_update.update_last_seen_unconfirmed(now); - let _ = chain - .apply_update(update.chain_update.clone()) - .map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?; + if let Some(chain_update) = update.chain_update.clone() { + let _ = chain + .apply_update(chain_update) + .map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?; + } let _ = graph.apply_update(update.graph_update.clone()); Ok(update) @@ -103,7 +105,9 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { let cp_tip = env.make_checkpoint_tip(); let sync_update = { - let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks); + let request = SyncRequest::builder() + .chain_tip(cp_tip.clone()) + .spks(misc_spks); client.sync(request, 1, true)? }; @@ -207,15 +211,17 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4 // will. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 3, 1, false)? }; assert!(full_scan_update.graph_update.full_txs().next().is_none()); assert!(full_scan_update.last_active_indices.is_empty()); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 4, 1, false)? }; assert_eq!( @@ -246,8 +252,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 5, 1, false)? 
}; let txs: HashSet<_> = full_scan_update @@ -259,8 +266,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { assert!(txs.contains(&txid_4th_addr)); assert_eq!(full_scan_update.last_active_indices[&0], 3); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 6, 1, false)? }; let txs: HashSet<_> = full_scan_update @@ -311,7 +319,7 @@ fn test_sync() -> anyhow::Result<()> { let txid = env.send(&addr_to_track, SEND_AMOUNT)?; env.wait_until_electrum_sees_txid(txid, Duration::from_secs(6))?; - sync_with_electrum( + let _ = sync_with_electrum( &client, [spk_to_track.clone()], &mut recv_chain, @@ -332,7 +340,7 @@ fn test_sync() -> anyhow::Result<()> { env.mine_blocks(1, None)?; env.wait_until_electrum_sees_block(Duration::from_secs(6))?; - sync_with_electrum( + let _ = sync_with_electrum( &client, [spk_to_track.clone()], &mut recv_chain, @@ -353,7 +361,7 @@ fn test_sync() -> anyhow::Result<()> { env.reorg_empty_blocks(1)?; env.wait_until_electrum_sees_block(Duration::from_secs(6))?; - sync_with_electrum( + let _ = sync_with_electrum( &client, [spk_to_track.clone()], &mut recv_chain, @@ -373,7 +381,7 @@ fn test_sync() -> anyhow::Result<()> { env.mine_blocks(1, None)?; env.wait_until_electrum_sees_block(Duration::from_secs(6))?; - sync_with_electrum(&client, [spk_to_track], &mut recv_chain, &mut recv_graph)?; + let _ = sync_with_electrum(&client, [spk_to_track], &mut recv_chain, &mut recv_graph)?; // Check if balance is correct once transaction is confirmed again. assert_eq!( diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index fed6dd526..066b91e17 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -1,5 +1,4 @@ use std::collections::BTreeSet; -use std::usize; use async_trait::async_trait; use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}; @@ -33,9 +32,9 @@ pub trait EsploraAsyncExt { /// the maximum number of HTTP requests to make in parallel. /// /// Refer to [crate-level docs](crate) for more. - async fn full_scan( + async fn full_scan> + Send>( &self, - request: FullScanRequest, + request: R, stop_gap: usize, parallel_requests: usize, ) -> Result, Error>; @@ -47,105 +46,52 @@ pub trait EsploraAsyncExt { /// in parallel. /// /// Refer to [crate-level docs](crate) for more. - async fn sync( + async fn sync> + Send>( &self, - request: SyncRequest, + request: R, parallel_requests: usize, ) -> Result; - - /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning - /// `keychain_spks` against Esplora. - /// - /// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts - /// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive - /// scripts with no transaction history is reached. `parallel_requests` specifies the maximum - /// number of HTTP requests to make in parallel. - /// - /// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active - /// keychain index (if any) is returned. The last active keychain index is the keychain's last - /// script pubkey that contains a non-empty transaction history. - /// - /// Refer to [crate-level docs](crate) for more. 
- async fn fetch_txs_with_keychain_spks> + Send>( - &self, - keychain_spks: I, - stop_gap: usize, - parallel_requests: usize, - ) -> Result<(TxGraph, Option), Error>; - - /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks` - /// against Esplora. - /// - /// Unlike with [`EsploraAsyncExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as - /// all contained scripts will be scanned. `parallel_requests` specifies the maximum number of - /// HTTP requests to make in parallel. - /// - /// Refer to [crate-level docs](crate) for more. - async fn fetch_txs_with_spks + Send>( - &self, - spks: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator + Send; - - /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids` - /// against Esplora. - /// - /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. - /// - /// Refer to [crate-level docs](crate) for more. - async fn fetch_txs_with_txids + Send>( - &self, - txids: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator + Send; - - /// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided - /// `outpoints`. - /// - /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. - /// - /// Refer to [crate-level docs](crate) for more. - async fn fetch_txs_with_outpoints + Send>( - &self, - outpoints: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator + Send; } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl EsploraAsyncExt for esplora_client::AsyncClient { - async fn full_scan( + async fn full_scan> + Send>( &self, - request: FullScanRequest, + request: R, stop_gap: usize, parallel_requests: usize, ) -> Result, Error> { - let latest_blocks = fetch_latest_blocks(self).await?; + let mut request = request.into(); + let keychains = request.keychains(); + + let chain_tip = request.chain_tip(); + let latest_blocks = if chain_tip.is_some() { + Some(fetch_latest_blocks(self).await?) 
+ } else { + None + }; + let mut graph_update = TxGraph::default(); let mut last_active_indices = BTreeMap::::new(); - for (keychain, keychain_spks) in request.spks_by_keychain { - let (tx_graph, last_active_index) = self - .fetch_txs_with_keychain_spks(keychain_spks, stop_gap, parallel_requests) - .await?; + for keychain in keychains { + let keychain_spks = request.iter_spks(keychain.clone()); + let (tx_graph, last_active_index) = + fetch_txs_with_keychain_spks(self, keychain_spks, stop_gap, parallel_requests) + .await?; let _ = graph_update.apply_update(tx_graph); if let Some(last_active_index) = last_active_index { last_active_indices.insert(keychain, last_active_index); } } - let chain_update = chain_update( - self, - &latest_blocks, - &request.chain_tip, - graph_update.all_anchors(), - ) - .await?; + + let chain_update = match (chain_tip, latest_blocks) { + (Some(chain_tip), Some(latest_blocks)) => Some( + chain_update(self, &latest_blocks, &chain_tip, graph_update.all_anchors()).await?, + ), + _ => None, + }; + Ok(FullScanResult { chain_update, graph_update, @@ -153,231 +99,42 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { }) } - async fn sync( + async fn sync> + Send>( &self, - request: SyncRequest, + request: R, parallel_requests: usize, ) -> Result { - let latest_blocks = fetch_latest_blocks(self).await?; - let mut graph_update = TxGraph::default(); - let _ = graph_update.apply_update( - self.fetch_txs_with_spks(request.spks, parallel_requests) - .await?, - ); + let mut request = request.into(); + + let chain_tip = request.chain_tip(); + let latest_blocks = if chain_tip.is_some() { + Some(fetch_latest_blocks(self).await?) + } else { + None + }; + + let mut graph_update = TxGraph::::default(); + let _ = graph_update + .apply_update(fetch_txs_with_spks(self, request.iter_spks(), parallel_requests).await?); let _ = graph_update.apply_update( - self.fetch_txs_with_txids(request.txids, parallel_requests) - .await?, + fetch_txs_with_txids(self, request.iter_txids(), parallel_requests).await?, ); let _ = graph_update.apply_update( - self.fetch_txs_with_outpoints(request.outpoints, parallel_requests) - .await?, + fetch_txs_with_outpoints(self, request.iter_outpoints(), parallel_requests).await?, ); - let chain_update = chain_update( - self, - &latest_blocks, - &request.chain_tip, - graph_update.all_anchors(), - ) - .await?; + + let chain_update = match (chain_tip, latest_blocks) { + (Some(chain_tip), Some(latest_blocks)) => Some( + chain_update(self, &latest_blocks, &chain_tip, graph_update.all_anchors()).await?, + ), + _ => None, + }; + Ok(SyncResult { chain_update, graph_update, }) } - - async fn fetch_txs_with_keychain_spks> + Send>( - &self, - mut keychain_spks: I, - stop_gap: usize, - parallel_requests: usize, - ) -> Result<(TxGraph, Option), Error> { - type TxsOfSpkIndex = (u32, Vec); - - let mut tx_graph = TxGraph::default(); - let mut last_index = Option::::None; - let mut last_active_index = Option::::None; - - loop { - let handles = keychain_spks - .by_ref() - .take(parallel_requests) - .map(|(spk_index, spk)| { - let client = self.clone(); - async move { - let mut last_seen = None; - let mut spk_txs = Vec::new(); - loop { - let txs = client.scripthash_txs(&spk, last_seen).await?; - let tx_count = txs.len(); - last_seen = txs.last().map(|tx| tx.txid); - spk_txs.extend(txs); - if tx_count < 25 { - break Result::<_, Error>::Ok((spk_index, spk_txs)); - } - } - } - }) - .collect::>(); - - if handles.is_empty() { - break; - } - - for (index, txs) in 
handles.try_collect::>().await? { - last_index = Some(index); - if !txs.is_empty() { - last_active_index = Some(index); - } - for tx in txs { - let _ = tx_graph.insert_tx(tx.to_tx()); - insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status); - insert_prevouts(&mut tx_graph, tx.vin); - } - } - - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { - break; - } - } - - Ok((tx_graph, last_active_index)) - } - - async fn fetch_txs_with_spks + Send>( - &self, - spks: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator + Send, - { - self.fetch_txs_with_keychain_spks( - spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), - usize::MAX, - parallel_requests, - ) - .await - .map(|(tx_graph, _)| tx_graph) - } - - async fn fetch_txs_with_txids + Send>( - &self, - txids: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator + Send, - { - enum EsploraResp { - TxStatus(TxStatus), - Tx(Option), - } - - let mut tx_graph = TxGraph::default(); - let mut txids = txids.into_iter(); - loop { - let handles = txids - .by_ref() - .take(parallel_requests) - .map(|txid| { - let client = self.clone(); - let tx_already_exists = tx_graph.get_tx(txid).is_some(); - async move { - if tx_already_exists { - client - .get_tx_status(&txid) - .await - .map(|s| (txid, EsploraResp::TxStatus(s))) - } else { - client - .get_tx_info(&txid) - .await - .map(|t| (txid, EsploraResp::Tx(t))) - } - } - }) - .collect::>(); - - if handles.is_empty() { - break; - } - - for (txid, resp) in handles.try_collect::>().await? { - match resp { - EsploraResp::TxStatus(status) => { - insert_anchor_from_status(&mut tx_graph, txid, status); - } - EsploraResp::Tx(Some(tx_info)) => { - let _ = tx_graph.insert_tx(tx_info.to_tx()); - insert_anchor_from_status(&mut tx_graph, txid, tx_info.status); - insert_prevouts(&mut tx_graph, tx_info.vin); - } - _ => continue, - } - } - } - Ok(tx_graph) - } - - async fn fetch_txs_with_outpoints + Send>( - &self, - outpoints: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator + Send, - { - let outpoints = outpoints.into_iter().collect::>(); - - // make sure txs exists in graph and tx statuses are updated - // TODO: We should maintain a tx cache (like we do with Electrum). 
- let mut tx_graph = self - .fetch_txs_with_txids(outpoints.iter().map(|op| op.txid), parallel_requests) - .await?; - - // get outpoint spend-statuses - let mut outpoints = outpoints.into_iter(); - let mut missing_txs = Vec::::with_capacity(outpoints.len()); - loop { - let handles = outpoints - .by_ref() - .take(parallel_requests) - .map(|op| { - let client = self.clone(); - async move { client.get_output_status(&op.txid, op.vout as _).await } - }) - .collect::>(); - - if handles.is_empty() { - break; - } - - for op_status in handles.try_collect::>().await?.into_iter().flatten() { - let spend_txid = match op_status.txid { - Some(txid) => txid, - None => continue, - }; - if tx_graph.get_tx(spend_txid).is_none() { - missing_txs.push(spend_txid); - } - if let Some(spend_status) = op_status.status { - insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status); - } - } - } - - let _ = tx_graph.apply_update( - self.fetch_txs_with_txids(missing_txs, parallel_requests) - .await?, - ); - Ok(tx_graph) - } } /// Fetch latest blocks from Esplora in an atomic call. @@ -480,6 +237,235 @@ async fn chain_update( Ok(tip) } +/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning +/// `keychain_spks` against Esplora. +/// +/// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts +/// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive +/// scripts with no transaction history is reached. `parallel_requests` specifies the maximum +/// number of HTTP requests to make in parallel. +/// +/// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active +/// keychain index (if any) is returned. The last active keychain index is the keychain's last +/// script pubkey that contains a non-empty transaction history. +/// +/// Refer to [crate-level docs](crate) for more. +async fn fetch_txs_with_keychain_spks> + Send>( + client: &esplora_client::AsyncClient, + mut keychain_spks: I, + stop_gap: usize, + parallel_requests: usize, +) -> Result<(TxGraph, Option), Error> { + type TxsOfSpkIndex = (u32, Vec); + + let mut tx_graph = TxGraph::default(); + let mut last_index = Option::::None; + let mut last_active_index = Option::::None; + + loop { + let handles = keychain_spks + .by_ref() + .take(parallel_requests) + .map(|(spk_index, spk)| { + let client = client.clone(); + async move { + let mut last_seen = None; + let mut spk_txs = Vec::new(); + loop { + let txs = client.scripthash_txs(&spk, last_seen).await?; + let tx_count = txs.len(); + last_seen = txs.last().map(|tx| tx.txid); + spk_txs.extend(txs); + if tx_count < 25 { + break Result::<_, Error>::Ok((spk_index, spk_txs)); + } + } + } + }) + .collect::>(); + + if handles.is_empty() { + break; + } + + for (index, txs) in handles.try_collect::>().await? { + last_index = Some(index); + if !txs.is_empty() { + last_active_index = Some(index); + } + for tx in txs { + let _ = tx_graph.insert_tx(tx.to_tx()); + insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status); + insert_prevouts(&mut tx_graph, tx.vin); + } + } + + let last_index = last_index.expect("Must be set since handles wasn't empty."); + let gap_limit_reached = if let Some(i) = last_active_index { + last_index >= i.saturating_add(stop_gap as u32) + } else { + last_index + 1 >= stop_gap as u32 + }; + if gap_limit_reached { + break; + } + } + + Ok((tx_graph, last_active_index)) +} + +/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks` +/// against Esplora. 
+/// +/// Unlike with [`EsploraAsyncExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as +/// all contained scripts will be scanned. `parallel_requests` specifies the maximum number of +/// HTTP requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. +async fn fetch_txs_with_spks + Send>( + client: &esplora_client::AsyncClient, + spks: I, + parallel_requests: usize, +) -> Result, Error> +where + I::IntoIter: Send, +{ + fetch_txs_with_keychain_spks( + client, + spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), + usize::MAX, + parallel_requests, + ) + .await + .map(|(tx_graph, _)| tx_graph) +} + +/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids` +/// against Esplora. +/// +/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. +async fn fetch_txs_with_txids + Send>( + client: &esplora_client::AsyncClient, + txids: I, + parallel_requests: usize, +) -> Result, Error> +where + I::IntoIter: Send, +{ + enum EsploraResp { + TxStatus(TxStatus), + Tx(Option), + } + + let mut tx_graph = TxGraph::default(); + let mut txids = txids.into_iter(); + loop { + let handles = txids + .by_ref() + .take(parallel_requests) + .map(|txid| { + let client = client.clone(); + let tx_already_exists = tx_graph.get_tx(txid).is_some(); + async move { + if tx_already_exists { + client + .get_tx_status(&txid) + .await + .map(|s| (txid, EsploraResp::TxStatus(s))) + } else { + client + .get_tx_info(&txid) + .await + .map(|t| (txid, EsploraResp::Tx(t))) + } + } + }) + .collect::>(); + + if handles.is_empty() { + break; + } + + for (txid, resp) in handles.try_collect::>().await? { + match resp { + EsploraResp::TxStatus(status) => { + insert_anchor_from_status(&mut tx_graph, txid, status); + } + EsploraResp::Tx(Some(tx_info)) => { + let _ = tx_graph.insert_tx(tx_info.to_tx()); + insert_anchor_from_status(&mut tx_graph, txid, tx_info.status); + insert_prevouts(&mut tx_graph, tx_info.vin); + } + _ => continue, + } + } + } + Ok(tx_graph) +} + +/// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided +/// `outpoints`. +/// +/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. +async fn fetch_txs_with_outpoints + Send>( + client: &esplora_client::AsyncClient, + outpoints: I, + parallel_requests: usize, +) -> Result, Error> +where + I::IntoIter: Send, +{ + let outpoints = outpoints.into_iter().collect::>(); + + // make sure txs exists in graph and tx statuses are updated + // TODO: We should maintain a tx cache (like we do with Electrum). 
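+    // First fetch the transactions that create these outpoints (and their anchors); the loop
+    // below then queries each outpoint's spend status and pulls in any spending transactions
+    // that are still missing from the graph.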
+ let mut tx_graph = fetch_txs_with_txids( + client, + outpoints.iter().copied().map(|op| op.txid), + parallel_requests, + ) + .await?; + + // get outpoint spend-statuses + let mut outpoints = outpoints.into_iter(); + let mut missing_txs = Vec::::with_capacity(outpoints.len()); + loop { + let handles = outpoints + .by_ref() + .take(parallel_requests) + .map(|op| { + let client = client.clone(); + async move { client.get_output_status(&op.txid, op.vout as _).await } + }) + .collect::>(); + + if handles.is_empty() { + break; + } + + for op_status in handles.try_collect::>().await?.into_iter().flatten() { + let spend_txid = match op_status.txid { + Some(txid) => txid, + None => continue, + }; + if tx_graph.get_tx(spend_txid).is_none() { + missing_txs.push(spend_txid); + } + if let Some(spend_status) = op_status.status { + insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status); + } + } + } + + let _ = + tx_graph.apply_update(fetch_txs_with_txids(client, missing_txs, parallel_requests).await?); + Ok(tx_graph) +} + #[cfg(test)] mod test { use std::{collections::BTreeSet, time::Duration}; diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 81ce76848..6e3e25afe 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -29,9 +29,9 @@ pub trait EsploraExt { /// the maximum number of HTTP requests to make in parallel. /// /// Refer to [crate-level docs](crate) for more. - fn full_scan( + fn full_scan>>( &self, - request: FullScanRequest, + request: R, stop_gap: usize, parallel_requests: usize, ) -> Result, Error>; @@ -43,97 +43,51 @@ pub trait EsploraExt { /// in parallel. /// /// Refer to [crate-level docs](crate) for more. - fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result; - - /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning - /// `keychain_spks` against Esplora. - /// - /// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts - /// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive - /// scripts with no transaction history is reached. `parallel_requests` specifies the maximum - /// number of HTTP requests to make in parallel. - /// - /// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active keychain - /// index (if any) is returned. The last active keychain index is the keychain's last script - /// pubkey that contains a non-empty transaction history. - /// - /// Refer to [crate-level docs](crate) for more. - fn fetch_txs_with_keychain_spks>>( - &self, - keychain_spks: I, - stop_gap: usize, - parallel_requests: usize, - ) -> Result<(TxGraph, Option), Error>; - - /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks` - /// against Esplora. - /// - /// Unlike with [`EsploraExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as all - /// contained scripts will be scanned. `parallel_requests` specifies the maximum number of HTTP - /// requests to make in parallel. - /// - /// Refer to [crate-level docs](crate) for more. - fn fetch_txs_with_spks>( + fn sync>>( &self, - spks: I, + request: R, parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator; - - /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids` - /// against Esplora. - /// - /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. 
- /// - /// Refer to [crate-level docs](crate) for more. - fn fetch_txs_with_txids>( - &self, - txids: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator; - - /// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided - /// `outpoints`. - /// - /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. - /// - /// Refer to [crate-level docs](crate) for more. - fn fetch_txs_with_outpoints>( - &self, - outpoints: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator; + ) -> Result; } impl EsploraExt for esplora_client::BlockingClient { - fn full_scan( + fn full_scan>>( &self, - request: FullScanRequest, + request: R, stop_gap: usize, parallel_requests: usize, ) -> Result, Error> { - let latest_blocks = fetch_latest_blocks(self)?; + let mut request = request.into(); + + let chain_tip = request.chain_tip(); + let latest_blocks = if chain_tip.is_some() { + Some(fetch_latest_blocks(self)?) + } else { + None + }; + let mut graph_update = TxGraph::default(); let mut last_active_indices = BTreeMap::::new(); - for (keychain, keychain_spks) in request.spks_by_keychain { + for keychain in request.keychains() { + let keychain_spks = request.iter_spks(keychain.clone()); let (tx_graph, last_active_index) = - self.fetch_txs_with_keychain_spks(keychain_spks, stop_gap, parallel_requests)?; + fetch_txs_with_keychain_spks(self, keychain_spks, stop_gap, parallel_requests)?; let _ = graph_update.apply_update(tx_graph); if let Some(last_active_index) = last_active_index { last_active_indices.insert(keychain, last_active_index); } } - let chain_update = chain_update( - self, - &latest_blocks, - &request.chain_tip, - graph_update.all_anchors(), - )?; + + let chain_update = match (chain_tip, latest_blocks) { + (Some(chain_tip), Some(latest_blocks)) => Some(chain_update( + self, + &latest_blocks, + &chain_tip, + graph_update.all_anchors(), + )?), + _ => None, + }; + Ok(FullScanResult { chain_update, graph_update, @@ -141,224 +95,51 @@ impl EsploraExt for esplora_client::BlockingClient { }) } - fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result { - let latest_blocks = fetch_latest_blocks(self)?; - let mut graph_update = TxGraph::default(); - let _ = - graph_update.apply_update(self.fetch_txs_with_spks(request.spks, parallel_requests)?); - let _ = - graph_update.apply_update(self.fetch_txs_with_txids(request.txids, parallel_requests)?); - let _ = graph_update - .apply_update(self.fetch_txs_with_outpoints(request.outpoints, parallel_requests)?); - let chain_update = chain_update( - self, - &latest_blocks, - &request.chain_tip, - graph_update.all_anchors(), - )?; - Ok(SyncResult { - chain_update, - graph_update, - }) - } - - fn fetch_txs_with_keychain_spks>>( + fn sync>>( &self, - mut keychain_spks: I, - stop_gap: usize, + request: R, parallel_requests: usize, - ) -> Result<(TxGraph, Option), Error> { - type TxsOfSpkIndex = (u32, Vec); - - let mut tx_graph = TxGraph::default(); - let mut last_index = Option::::None; - let mut last_active_index = Option::::None; - - loop { - let handles = keychain_spks - .by_ref() - .take(parallel_requests) - .map(|(spk_index, spk)| { - std::thread::spawn({ - let client = self.clone(); - move || -> Result { - let mut last_seen = None; - let mut spk_txs = Vec::new(); - loop { - let txs = client.scripthash_txs(&spk, last_seen)?; - let tx_count = txs.len(); - last_seen = txs.last().map(|tx| tx.txid); - spk_txs.extend(txs); - 
if tx_count < 25 { - break Ok((spk_index, spk_txs)); - } - } - } - }) - }) - .collect::>>>(); - - if handles.is_empty() { - break; - } + ) -> Result { + let mut request: SyncRequest = request.into(); - for handle in handles { - let (index, txs) = handle.join().expect("thread must not panic")?; - last_index = Some(index); - if !txs.is_empty() { - last_active_index = Some(index); - } - for tx in txs { - let _ = tx_graph.insert_tx(tx.to_tx()); - insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status); - insert_prevouts(&mut tx_graph, tx.vin); - } - } - - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { - break; - } - } - - Ok((tx_graph, last_active_index)) - } + let chain_tip = request.chain_tip(); + let latest_blocks = if chain_tip.is_some() { + Some(fetch_latest_blocks(self)?) + } else { + None + }; - fn fetch_txs_with_spks>( - &self, - spks: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator, - { - self.fetch_txs_with_keychain_spks( - spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), - usize::MAX, + let mut graph_update = TxGraph::default(); + let _ = graph_update.apply_update(fetch_txs_with_spks( + self, + request.iter_spks(), parallel_requests, - ) - .map(|(tx_graph, _)| tx_graph) - } - - fn fetch_txs_with_txids>( - &self, - txids: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator, - { - enum EsploraResp { - TxStatus(TxStatus), - Tx(Option), - } - - let mut tx_graph = TxGraph::default(); - let mut txids = txids.into_iter(); - loop { - let handles = txids - .by_ref() - .take(parallel_requests) - .map(|txid| { - let client = self.clone(); - let tx_already_exists = tx_graph.get_tx(txid).is_some(); - std::thread::spawn(move || { - if tx_already_exists { - client - .get_tx_status(&txid) - .map_err(Box::new) - .map(|s| (txid, EsploraResp::TxStatus(s))) - } else { - client - .get_tx_info(&txid) - .map_err(Box::new) - .map(|t| (txid, EsploraResp::Tx(t))) - } - }) - }) - .collect::>>>(); - - if handles.is_empty() { - break; - } - - for handle in handles { - let (txid, resp) = handle.join().expect("thread must not panic")?; - match resp { - EsploraResp::TxStatus(status) => { - insert_anchor_from_status(&mut tx_graph, txid, status); - } - EsploraResp::Tx(Some(tx_info)) => { - let _ = tx_graph.insert_tx(tx_info.to_tx()); - insert_anchor_from_status(&mut tx_graph, txid, tx_info.status); - insert_prevouts(&mut tx_graph, tx_info.vin); - } - _ => continue, - } - } - } - Ok(tx_graph) - } - - fn fetch_txs_with_outpoints>( - &self, - outpoints: I, - parallel_requests: usize, - ) -> Result, Error> - where - I::IntoIter: ExactSizeIterator, - { - let outpoints = outpoints.into_iter().collect::>(); - - // make sure txs exists in graph and tx statuses are updated - // TODO: We should maintain a tx cache (like we do with Electrum). 
- let mut tx_graph = - self.fetch_txs_with_txids(outpoints.iter().map(|op| op.txid), parallel_requests)?; - - // get outpoint spend-statuses - let mut outpoints = outpoints.into_iter(); - let mut missing_txs = Vec::::with_capacity(outpoints.len()); - loop { - let handles = outpoints - .by_ref() - .take(parallel_requests) - .map(|op| { - let client = self.clone(); - std::thread::spawn(move || { - client - .get_output_status(&op.txid, op.vout as _) - .map_err(Box::new) - }) - }) - .collect::, Error>>>>(); - - if handles.is_empty() { - break; - } - - for handle in handles { - if let Some(op_status) = handle.join().expect("thread must not panic")? { - let spend_txid = match op_status.txid { - Some(txid) => txid, - None => continue, - }; - if tx_graph.get_tx(spend_txid).is_none() { - missing_txs.push(spend_txid); - } - if let Some(spend_status) = op_status.status { - insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status); - } - } - } - } + )?); + let _ = graph_update.apply_update(fetch_txs_with_txids( + self, + request.iter_txids(), + parallel_requests, + )?); + let _ = graph_update.apply_update(fetch_txs_with_outpoints( + self, + request.iter_outpoints(), + parallel_requests, + )?); + + let chain_update = match (chain_tip, latest_blocks) { + (Some(chain_tip), Some(latest_blocks)) => Some(chain_update( + self, + &latest_blocks, + &chain_tip, + graph_update.all_anchors(), + )?), + _ => None, + }; - let _ = tx_graph.apply_update(self.fetch_txs_with_txids(missing_txs, parallel_requests)?); - Ok(tx_graph) + Ok(SyncResult { + chain_update, + graph_update, + }) } } @@ -461,6 +242,224 @@ fn chain_update( Ok(tip) } +fn fetch_txs_with_keychain_spks>>( + client: &esplora_client::BlockingClient, + mut keychain_spks: I, + stop_gap: usize, + parallel_requests: usize, +) -> Result<(TxGraph, Option), Error> { + type TxsOfSpkIndex = (u32, Vec); + + let mut tx_graph = TxGraph::default(); + let mut last_index = Option::::None; + let mut last_active_index = Option::::None; + + loop { + let handles = keychain_spks + .by_ref() + .take(parallel_requests) + .map(|(spk_index, spk)| { + std::thread::spawn({ + let client = client.clone(); + move || -> Result { + let mut last_seen = None; + let mut spk_txs = Vec::new(); + loop { + let txs = client.scripthash_txs(&spk, last_seen)?; + let tx_count = txs.len(); + last_seen = txs.last().map(|tx| tx.txid); + spk_txs.extend(txs); + if tx_count < 25 { + break Ok((spk_index, spk_txs)); + } + } + } + }) + }) + .collect::>>>(); + + if handles.is_empty() { + break; + } + + for handle in handles { + let (index, txs) = handle.join().expect("thread must not panic")?; + last_index = Some(index); + if !txs.is_empty() { + last_active_index = Some(index); + } + for tx in txs { + let _ = tx_graph.insert_tx(tx.to_tx()); + insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status); + insert_prevouts(&mut tx_graph, tx.vin); + } + } + + let last_index = last_index.expect("Must be set since handles wasn't empty."); + let gap_limit_reached = if let Some(i) = last_active_index { + last_index >= i.saturating_add(stop_gap as u32) + } else { + last_index + 1 >= stop_gap as u32 + }; + if gap_limit_reached { + break; + } + } + + Ok((tx_graph, last_active_index)) +} + +/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks` +/// against Esplora. +/// +/// Unlike with [`EsploraExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as all +/// contained scripts will be scanned. 
`parallel_requests` specifies the maximum number of HTTP +/// requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. +fn fetch_txs_with_spks>( + client: &esplora_client::BlockingClient, + spks: I, + parallel_requests: usize, +) -> Result, Error> { + fetch_txs_with_keychain_spks( + client, + spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), + usize::MAX, + parallel_requests, + ) + .map(|(tx_graph, _)| tx_graph) +} + +/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids` +/// against Esplora. +/// +/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. +fn fetch_txs_with_txids>( + client: &esplora_client::BlockingClient, + txids: I, + parallel_requests: usize, +) -> Result, Error> { + enum EsploraResp { + TxStatus(TxStatus), + Tx(Option), + } + + let mut tx_graph = TxGraph::default(); + let mut txids = txids.into_iter(); + loop { + let handles = txids + .by_ref() + .take(parallel_requests) + .map(|txid| { + let client = client.clone(); + let tx_already_exists = tx_graph.get_tx(txid).is_some(); + std::thread::spawn(move || { + if tx_already_exists { + client + .get_tx_status(&txid) + .map_err(Box::new) + .map(|s| (txid, EsploraResp::TxStatus(s))) + } else { + client + .get_tx_info(&txid) + .map_err(Box::new) + .map(|t| (txid, EsploraResp::Tx(t))) + } + }) + }) + .collect::>>>(); + + if handles.is_empty() { + break; + } + + for handle in handles { + let (txid, resp) = handle.join().expect("thread must not panic")?; + match resp { + EsploraResp::TxStatus(status) => { + insert_anchor_from_status(&mut tx_graph, txid, status); + } + EsploraResp::Tx(Some(tx_info)) => { + let _ = tx_graph.insert_tx(tx_info.to_tx()); + insert_anchor_from_status(&mut tx_graph, txid, tx_info.status); + insert_prevouts(&mut tx_graph, tx_info.vin); + } + _ => continue, + } + } + } + Ok(tx_graph) +} + +/// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided +/// `outpoints`. +/// +/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. +fn fetch_txs_with_outpoints>( + client: &esplora_client::BlockingClient, + outpoints: I, + parallel_requests: usize, +) -> Result, Error> { + let outpoints = outpoints.into_iter().collect::>(); + + // make sure txs exists in graph and tx statuses are updated + // TODO: We should maintain a tx cache (like we do with Electrum). + let mut tx_graph = fetch_txs_with_txids( + client, + outpoints.iter().map(|op| op.txid), + parallel_requests, + )?; + + // get outpoint spend-statuses + let mut outpoints = outpoints.into_iter(); + let mut missing_txs = Vec::::with_capacity(outpoints.len()); + loop { + let handles = outpoints + .by_ref() + .take(parallel_requests) + .map(|op| { + let client = client.clone(); + std::thread::spawn(move || { + client + .get_output_status(&op.txid, op.vout as _) + .map_err(Box::new) + }) + }) + .collect::, Error>>>>(); + + if handles.is_empty() { + break; + } + + for handle in handles { + if let Some(op_status) = handle.join().expect("thread must not panic")? 
{ + let spend_txid = match op_status.txid { + Some(txid) => txid, + None => continue, + }; + if tx_graph.get_tx(spend_txid).is_none() { + missing_txs.push(spend_txid); + } + if let Some(spend_status) = op_status.status { + insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status); + } + } + } + } + + let _ = tx_graph.apply_update(fetch_txs_with_txids( + client, + missing_txs, + parallel_requests, + )?); + Ok(tx_graph) +} + #[cfg(test)] mod test { use crate::blocking_ext::{chain_update, fetch_latest_blocks}; diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index 9ce0c7be1..c74fe74a1 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -1,22 +1,8 @@ #![doc = include_str!("../README.md")] -//! # Low-Level Methods -//! -//! [`EsploraExt::sync`] and [`EsploraExt::full_scan`] returns updates which are *complete* and can -//! be used directly to determine confirmation statuses of each transaction. This is because a -//! [`LocalChain`] update is contained in the returned update structures. However, sometimes the -//! caller wishes to use a custom [`ChainOracle`] implementation (something other than -//! [`LocalChain`]). The following methods ONLY returns an update [`TxGraph`]: -//! -//! * [`EsploraExt::fetch_txs_with_keychain_spks`] -//! * [`EsploraExt::fetch_txs_with_spks`] -//! * [`EsploraExt::fetch_txs_with_txids`] -//! * [`EsploraExt::fetch_txs_with_outpoints`] -//! //! # Stop Gap //! -//! Methods [`EsploraExt::full_scan`] and [`EsploraExt::fetch_txs_with_keychain_spks`] takes in a -//! `stop_gap` input which is defined as the maximum number of consecutive unused script pubkeys to -//! scan transactions for before stopping. +//! [`EsploraExt::full_scan`] takes in a `stop_gap` input which is defined as the maximum number of +//! consecutive unused script pubkeys to scan transactions for before stopping. //! //! For example, with a `stop_gap` of 3, `full_scan` will keep scanning until it encounters 3 //! consecutive script pubkeys with no associated transactions. diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index 2258c9d60..70d464194 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -55,7 +55,9 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { let cp_tip = env.make_checkpoint_tip(); let sync_update = { - let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks); + let request = SyncRequest::builder() + .chain_tip(cp_tip.clone()) + .spks(misc_spks); client.sync(request, 1).await? }; @@ -160,15 +162,17 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with a gap limit of 3 won't find the transaction, but a scan with a gap limit of 4 // will. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 3, 1).await? }; assert!(full_scan_update.graph_update.full_txs().next().is_none()); assert!(full_scan_update.last_active_indices.is_empty()); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 4, 1).await? 
}; assert_eq!( @@ -201,8 +205,9 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 5, 1).await? }; let txs: HashSet<_> = full_scan_update @@ -214,8 +219,9 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { assert!(txs.contains(&txid_4th_addr)); assert_eq!(full_scan_update.last_active_indices[&0], 3); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 6, 1).await? }; let txs: HashSet<_> = full_scan_update diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 2e363f4e6..818f1f5fb 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -55,7 +55,9 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { let cp_tip = env.make_checkpoint_tip(); let sync_update = { - let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks); + let request = SyncRequest::builder() + .chain_tip(cp_tip.clone()) + .spks(misc_spks); client.sync(request, 1)? }; @@ -161,15 +163,17 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4 // will. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 3, 1)? }; assert!(full_scan_update.graph_update.full_txs().next().is_none()); assert!(full_scan_update.last_active_indices.is_empty()); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 4, 1)? }; assert_eq!( @@ -202,8 +206,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 5, 1)? 
}; let txs: HashSet<_> = full_scan_update @@ -215,8 +220,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { assert!(txs.contains(&txid_4th_addr)); assert_eq!(full_scan_update.last_active_indices[&0], 3); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 6, 1)? }; let txs: HashSet<_> = full_scan_update diff --git a/crates/wallet/src/wallet/mod.rs b/crates/wallet/src/wallet/mod.rs index 4303ad298..02936eaef 100644 --- a/crates/wallet/src/wallet/mod.rs +++ b/crates/wallet/src/wallet/mod.rs @@ -29,7 +29,10 @@ use bdk_chain::{ local_chain::{ self, ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain, }, - spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}, + spk_client::{ + FullScanRequest, FullScanRequestBuilder, FullScanResult, SyncRequest, SyncRequestBuilder, + SyncResult, + }, tx_graph::{CanonicalTx, TxGraph, TxNode}, BlockId, ChainPosition, ConfirmationBlockTime, ConfirmationTime, DescriptorExt, FullTxOut, Indexed, IndexedTxGraph, Merge, @@ -151,7 +154,7 @@ impl From> for Update { Self { last_active_indices: value.last_active_indices, graph: value.graph_update, - chain: Some(value.chain_update), + chain: value.chain_update, } } } @@ -161,7 +164,7 @@ impl From for Update { Self { last_active_indices: BTreeMap::new(), graph: value.graph_update, - chain: Some(value.chain_update), + chain: value.chain_update, } } } @@ -2437,9 +2440,10 @@ impl Wallet { /// This is the first step when performing a spk-based wallet partial sync, the returned /// [`SyncRequest`] collects all revealed script pubkeys from the wallet keychain needed to /// start a blockchain sync with a spk based blockchain client. - pub fn start_sync_with_revealed_spks(&self) -> SyncRequest { - SyncRequest::from_chain_tip(self.chain.tip()) - .populate_with_revealed_spks(&self.indexed_graph.index, ..) + pub fn start_sync_with_revealed_spks(&self) -> SyncRequestBuilder<(KeychainKind, u32)> { + SyncRequest::builder() + .chain_tip(self.chain.tip()) + .revealed_spks_from_indexer(&self.indexed_graph.index, ..) } /// Create a [`FullScanRequest] for this wallet. @@ -2450,8 +2454,10 @@ impl Wallet { /// /// This operation is generally only used when importing or restoring a previously used wallet /// in which the list of used scripts is not known. 
- pub fn start_full_scan(&self) -> FullScanRequest { - FullScanRequest::from_keychain_txout_index(self.chain.tip(), &self.indexed_graph.index) + pub fn start_full_scan(&self) -> FullScanRequestBuilder { + FullScanRequest::builder() + .chain_tip(self.chain.tip()) + .spks_from_indexer(&self.indexed_graph.index) } } diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index cda8c5526..bcb0b3ed5 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -1,7 +1,7 @@ use std::io::{self, Write}; use bdk_chain::{ - bitcoin::{Address, Network, Txid}, + bitcoin::Network, collections::BTreeSet, indexed_tx_graph, spk_client::{FullScanRequest, SyncRequest}, @@ -139,8 +139,9 @@ fn main() -> anyhow::Result<()> { let graph = &*graph.lock().unwrap(); let chain = &*chain.lock().unwrap(); - FullScanRequest::from_chain_tip(chain.tip()) - .set_spks_for_keychain( + FullScanRequest::builder() + .chain_tip(chain.tip()) + .spks_for_keychain( Keychain::External, graph .index @@ -148,7 +149,7 @@ fn main() -> anyhow::Result<()> { .into_iter() .flatten(), ) - .set_spks_for_keychain( + .spks_for_keychain( Keychain::Internal, graph .index @@ -156,7 +157,7 @@ fn main() -> anyhow::Result<()> { .into_iter() .flatten(), ) - .inspect_spks_for_all_keychains({ + .inspect({ let mut once = BTreeSet::new(); move |k, spk_i, _| { if once.insert(k) { @@ -199,99 +200,55 @@ fn main() -> anyhow::Result<()> { } let chain_tip = chain.tip(); - let mut request = SyncRequest::from_chain_tip(chain_tip.clone()); + let mut request = + SyncRequest::builder() + .chain_tip(chain_tip.clone()) + .inspect(|item, progress| { + let pc = (100 * progress.consumed()) as f32 / progress.total() as f32; + eprintln!("[ SCANNING {:03.0}% ] {}", pc, item); + }); if all_spks { - let all_spks = graph - .index - .revealed_spks(..) - .map(|(index, spk)| (index, spk.to_owned())) - .collect::>(); - request = request.chain_spks(all_spks.into_iter().map(|((k, spk_i), spk)| { - eprint!("Scanning {}: {}", k, spk_i); - spk - })); + request = request.spks_with_labels( + graph + .index + .revealed_spks(..) 
+ .map(|(index, spk)| (index, spk.to_owned())), + ); } if unused_spks { - let unused_spks = graph - .index - .unused_spks() - .map(|(index, spk)| (index, spk.to_owned())) - .collect::>(); - request = - request.chain_spks(unused_spks.into_iter().map(move |((k, spk_i), spk)| { - eprint!( - "Checking if address {} {}:{} has been used", - Address::from_script(&spk, network).unwrap(), - k, - spk_i, - ); - spk - })); + request = request.spks_with_labels( + graph + .index + .unused_spks() + .map(|(index, spk)| (index, spk.to_owned())), + ); } if utxos { let init_outpoints = graph.index.outpoints(); - - let utxos = graph - .graph() - .filter_chain_unspents( - &*chain, - chain_tip.block_id(), - init_outpoints.iter().cloned(), - ) - .map(|(_, utxo)| utxo) - .collect::>(); - request = request.chain_outpoints(utxos.into_iter().map(|utxo| { - eprint!( - "Checking if outpoint {} (value: {}) has been spent", - utxo.outpoint, utxo.txout.value - ); - utxo.outpoint - })); + request = request.outpoints( + graph + .graph() + .filter_chain_unspents( + &*chain, + chain_tip.block_id(), + init_outpoints.iter().cloned(), + ) + .map(|(_, utxo)| utxo.outpoint), + ); }; if unconfirmed { - let unconfirmed_txids = graph - .graph() - .list_canonical_txs(&*chain, chain_tip.block_id()) - .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) - .map(|canonical_tx| canonical_tx.tx_node.txid) - .collect::>(); - - request = request.chain_txids( - unconfirmed_txids - .into_iter() - .inspect(|txid| eprint!("Checking if {} is confirmed yet", txid)), + request = request.txids( + graph + .graph() + .list_canonical_txs(&*chain, chain_tip.block_id()) + .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) + .map(|canonical_tx| canonical_tx.tx_node.txid), ); } - let total_spks = request.spks.len(); - let total_txids = request.txids.len(); - let total_ops = request.outpoints.len(); - request = request - .inspect_spks({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32) - } - }) - .inspect_txids({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32) - } - }) - .inspect_outpoints({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32) - } - }); - let res = client .sync(request, scan_options.batch_size, false) .context("scanning the blockchain")?; @@ -313,7 +270,7 @@ fn main() -> anyhow::Result<()> { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let chain_changeset = chain.apply_update(chain_update)?; + let chain_changeset = chain.apply_update(chain_update.expect("request has chain tip"))?; let mut indexed_tx_graph_changeset = indexed_tx_graph::ChangeSet::::default(); diff --git a/example-crates/example_esplora/src/main.rs b/example-crates/example_esplora/src/main.rs index 608e58d11..0ea99c775 100644 --- a/example-crates/example_esplora/src/main.rs +++ b/example-crates/example_esplora/src/main.rs @@ -1,10 +1,11 @@ +use core::f32; use std::{ collections::BTreeSet, io::{self, Write}, }; use bdk_chain::{ - bitcoin::{Address, Network, Txid}, + bitcoin::Network, spk_client::{FullScanRequest, SyncRequest}, Merge, }; @@ -144,8 +145,10 @@ fn main() -> anyhow::Result<()> { let request = { let chain_tip = chain.lock().expect("mutex must not be poisoned").tip(); let indexed_graph = &*graph.lock().expect("mutex must not be poisoned"); - 
FullScanRequest::from_keychain_txout_index(chain_tip, &indexed_graph.index) - .inspect_spks_for_all_keychains({ + FullScanRequest::builder() + .chain_tip(chain_tip) + .spks_from_indexer(&indexed_graph.index) + .inspect({ let mut once = BTreeSet::::new(); move |keychain, spk_i, _| { if once.insert(keychain) { @@ -156,6 +159,7 @@ fn main() -> anyhow::Result<()> { let _ = io::stderr().flush(); } }) + .build() }; // The client scans keychain spks for transaction histories, stopping after `stop_gap` @@ -176,14 +180,17 @@ fn main() -> anyhow::Result<()> { // deriviation indices. Usually before a scan you are on a fresh wallet with no // addresses derived so we need to derive up to last active addresses the scan found // before adding the transactions. - (chain.apply_update(update.chain_update)?, { - let index_changeset = graph - .index - .reveal_to_target_multi(&update.last_active_indices); - let mut indexed_tx_graph_changeset = graph.apply_update(update.graph_update); - indexed_tx_graph_changeset.merge(index_changeset.into()); - indexed_tx_graph_changeset - }) + ( + chain.apply_update(update.chain_update.expect("request included chain tip"))?, + { + let index_changeset = graph + .index + .reveal_to_target_multi(&update.last_active_indices); + let mut indexed_tx_graph_changeset = graph.apply_update(update.graph_update); + indexed_tx_graph_changeset.merge(index_changeset.into()); + indexed_tx_graph_changeset + }, + ) } EsploraCommands::Sync { mut unused_spks, @@ -206,7 +213,15 @@ fn main() -> anyhow::Result<()> { let local_tip = chain.lock().expect("mutex must not be poisoned").tip(); // Spks, outpoints and txids we want updates on will be accumulated here. - let mut request = SyncRequest::from_chain_tip(local_tip.clone()); + let mut request = + SyncRequest::builder() + .chain_tip(local_tip.clone()) + .inspect(|item, progress| { + let pc = (100 * progress.consumed()) as f32 / progress.total() as f32; + eprintln!("[ SCANNING {:03.0}% ] {}", pc, item); + // Flush early to ensure we print at every iteration. + let _ = io::stderr().flush(); + }); // Get a short lock on the structures to get spks, utxos, and txs that we are interested // in. @@ -215,108 +230,51 @@ fn main() -> anyhow::Result<()> { let chain = chain.lock().unwrap(); if *all_spks { - let all_spks = graph - .index - .revealed_spks(..) - .map(|((k, i), spk)| (k, i, spk.to_owned())) - .collect::>(); - request = request.chain_spks(all_spks.into_iter().map(|(k, i, spk)| { - eprint!("scanning {}:{}", k, i); - // Flush early to ensure we print at every iteration. - let _ = io::stderr().flush(); - spk - })); + request = request.spks_with_labels( + graph + .index + .revealed_spks(..) + .map(|(i, spk)| (i, spk.to_owned())), + ); } if unused_spks { - let unused_spks = graph - .index - .unused_spks() - .map(|(index, spk)| (index, spk.to_owned())) - .collect::>(); - request = - request.chain_spks(unused_spks.into_iter().map(move |((k, i), spk)| { - eprint!( - "Checking if address {} {}:{} has been used", - Address::from_script(&spk, network).unwrap(), - k, - i, - ); - // Flush early to ensure we print at every iteration. - let _ = io::stderr().flush(); - spk - })); + request = request.spks_with_labels( + graph + .index + .unused_spks() + .map(|(index, spk)| (index, spk.to_owned())), + ); } if utxos { // We want to search for whether the UTXO is spent, and spent by which // transaction. We provide the outpoint of the UTXO to // `EsploraExt::update_tx_graph_without_keychain`. 
let init_outpoints = graph.index.outpoints(); - let utxos = graph - .graph() - .filter_chain_unspents( - &*chain, - local_tip.block_id(), - init_outpoints.iter().cloned(), - ) - .map(|(_, utxo)| utxo) - .collect::>(); - request = request.chain_outpoints( - utxos - .into_iter() - .inspect(|utxo| { - eprint!( - "Checking if outpoint {} (value: {}) has been spent", - utxo.outpoint, utxo.txout.value - ); - // Flush early to ensure we print at every iteration. - let _ = io::stderr().flush(); - }) - .map(|utxo| utxo.outpoint), + request = request.outpoints( + graph + .graph() + .filter_chain_unspents( + &*chain, + local_tip.block_id(), + init_outpoints.iter().cloned(), + ) + .map(|(_, utxo)| utxo.outpoint), ); }; if unconfirmed { // We want to search for whether the unconfirmed transaction is now confirmed. // We provide the unconfirmed txids to // `EsploraExt::update_tx_graph_without_keychain`. - let unconfirmed_txids = graph - .graph() - .list_canonical_txs(&*chain, local_tip.block_id()) - .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) - .map(|canonical_tx| canonical_tx.tx_node.txid) - .collect::>(); - request = request.chain_txids(unconfirmed_txids.into_iter().inspect(|txid| { - eprint!("Checking if {} is confirmed yet", txid); - // Flush early to ensure we print at every iteration. - let _ = io::stderr().flush(); - })); + request = request.txids( + graph + .graph() + .list_canonical_txs(&*chain, local_tip.block_id()) + .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) + .map(|canonical_tx| canonical_tx.tx_node.txid), + ); } } - let total_spks = request.spks.len(); - let total_txids = request.txids.len(); - let total_ops = request.outpoints.len(); - request = request - .inspect_spks({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32) - } - }) - .inspect_txids({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32) - } - }) - .inspect_outpoints({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32) - } - }); let mut update = client.sync(request, scan_options.parallel_requests)?; // Update last seen unconfirmed @@ -324,7 +282,10 @@ fn main() -> anyhow::Result<()> { let _ = update.graph_update.update_last_seen_unconfirmed(now); ( - chain.lock().unwrap().apply_update(update.chain_update)?, + chain + .lock() + .unwrap() + .apply_update(update.chain_update.expect("request has chain tip"))?, graph.lock().unwrap().apply_update(update.graph_update), ) } diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index b1e7655de..cda1889d0 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -52,19 +52,18 @@ fn main() -> Result<(), anyhow::Error> { // already have. 
client.populate_tx_cache(wallet.tx_graph()); - let request = wallet - .start_full_scan() - .inspect_spks_for_all_keychains({ - let mut once = HashSet::::new(); - move |k, spk_i, _| { - if once.insert(k) { - print!("\nScanning keychain [{:?}]", k) - } else { - print!(" {:<3}", spk_i) - } + let request = wallet.start_full_scan().inspect({ + let mut stdout = std::io::stdout(); + let mut once = HashSet::::new(); + move |k, spk_i, _| { + if once.insert(k) { + print!("\nScanning keychain [{:?}]", k) + } else { + print!(" {:<3}", spk_i) } - }) - .inspect_spks_for_all_keychains(|_, _, _| std::io::stdout().flush().expect("must flush")); + stdout.flush().expect("must flush"); + } + }); let mut update = client.full_scan(request, STOP_GAP, BATCH_SIZE, false)?; diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index 535abc6af..ae73e603c 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -45,14 +45,15 @@ async fn main() -> Result<(), anyhow::Error> { print!("Syncing..."); let client = esplora_client::Builder::new(ESPLORA_URL).build_async()?; - let request = wallet.start_full_scan().inspect_spks_for_all_keychains({ + let request = wallet.start_full_scan().inspect({ + let mut stdout = std::io::stdout(); let mut once = BTreeSet::::new(); move |keychain, spk_i, _| { if once.insert(keychain) { - print!("\nScanning keychain [{:?}] ", keychain); + print!("\nScanning keychain [{:?}]", keychain); } print!(" {:<3}", spk_i); - std::io::stdout().flush().expect("must flush") + stdout.flush().expect("must flush") } }); diff --git a/example-crates/wallet_esplora_blocking/src/main.rs b/example-crates/wallet_esplora_blocking/src/main.rs index 7e825150d..9bfed70ee 100644 --- a/example-crates/wallet_esplora_blocking/src/main.rs +++ b/example-crates/wallet_esplora_blocking/src/main.rs @@ -47,14 +47,15 @@ fn main() -> Result<(), anyhow::Error> { print!("Syncing..."); let client = esplora_client::Builder::new(ESPLORA_URL).build_blocking(); - let request = wallet.start_full_scan().inspect_spks_for_all_keychains({ + let request = wallet.start_full_scan().inspect({ + let mut stdout = std::io::stdout(); let mut once = BTreeSet::::new(); move |keychain, spk_i, _| { if once.insert(keychain) { print!("\nScanning keychain [{:?}] ", keychain); } print!(" {:<3}", spk_i); - std::io::stdout().flush().expect("must flush") + stdout.flush().expect("must flush") } });
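// --- Editor's sketch (not part of the patch above) --------------------------
// A minimal end-to-end use of the builder-based request API that this change
// migrates callers to, shown against the blocking Esplora client. The function
// name `sync_example`, the Esplora URL, the `anyhow` error type, and the
// `cp`/`spks` inputs are illustrative assumptions; import paths follow the
// examples touched by this patch.
use bdk_chain::{bitcoin::ScriptBuf, local_chain::CheckPoint, spk_client::SyncRequest};
use bdk_esplora::{esplora_client, EsploraExt};

fn sync_example(cp: CheckPoint, spks: Vec<ScriptBuf>) -> anyhow::Result<()> {
    let client =
        esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking();

    // The chain tip and progress reporting are now opt-in on the builder:
    // without `.chain_tip(..)` the result's `chain_update` is `None`, and a
    // single `inspect` closure replaces the old per-item `inspect_*` methods.
    let request = SyncRequest::builder()
        .chain_tip(cp)
        .spks(spks)
        .inspect(|item, progress| {
            eprintln!("[ {}/{} ] {}", progress.consumed(), progress.total(), item)
        });

    // The second argument is the maximum number of parallel HTTP requests.
    let update = client.sync(request, 5)?;

    // `chain_update` is `Some` only because a chain tip was supplied above.
    let _chain_update = update.chain_update.expect("request has chain tip");
    let _graph_update = update.graph_update;
    Ok(())
}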