Skip to content

Commit

Permalink
feat!: rework FullScanRequest and SyncRequest
Browse files Browse the repository at this point in the history
Change `FullScanRequest` and `SyncRequest` take in a `chain_tip` as an
option. In turn, `FullScanResult` and `SyncResult` are also changed to
return the update `chain_tip` as an option. This allows the caller to
opt-out of getting a `LocalChain` update.

Rework `FullScanRequest` and `SyncRequest` to have better ergonomics
when inspecting the progress of items of a sync request. Richer progress
data is provided to the inspect closure.

Introduce `FullScanRequestBuilder` and `SyncRequestBuilder`. Separating
out request-construction and request-consumption in different structs
simplifies the API and method names.

Simplify `EsploraExt` and `EsploraAsyncExt` back to having two methods
(`full_scan` and `sync`). The caller can still opt out of fetching a
`LocalChain` update with the new `FullScanRequest` and `SyncRequest`.
  • Loading branch information
evanlinjin committed Aug 14, 2024
1 parent 16c1c2c commit 44e2a79
Show file tree
Hide file tree
Showing 14 changed files with 1,276 additions and 1,153 deletions.
746 changes: 462 additions & 284 deletions crates/chain/src/spk_client.rs

Large diffs are not rendered by default.

81 changes: 55 additions & 26 deletions crates/electrum/src/bdk_electrum_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,31 +126,43 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
/// [`Wallet.calculate_fee_rate`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee_rate
pub fn full_scan<K: Ord + Clone>(
&self,
request: FullScanRequest<K>,
request: impl Into<FullScanRequest<K>>,
stop_gap: usize,
batch_size: usize,
fetch_prev_txouts: bool,
) -> Result<FullScanResult<K>, Error> {
let (tip, latest_blocks) =
fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?;
let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
let mut last_active_indices = BTreeMap::<K, u32>::new();
let mut request: FullScanRequest<K> = request.into();

let tip_and_latest_blocks = match request.chain_tip() {
Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
None => None,
};

for (keychain, spks) in request.spks_by_keychain {
let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
let mut last_active_indices = BTreeMap::<K, u32>::default();
for keychain in request.keychains() {
let spks = request.iter_spks(keychain.clone());
if let Some(last_active_index) =
self.populate_with_spks(&mut graph_update, spks, stop_gap, batch_size)?
{
last_active_indices.insert(keychain, last_active_index);
}
}

let chain_update = chain_update(tip, &latest_blocks, graph_update.all_anchors())?;

// Fetch previous `TxOut`s for fee calculation if flag is enabled.
if fetch_prev_txouts {
self.fetch_prev_txout(&mut graph_update)?;
}

let chain_update = match tip_and_latest_blocks {
Some((chain_tip, latest_blocks)) => Some(chain_update(
chain_tip,
&latest_blocks,
graph_update.all_anchors(),
)?),
_ => None,
};

Ok(FullScanResult {
graph_update,
chain_update,
Expand Down Expand Up @@ -180,35 +192,52 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
/// [`CalculateFeeError::MissingTxOut`]: bdk_chain::tx_graph::CalculateFeeError::MissingTxOut
/// [`Wallet.calculate_fee`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee
/// [`Wallet.calculate_fee_rate`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee_rate
pub fn sync(
pub fn sync<I: 'static>(
&self,
request: SyncRequest,
request: impl Into<SyncRequest<I>>,
batch_size: usize,
fetch_prev_txouts: bool,
) -> Result<SyncResult, Error> {
let full_scan_req = FullScanRequest::from_chain_tip(request.chain_tip.clone())
.set_spks_for_keychain((), request.spks.enumerate().map(|(i, spk)| (i as u32, spk)));
let mut full_scan_res = self.full_scan(full_scan_req, usize::MAX, batch_size, false)?;
let (tip, latest_blocks) =
fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?;

self.populate_with_txids(&mut full_scan_res.graph_update, request.txids)?;
self.populate_with_outpoints(&mut full_scan_res.graph_update, request.outpoints)?;

let chain_update = chain_update(
tip,
&latest_blocks,
full_scan_res.graph_update.all_anchors(),
)?;
let mut request: SyncRequest<I> = request.into();

let tip_and_latest_blocks = match request.chain_tip() {
Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
None => None,
};

let full_scan_request = FullScanRequest::builder()
.spks_for_keychain(
(),
request
.iter_spks()
.enumerate()
.map(|(i, spk)| (i as u32, spk))
.collect::<Vec<_>>(),
)
.build();
let mut graph_update = self
.full_scan(full_scan_request, usize::MAX, batch_size, false)?
.graph_update;
self.populate_with_txids(&mut graph_update, request.iter_txids())?;
self.populate_with_outpoints(&mut graph_update, request.iter_outpoints())?;

// Fetch previous `TxOut`s for fee calculation if flag is enabled.
if fetch_prev_txouts {
self.fetch_prev_txout(&mut full_scan_res.graph_update)?;
self.fetch_prev_txout(&mut graph_update)?;
}

let chain_update = match tip_and_latest_blocks {
Some((chain_tip, latest_blocks)) => Some(chain_update(
chain_tip,
&latest_blocks,
graph_update.all_anchors(),
)?),
None => None,
};

Ok(SyncResult {
graph_update,
chain_update,
graph_update: full_scan_res.graph_update,
})
}

Expand Down
42 changes: 25 additions & 17 deletions crates/electrum/tests/test_electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ where
Spks::IntoIter: ExactSizeIterator + Send + 'static,
{
let mut update = client.sync(
SyncRequest::from_chain_tip(chain.tip()).chain_spks(spks),
SyncRequest::builder().chain_tip(chain.tip()).spks(spks),
BATCH_SIZE,
true,
)?;
Expand All @@ -51,9 +51,11 @@ where
.as_secs();
let _ = update.graph_update.update_last_seen_unconfirmed(now);

let _ = chain
.apply_update(update.chain_update.clone())
.map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?;
if let Some(chain_update) = update.chain_update.clone() {
let _ = chain
.apply_update(chain_update)
.map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?;
}
let _ = graph.apply_update(update.graph_update.clone());

Ok(update)
Expand Down Expand Up @@ -103,7 +105,9 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> {
let cp_tip = env.make_checkpoint_tip();

let sync_update = {
let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks);
let request = SyncRequest::builder()
.chain_tip(cp_tip.clone())
.spks(misc_spks);
client.sync(request, 1, true)?
};

Expand Down Expand Up @@ -207,15 +211,17 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
// A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4
// will.
let full_scan_update = {
let request =
FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
let request = FullScanRequest::builder()
.chain_tip(cp_tip.clone())
.spks_for_keychain(0, spks.clone());
client.full_scan(request, 3, 1, false)?
};
assert!(full_scan_update.graph_update.full_txs().next().is_none());
assert!(full_scan_update.last_active_indices.is_empty());
let full_scan_update = {
let request =
FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
let request = FullScanRequest::builder()
.chain_tip(cp_tip.clone())
.spks_for_keychain(0, spks.clone());
client.full_scan(request, 4, 1, false)?
};
assert_eq!(
Expand Down Expand Up @@ -246,8 +252,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
// A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
// The last active indice won't be updated in the first case but will in the second one.
let full_scan_update = {
let request =
FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
let request = FullScanRequest::builder()
.chain_tip(cp_tip.clone())
.spks_for_keychain(0, spks.clone());
client.full_scan(request, 5, 1, false)?
};
let txs: HashSet<_> = full_scan_update
Expand All @@ -259,8 +266,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
assert!(txs.contains(&txid_4th_addr));
assert_eq!(full_scan_update.last_active_indices[&0], 3);
let full_scan_update = {
let request =
FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
let request = FullScanRequest::builder()
.chain_tip(cp_tip.clone())
.spks_for_keychain(0, spks.clone());
client.full_scan(request, 6, 1, false)?
};
let txs: HashSet<_> = full_scan_update
Expand Down Expand Up @@ -311,7 +319,7 @@ fn test_sync() -> anyhow::Result<()> {
let txid = env.send(&addr_to_track, SEND_AMOUNT)?;
env.wait_until_electrum_sees_txid(txid, Duration::from_secs(6))?;

sync_with_electrum(
let _ = sync_with_electrum(
&client,
[spk_to_track.clone()],
&mut recv_chain,
Expand All @@ -332,7 +340,7 @@ fn test_sync() -> anyhow::Result<()> {
env.mine_blocks(1, None)?;
env.wait_until_electrum_sees_block(Duration::from_secs(6))?;

sync_with_electrum(
let _ = sync_with_electrum(
&client,
[spk_to_track.clone()],
&mut recv_chain,
Expand All @@ -353,7 +361,7 @@ fn test_sync() -> anyhow::Result<()> {
env.reorg_empty_blocks(1)?;
env.wait_until_electrum_sees_block(Duration::from_secs(6))?;

sync_with_electrum(
let _ = sync_with_electrum(
&client,
[spk_to_track.clone()],
&mut recv_chain,
Expand All @@ -373,7 +381,7 @@ fn test_sync() -> anyhow::Result<()> {
env.mine_blocks(1, None)?;
env.wait_until_electrum_sees_block(Duration::from_secs(6))?;

sync_with_electrum(&client, [spk_to_track], &mut recv_chain, &mut recv_graph)?;
let _ = sync_with_electrum(&client, [spk_to_track], &mut recv_chain, &mut recv_graph)?;

// Check if balance is correct once transaction is confirmed again.
assert_eq!(
Expand Down
Loading

0 comments on commit 44e2a79

Please sign in to comment.