Skip to content

Commit

Permalink
feat(esplora): update to use SyncRequest and FullScanRequest structures
Browse files Browse the repository at this point in the history
  • Loading branch information
notmandatory committed Mar 29, 2024
1 parent 5e169e1 commit 98508f8
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 198 deletions.
119 changes: 64 additions & 55 deletions crates/esplora/src/async_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::BTreeSet;
use std::fmt::Debug;

use async_trait::async_trait;
use bdk_chain::collections::btree_map;
use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult};
use bdk_chain::Anchor;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
Expand All @@ -12,7 +14,7 @@ use bdk_chain::{
use esplora_client::TxStatus;
use futures::{stream::FuturesOrdered, TryStreamExt};

use crate::{anchor_from_status, FullScanUpdate, SyncUpdate};
use crate::anchor_from_status;

/// [`esplora_client::Error`]
type Error = Box<esplora_client::Error>;
Expand All @@ -28,90 +30,95 @@ pub trait EsploraAsyncExt {
/// Scan keychain scripts for transactions against Esplora, returning an update that can be
/// applied to the receiving structures.
///
/// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
/// * `keychain_spks`: keychains that we want to scan transactions for
///
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no
/// associated transactions. `parallel_requests` specifies the max number of HTTP requests to
/// make in parallel.
///
/// [`LocalChain::tip`]: local_chain::LocalChain::tip
async fn full_scan<K: Ord + Clone + Send>(
async fn full_scan<
K: Ord + Clone + Send + Debug + 'static,
I: Iterator<Item = (u32, ScriptBuf)> + Send,
>(
&self,
local_tip: CheckPoint,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
request: FullScanRequest<K, I>,
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanUpdate<K>, Error>;
) -> Result<FullScanResult<K>, Error>;

/// Sync a set of scripts with the blockchain (via an Esplora client) for the data
/// specified and return a [`TxGraph`].
///
/// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
/// * `misc_spks`: scripts that we want to sync transactions for
/// * `txids`: transactions for which we want updated [`ConfirmationTimeHeightAnchor`]s
/// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
/// want to include in the update
///
/// If the scripts to sync are unknown, such as when restoring or importing a keychain that
/// may include scripts that have been used, use [`full_scan`] with the keychain.
///
/// [`LocalChain::tip`]: local_chain::LocalChain::tip
/// [`full_scan`]: EsploraAsyncExt::full_scan
async fn sync(
&self,
local_tip: CheckPoint,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
request: SyncRequest,
parallel_requests: usize,
) -> Result<SyncUpdate, Error>;
) -> Result<SyncResult, Error>;
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EsploraAsyncExt for esplora_client::AsyncClient {
async fn full_scan<K: Ord + Clone + Send>(
async fn full_scan<
K: Ord + Clone + Send + Debug + 'static,
I: Iterator<Item = (u32, ScriptBuf)> + Send,
>(
&self,
local_tip: CheckPoint,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
mut request: FullScanRequest<K, I>,
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanUpdate<K>, Error> {
let update_blocks = init_chain_update(self, &local_tip).await?;
let (tx_graph, last_active_indices) =
full_scan_for_index_and_graph(self, keychain_spks, stop_gap, parallel_requests).await?;
let local_chain =
finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?;
Ok(FullScanUpdate {
local_chain,
tx_graph,
) -> Result<FullScanResult<K>, Error> {
let update_blocks = init_chain_update(self, &request.chain_tip).await?;
let (graph_update, last_active_indices) = full_scan_for_index_and_graph(
self,
request.take_spks_by_keychain(),
stop_gap,
parallel_requests,
)
.await?;
let chain_update = finalize_chain_update(
self,
&request.chain_tip,
graph_update.all_anchors(),
update_blocks,
)
.await?;

Ok(FullScanResult {
graph_update,
chain_update,
last_active_indices,
})
}

async fn sync(
&self,
local_tip: CheckPoint,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
mut request: SyncRequest,
parallel_requests: usize,
) -> Result<SyncUpdate, Error> {
let update_blocks = init_chain_update(self, &local_tip).await?;
let tx_graph =
sync_for_index_and_graph(self, misc_spks, txids, outpoints, parallel_requests).await?;
let local_chain =
finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?;
Ok(SyncUpdate {
tx_graph,
local_chain,
) -> Result<SyncResult, Error> {
let update_blocks = init_chain_update(self, &request.chain_tip).await?;
let graph_update = sync_for_index_and_graph(
self,
request.take_spks().map(|(_i, spk)| spk),
request.take_txids(),
request.take_outpoints(),
parallel_requests,
)
.await?;
let chain_update = finalize_chain_update(
self,
&request.chain_tip,
graph_update.all_anchors(),
update_blocks,
)
.await?;
Ok(SyncResult {
graph_update,
chain_update,
})
}
}
Expand Down Expand Up @@ -238,7 +245,7 @@ pub async fn full_scan_for_index_and_graph<K: Ord + Clone + Send>(
client: &esplora_client::AsyncClient,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
Box<impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send>,
>,
stop_gap: usize,
parallel_requests: usize,
Expand Down Expand Up @@ -341,10 +348,12 @@ pub async fn sync_for_index_and_graph(
client,
[(
(),
misc_spks
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
Box::new(
misc_spks
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
),
)]
.into(),
usize::MAX,
Expand Down
109 changes: 49 additions & 60 deletions crates/esplora/src/blocking_ext.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::thread::JoinHandle;
use std::usize;

use bdk_chain::collections::btree_map;
use bdk_chain::collections::BTreeMap;
use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult};
use bdk_chain::Anchor;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
Expand All @@ -13,8 +15,6 @@ use bdk_chain::{
use esplora_client::TxStatus;

use crate::anchor_from_status;
use crate::FullScanUpdate;
use crate::SyncUpdate;

/// [`esplora_client::Error`]
pub type Error = Box<esplora_client::Error>;
Expand All @@ -28,99 +28,83 @@ pub trait EsploraExt {
/// Scan keychain scripts for transactions against Esplora, returning an update that can be
/// applied to the receiving structures.
///
/// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
/// * `keychain_spks`: keychains that we want to scan transactions for
///
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no
/// associated transactions. `parallel_requests` specifies the max number of HTTP requests to
/// make in parallel.
///
/// [`LocalChain::tip`]: local_chain::LocalChain::tip
fn full_scan<K: Ord + Clone>(
fn full_scan<
K: Ord + Clone + Send + Debug + 'static,
I: Iterator<Item = (u32, ScriptBuf)> + Send,
>(
&self,
local_tip: CheckPoint,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
request: FullScanRequest<K, I>,
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanUpdate<K>, Error>;
) -> Result<FullScanResult<K>, Error>;

/// Sync a set of scripts with the blockchain (via an Esplora client) for the data
/// specified and return a [`TxGraph`].
///
/// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
/// * `misc_spks`: scripts that we want to sync transactions for
/// * `txids`: transactions for which we want updated [`ConfirmationTimeHeightAnchor`]s
/// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
/// want to include in the update
///
/// If the scripts to sync are unknown, such as when restoring or importing a keychain that
/// may include scripts that have been used, use [`full_scan`] with the keychain.
///
/// [`LocalChain::tip`]: local_chain::LocalChain::tip
/// [`full_scan`]: EsploraExt::full_scan
fn sync(
&self,
local_tip: CheckPoint,
misc_spks: impl IntoIterator<Item = ScriptBuf>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
parallel_requests: usize,
) -> Result<SyncUpdate, Error>;
fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result<SyncResult, Error>;
}

impl EsploraExt for esplora_client::BlockingClient {
fn full_scan<K: Ord + Clone>(
fn full_scan<
K: Ord + Clone + Send + Debug + 'static,
I: Iterator<Item = (u32, ScriptBuf)> + Send,
>(
&self,
local_tip: CheckPoint,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
mut request: FullScanRequest<K, I>,
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanUpdate<K>, Error> {
let update_blocks = init_chain_update_blocking(self, &local_tip)?;
let (tx_graph, last_active_indices) = full_scan_for_index_and_graph_blocking(
) -> Result<FullScanResult<K>, Error> {
let update_blocks = init_chain_update_blocking(self, &request.chain_tip)?;
let (graph_update, last_active_indices) = full_scan_for_index_and_graph_blocking(
self,
keychain_spks,
request.take_spks_by_keychain(),
stop_gap,
parallel_requests,
)?;
let local_chain = finalize_chain_update_blocking(
let chain_update = finalize_chain_update_blocking(
self,
&local_tip,
tx_graph.all_anchors(),
&request.chain_tip,
graph_update.all_anchors(),
update_blocks,
)?;
Ok(FullScanUpdate {
local_chain,
tx_graph,
Ok(FullScanResult {
graph_update,
chain_update,
last_active_indices,
})
}

fn sync(
&self,
local_tip: CheckPoint,
misc_spks: impl IntoIterator<Item = ScriptBuf>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
mut request: SyncRequest,
parallel_requests: usize,
) -> Result<SyncUpdate, Error> {
let update_blocks = init_chain_update_blocking(self, &local_tip)?;
let tx_graph = sync_for_index_and_graph_blocking(
) -> Result<SyncResult, Error> {
let update_blocks = init_chain_update_blocking(self, &request.chain_tip)?;
let graph_update = sync_for_index_and_graph_blocking(
self,
misc_spks,
txids,
outpoints,
request.take_spks().map(|(_i, spk)| spk),
request.take_txids(),
request.take_outpoints(),
parallel_requests,
)?;
let local_chain = finalize_chain_update_blocking(
let chain_update = finalize_chain_update_blocking(
self,
&local_tip,
tx_graph.all_anchors(),
&request.chain_tip,
graph_update.all_anchors(),
update_blocks,
)?;
Ok(SyncUpdate {
local_chain,
tx_graph,
Ok(SyncResult {
graph_update,
chain_update,
})
}
}
Expand Down Expand Up @@ -242,9 +226,12 @@ pub fn finalize_chain_update_blocking<A: Anchor>(
/// This performs a full scan to get an update for the [`TxGraph`] and
/// [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex).
#[doc(hidden)]
pub fn full_scan_for_index_and_graph_blocking<K: Ord + Clone>(
pub fn full_scan_for_index_and_graph_blocking<K: Ord + Clone + Send>(
client: &esplora_client::BlockingClient,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
keychain_spks: BTreeMap<
K,
Box<impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send>,
>,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
Expand Down Expand Up @@ -340,7 +327,7 @@ pub fn full_scan_for_index_and_graph_blocking<K: Ord + Clone>(
#[doc(hidden)]
pub fn sync_for_index_and_graph_blocking(
client: &esplora_client::BlockingClient,
misc_spks: impl IntoIterator<Item = ScriptBuf>,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
parallel_requests: usize,
Expand All @@ -351,10 +338,12 @@ pub fn sync_for_index_and_graph_blocking(
let mut keychains = BTreeMap::new();
keychains.insert(
(),
misc_spks
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
Box::new(
misc_spks
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
),
);
keychains
},
Expand Down
Loading

0 comments on commit 98508f8

Please sign in to comment.