Skip to content

Commit

Permalink
feat(esplora): update to use SyncRequest,SyncResult and FullScanReque…
Browse files Browse the repository at this point in the history
…st,FullScanResult
  • Loading branch information
notmandatory committed Feb 1, 2024
1 parent 0f00885 commit 0b7075f
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 136 deletions.
131 changes: 78 additions & 53 deletions crates/esplora/src/async_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use async_trait::async_trait;
use bdk_chain::collections::btree_map;
use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult};
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
bitcoin::{BlockHash, ScriptBuf, Txid},
collections::BTreeMap,
local_chain::{self, CheckPoint},
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
Expand Down Expand Up @@ -45,22 +46,21 @@ pub trait EsploraAsyncExt {
) -> Result<local_chain::Update, Error>;

/// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and
/// returns a [`TxGraph`] and a map of last active indices.
///
/// * `keychain_spks`: keychains that we want to scan transactions for
/// returns a [`TxGraph`] and a map of keychain last active indices.
///
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
/// parallel.
async fn full_scan<K: Ord + Clone + Send>(
async fn full_scan<'a, K: Ord + Clone + Send>(
&self,
keychain_spks: BTreeMap<
request: FullScanRequest<
'a,
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send + Clone,
>,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error>;
) -> Result<FullScanResult<K>, Error>;

/// Sync a set of scripts with the blockchain (via an Esplora client) for the data
/// specified and return a [`TxGraph`].
Expand All @@ -74,13 +74,11 @@ pub trait EsploraAsyncExt {
/// may include scripts that have been used, use [`full_scan`] with the keychain.
///
/// [`full_scan`]: EsploraAsyncExt::full_scan
async fn sync(
async fn sync<'a>(
&self,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
request: SyncRequest<'a>,
parallel_requests: usize,
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error>;
) -> Result<SyncResult, Error>;
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
Expand Down Expand Up @@ -149,25 +147,25 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
})
}

async fn full_scan<K: Ord + Clone + Send>(
async fn full_scan<'a, K: Ord + Clone + Send>(
&self,
keychain_spks: BTreeMap<
request: FullScanRequest<
'a,
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send + Clone,
>,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
) -> Result<FullScanResult<K>, Error> {
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
let parallel_requests = Ord::max(parallel_requests, 1);
let mut graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
let mut graph_update = TxGraph::<ConfirmationTimeHeightAnchor>::default();
let mut last_active_indexes = BTreeMap::<K, u32>::new();

for (keychain, spks) in keychain_spks {
for (keychain, spks) in request.spks_by_keychain {
let mut spks = spks.into_iter();
let mut last_index = Option::<u32>::None;
let mut last_active_index = Option::<u32>::None;

loop {
let handles = spks
.by_ref()
Expand Down Expand Up @@ -200,9 +198,9 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
last_active_index = Some(index);
}
for tx in txs {
let _ = graph.insert_tx(tx.to_tx());
let _ = graph_update.insert_tx(tx.to_tx());
if let Some(anchor) = anchor_from_status(&tx.status) {
let _ = graph.insert_anchor(tx.txid, anchor);
let _ = graph_update.insert_anchor(tx.txid, anchor);
}
}
}
Expand All @@ -223,38 +221,54 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
}
}

Ok((graph, last_active_indexes))
// from wallet transactions determine missing blockchain heights
let missing_heights = graph_update.missing_heights(request.local_chain);
// get blockchain update from original request checkpoint and missing heights
let chain_update = self
.update_local_chain(request.local_chain.tip(), missing_heights)
.await?;

Ok(FullScanResult {
graph_update,
chain_update,
last_active_indices: last_active_indexes,
})
}

async fn sync(
async fn sync<'a>(
&self,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
request: SyncRequest<'a>,
parallel_requests: usize,
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
let mut graph = self
.full_scan(
[(
(),
misc_spks
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
)]
.into(),
usize::MAX,
parallel_requests,
)
) -> Result<SyncResult, Error> {
let full_scan_request = FullScanRequest {
spks_by_keychain: [(
0,
request
.spks
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
)]
.into(),
local_chain: request.local_chain,
};
let mut graph_update = self
.full_scan(full_scan_request, usize::MAX, parallel_requests)
.await
.map(|(g, _)| g)?;
.map(
|FullScanResult {
graph_update,
chain_update: _,
last_active_indices: _,
}| graph_update,
)?;

let mut txids = txids.into_iter();
let mut txids = request.txids.into_iter();
loop {
let handles = txids
.by_ref()
.take(parallel_requests)
.filter(|&txid| graph.get_tx(txid).is_none())
.filter(|&txid| graph_update.get_tx(txid).is_none())
.map(|txid| {
let client = self.clone();
async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
Expand All @@ -267,36 +281,47 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {

for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
if let Some(anchor) = anchor_from_status(&status) {
let _ = graph.insert_anchor(txid, anchor);
let _ = graph_update.insert_anchor(txid, anchor);
}
}
}

for op in outpoints.into_iter() {
if graph.get_tx(op.txid).is_none() {
for op in request.outpoints.into_iter() {
if graph_update.get_tx(op.txid).is_none() {
if let Some(tx) = self.get_tx(&op.txid).await? {
let _ = graph.insert_tx(tx);
let _ = graph_update.insert_tx(tx);
}
let status = self.get_tx_status(&op.txid).await?;
if let Some(anchor) = anchor_from_status(&status) {
let _ = graph.insert_anchor(op.txid, anchor);
let _ = graph_update.insert_anchor(op.txid, anchor);
}
}

if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? {
if let Some(txid) = op_status.txid {
if graph.get_tx(txid).is_none() {
if graph_update.get_tx(txid).is_none() {
if let Some(tx) = self.get_tx(&txid).await? {
let _ = graph.insert_tx(tx);
let _ = graph_update.insert_tx(tx);
}
let status = self.get_tx_status(&txid).await?;
if let Some(anchor) = anchor_from_status(&status) {
let _ = graph.insert_anchor(txid, anchor);
let _ = graph_update.insert_anchor(txid, anchor);
}
}
}
}
}
Ok(graph)

// from wallet transactions determine missing blockchain heights
let missing_heights = graph_update.missing_heights(request.local_chain);
// get blockchain update from original request checkpoint and missing heights
let chain_update = self
.update_local_chain(request.local_chain.tip(), missing_heights)
.await?;

Ok(SyncResult {
graph_update,
chain_update,
})
}
}
Loading

0 comments on commit 0b7075f

Please sign in to comment.