Skip to content

Commit

Permalink
feat(esplora): update to use SyncRequest and FullScanRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
notmandatory committed Feb 1, 2024
1 parent 4ce7846 commit cf73cab
Showing 1 changed file with 29 additions and 32 deletions.
61 changes: 29 additions & 32 deletions crates/esplora/src/async_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use async_trait::async_trait;
use bdk_chain::collections::btree_map;
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
bitcoin::{BlockHash, ScriptBuf, Txid},
collections::BTreeMap,
local_chain::{self, CheckPoint},
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
Expand Down Expand Up @@ -43,17 +44,15 @@ pub trait EsploraAsyncExt {
) -> Result<local_chain::Update, Error>;

/// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and
/// returns a [`TxGraph`] and a map of last active indices.
///
/// * `keychain_spks`: keychains that we want to scan transactions for
/// returns a [`TxGraph`] and a map of keychain last active indices.
///
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
/// parallel.
#[allow(clippy::result_large_err)]
async fn full_scan<K: Ord + Clone + Send>(
&self,
keychain_spks: BTreeMap<
request: FullScanRequest<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
Expand All @@ -76,9 +75,7 @@ pub trait EsploraAsyncExt {
#[allow(clippy::result_large_err)]
async fn sync(
&self,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
request: SyncRequest,
parallel_requests: usize,
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error>;
}
Expand Down Expand Up @@ -151,7 +148,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {

async fn full_scan<K: Ord + Clone + Send>(
&self,
keychain_spks: BTreeMap<
request: FullScanRequest<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
Expand All @@ -161,13 +158,12 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
let parallel_requests = Ord::max(parallel_requests, 1);
let mut graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
let mut last_active_indexes = BTreeMap::<K, u32>::new();
let mut last_active_indices = BTreeMap::<K, u32>::new();

for (keychain, spks) in keychain_spks {
for (keychain, spks) in request.spks_by_keychain {
let mut spks = spks.into_iter();
let mut last_index = Option::<u32>::None;
let mut last_active_index = Option::<u32>::None;

loop {
let handles = spks
.by_ref()
Expand Down Expand Up @@ -219,45 +215,46 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
}

if let Some(last_active_index) = last_active_index {
last_active_indexes.insert(keychain, last_active_index);
last_active_indices.insert(keychain, last_active_index);
}
}

Ok((graph, last_active_indexes))
Ok((graph, last_active_indices))
}

async fn sync(
&self,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
request: SyncRequest,
parallel_requests: usize,
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
let full_scan_request = FullScanRequest {
spks_by_keychain: [(
0,
request
.spks
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
)]
.into(),
checkpoint: request.checkpoint,
};
let mut graph = self
.full_scan(
[(
(),
misc_spks
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
)]
.into(),
usize::MAX,
parallel_requests,
)
.full_scan(full_scan_request, usize::MAX, parallel_requests)
.await
.map(|(g, _)| g)?;

let mut txids = txids.into_iter();
let mut txids = request.txids.into_iter();
loop {
let handles = txids
.by_ref()
.take(parallel_requests)
.filter(|&txid| graph.get_tx(txid).is_none())
.map(|txid| {
let client = self.clone();
async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
async move {
client.get_tx_status(&txid).await.map(|s| (txid, s))
}
})
.collect::<FuturesOrdered<_>>();

Expand All @@ -272,7 +269,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
}
}

for op in outpoints.into_iter() {
for op in request.outpoints.into_iter() {
if graph.get_tx(op.txid).is_none() {
if let Some(tx) = self.get_tx(&op.txid).await? {
let _ = graph.insert_tx(tx);
Expand Down

0 comments on commit cf73cab

Please sign in to comment.