Skip to content

Commit

Permalink
feat(esplora): update to use SyncRequest,SyncResult and FullScanReque…
Browse files Browse the repository at this point in the history
…st,FullScanResult
  • Loading branch information
notmandatory committed Feb 28, 2024
1 parent 7a54f35 commit fa12c11
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 146 deletions.
126 changes: 69 additions & 57 deletions crates/esplora/src/async_ext.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use bdk_chain::collections::btree_map;
use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult};
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
collections::BTreeMap,
Expand All @@ -8,6 +9,7 @@ use bdk_chain::{
};
use esplora_client::TxStatus;
use futures::{stream::FuturesOrdered, TryStreamExt};
use std::fmt::Debug;

use crate::anchor_from_status;

Expand Down Expand Up @@ -45,22 +47,17 @@ pub trait EsploraAsyncExt {
) -> Result<local_chain::Update, Error>;

/// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and
/// returns a [`TxGraph`] and a map of last active indices.
///
/// * `keychain_spks`: keychains that we want to scan transactions for
/// returns a [`TxGraph`] and a map of keychain last active indices.
///
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
/// parallel.
async fn full_scan<K: Ord + Clone + Send>(
async fn full_scan<K: Ord + Clone + Send + Debug, I: Iterator<Item = (u32, ScriptBuf)> + Send>(
&self,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
request: FullScanRequest<K, I>,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error>;
) -> Result<FullScanResult<K>, Error>;

/// Sync a set of scripts with the blockchain (via an Esplora client) for the data
/// specified and return a [`TxGraph`].
Expand All @@ -76,11 +73,9 @@ pub trait EsploraAsyncExt {
/// [`full_scan`]: EsploraAsyncExt::full_scan
async fn sync(
&self,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
request: SyncRequest,
parallel_requests: usize,
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error>;
) -> Result<SyncResult, Error>;
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
Expand Down Expand Up @@ -149,25 +144,24 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
})
}

async fn full_scan<K: Ord + Clone + Send>(
async fn full_scan<
K: Ord + Clone + Send + Debug,
I: Iterator<Item = (u32, ScriptBuf)> + Send,
>(
&self,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
mut request: FullScanRequest<K, I>,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
) -> Result<FullScanResult<K>, Error> {
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
let parallel_requests = Ord::max(parallel_requests, 1);
let mut graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
let mut last_active_indexes = BTreeMap::<K, u32>::new();
let mut graph_update = TxGraph::<ConfirmationTimeHeightAnchor>::default();
let mut last_active_indices = BTreeMap::<K, u32>::new();

for (keychain, spks) in keychain_spks {
for (keychain, spks) in request.take_spks_by_keychain() {
let mut spks = spks.into_iter();
let mut last_index = Option::<u32>::None;
let mut last_active_index = Option::<u32>::None;

loop {
let handles = spks
.by_ref()
Expand Down Expand Up @@ -200,9 +194,9 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
last_active_index = Some(index);
}
for tx in txs {
let _ = graph.insert_tx(tx.to_tx());
let _ = graph_update.insert_tx(tx.to_tx());
if let Some(anchor) = anchor_from_status(&tx.status) {
let _ = graph.insert_anchor(tx.txid, anchor);
let _ = graph_update.insert_anchor(tx.txid, anchor);
}

let previous_outputs = tx.vin.iter().filter_map(|vin| {
Expand All @@ -220,7 +214,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
});

for (outpoint, txout) in previous_outputs {
let _ = graph.insert_txout(outpoint, txout);
let _ = graph_update.insert_txout(outpoint, txout);
}
}
}
Expand All @@ -237,42 +231,44 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
}

if let Some(last_active_index) = last_active_index {
last_active_indexes.insert(keychain, last_active_index);
last_active_indices.insert(keychain, last_active_index);
}
}

Ok((graph, last_active_indexes))
// new tx graph transactions determine possible missing blockchain heights
let missing_heights = graph_update.anchor_heights(request.chain_tip.height());
// get blockchain update from original request checkpoint and missing heights
let chain_update = self
.update_local_chain(request.chain_tip.clone(), missing_heights)
.await?;

Ok(FullScanResult {
graph_update,
chain_update,
last_active_indices,
})
}

async fn sync(
&self,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
mut request: SyncRequest,
parallel_requests: usize,
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
let mut graph = self
.full_scan(
[(
(),
misc_spks
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
)]
.into(),
usize::MAX,
parallel_requests,
)
) -> Result<SyncResult, Error> {
let mut full_scan_request = FullScanRequest::new(request.chain_tip.clone());
let spks = [(0, request.take_spks())].into();
full_scan_request.add_spks_by_keychain(spks);

let mut graph_update = self
.full_scan(full_scan_request, usize::MAX, parallel_requests)
.await
.map(|(g, _)| g)?;
.map(|result| result.graph_update)?;

let mut txids = txids.into_iter();
let mut txids = request.take_txids();
loop {
let handles = txids
.by_ref()
.take(parallel_requests)
.filter(|&txid| graph.get_tx(txid).is_none())
.filter(|&txid| graph_update.get_tx(txid).is_none())
.map(|txid| {
let client = self.clone();
async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
Expand All @@ -285,36 +281,52 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {

for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
if let Some(anchor) = anchor_from_status(&status) {
let _ = graph.insert_anchor(txid, anchor);
let _ = graph_update.insert_anchor(txid, anchor);
}
}
}

for op in outpoints.into_iter() {
if graph.get_tx(op.txid).is_none() {
for op in request.take_outpoints() {
if graph_update.get_tx(op.txid).is_none() {
if let Some(tx) = self.get_tx(&op.txid).await? {
let _ = graph.insert_tx(tx);
let _ = graph_update.insert_tx(tx);
}
let status = self.get_tx_status(&op.txid).await?;
if let Some(anchor) = anchor_from_status(&status) {
let _ = graph.insert_anchor(op.txid, anchor);
let _ = graph_update.insert_anchor(op.txid, anchor);
}
}

if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? {
if let Some(txid) = op_status.txid {
if graph.get_tx(txid).is_none() {
if graph_update.get_tx(txid).is_none() {
if let Some(tx) = self.get_tx(&txid).await? {
let _ = graph.insert_tx(tx);
let _ = graph_update.insert_tx(tx);
}
let status = self.get_tx_status(&txid).await?;
if let Some(anchor) = anchor_from_status(&status) {
let _ = graph.insert_anchor(txid, anchor);
let _ = graph_update.insert_anchor(txid, anchor);
}
}
}
}
}
Ok(graph)

// Now that we're done updating the `IndexedTxGraph`, it's time to update the `LocalChain`! We
// want the `LocalChain` to have data about all the anchors in the `TxGraph` - for this reason,
// we want to retrieve the blocks at the heights of the newly added anchors that are missing from
// our view of the chain.

// new tx graph transactions determine possible missing blockchain heights
let missing_heights = graph_update.anchor_heights(request.chain_tip.height());
// get blockchain update from original request checkpoint and missing heights
let chain_update = self
.update_local_chain(request.chain_tip.clone(), missing_heights)
.await?;

Ok(SyncResult {
graph_update,
chain_update,
})
}
}
Loading

0 comments on commit fa12c11

Please sign in to comment.