Skip to content

Commit

Permalink
feat(esplora): greatly simplify update_local_chain
Browse files Browse the repository at this point in the history
  • Loading branch information
evanlinjin committed Jan 19, 2024
1 parent 25653d7 commit f05e850
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 145 deletions.
95 changes: 24 additions & 71 deletions crates/esplora/src/async_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use async_trait::async_trait;
use bdk_chain::collections::btree_map;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
collections::{BTreeMap, BTreeSet},
collections::BTreeMap,
local_chain::{self, CheckPoint},
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
};
use esplora_client::{Error, TxStatus};
use futures::{stream::FuturesOrdered, TryStreamExt};

use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
use crate::anchor_from_status;

/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
///
Expand Down Expand Up @@ -85,10 +85,11 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
local_tip: CheckPoint,
request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
) -> Result<local_chain::Update, Error> {
let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
let new_tip_height = self.get_height().await?;

// atomically fetch blocks from esplora
// Atomically fetch latest blocks from Esplora. This way, we avoid creating an update with
// an inconsistent set of blocks (assuming that a reorg depth cannot be greater than the
// latest blocks fetched).
let mut fetched_blocks = {
let heights = (0..=new_tip_height).rev();
let hashes = self
Expand All @@ -99,89 +100,41 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
};

// fetch heights that the caller is interested in
// fetch blocks of heights that the caller is interested in, reusing latest blocks that are
// already fetched.
for height in request_heights {
// do not fetch blocks higher than remote tip
if height > new_tip_height {
continue;
}
// only fetch what is missing
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
let hash = self.get_block_hash(height).await?;
entry.insert(hash);
entry.insert(self.get_block_hash(height).await?);
}
}

// find the earliest point of agreement between local chain and fetched chain
let earliest_agreement_cp = {
let mut earliest_agreement_cp = Option::<CheckPoint>::None;

let local_tip_height = local_tip.height();
for local_cp in local_tip.iter() {
let local_block = local_cp.block_id();

// the updated hash (block hash at this height after the update), can either be:
// 1. a block that already existed in `fetched_blocks`
// 2. a block that exists locally and at least has a depth of ASSUME_FINAL_DEPTH
// 3. otherwise we can freshly fetch the block from remote, which is safe as it
// is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
// remote tip
let updated_hash = match fetched_blocks.entry(local_block.height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => *entry.insert(
if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
local_block.hash
} else {
self.get_block_hash(local_block.height).await?
},
),
};

// since we may introduce blocks below the point of agreement, we cannot break
// here unconditionally - we only break if we guarantee there are no new heights
// below our current local checkpoint
if local_block.hash == updated_hash {
earliest_agreement_cp = Some(local_cp);

let first_new_height = *fetched_blocks
.keys()
.next()
.expect("must have at least one new block");
if first_new_height >= local_block.height {
break;
}
}
// Ensure `fetched_blocks` can create an update that connects with the original chain.
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
if height > new_tip_height {
continue;
}

earliest_agreement_cp
};

let tip = {
// first checkpoint to use for the update chain
let first_cp = match earliest_agreement_cp {
Some(cp) => cp,
None => {
let (&height, &hash) = fetched_blocks
.iter()
.next()
.expect("must have at least one new block");
CheckPoint::new(BlockId { height, hash })
let fetched_hash = match fetched_blocks.entry(height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => {
*entry.insert(self.get_block_hash(height).await?)
}
};
// transform fetched chain into the update chain
fetched_blocks
// we exclude anything at or below the first cp of the update chain otherwise
// building the chain will fail
.split_off(&(first_cp.height() + 1))
.into_iter()
.map(|(height, hash)| BlockId { height, hash })
.fold(first_cp, |prev_cp, block| {
prev_cp.push(block).expect("must extend checkpoint")
})
};

// We have found point of agreement so the update will connect!
if fetched_hash == local_hash {
break;
}
}

Ok(local_chain::Update {
tip,
tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from))
.expect("must be in height order"),
introduce_older_blocks: true,
})
}
Expand Down
95 changes: 23 additions & 72 deletions crates/esplora/src/blocking_ext.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::thread::JoinHandle;

use bdk_chain::collections::btree_map;
use bdk_chain::collections::{BTreeMap, BTreeSet};
use bdk_chain::collections::BTreeMap;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
local_chain::{self, CheckPoint},
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
};
use esplora_client::{Error, TxStatus};

use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
use crate::anchor_from_status;

/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
///
Expand Down Expand Up @@ -78,10 +78,11 @@ impl EsploraExt for esplora_client::BlockingClient {
local_tip: CheckPoint,
request_heights: impl IntoIterator<Item = u32>,
) -> Result<local_chain::Update, Error> {
let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
let new_tip_height = self.get_height()?;

// atomically fetch blocks from esplora
// Atomically fetch latest blocks from Esplora. This way, we avoid creating an update with
// an inconsistent set of blocks (assuming that a reorg depth cannot be greater than the
// latest blocks fetched).
let mut fetched_blocks = {
let heights = (0..=new_tip_height).rev();
let hashes = self
Expand All @@ -91,89 +92,39 @@ impl EsploraExt for esplora_client::BlockingClient {
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
};

// fetch heights that the caller is interested in
// fetch blocks of heights that the caller is interested in, reusing latest blocks that are
// already fetched.
for height in request_heights {
// do not fetch blocks higher than remote tip
if height > new_tip_height {
continue;
}
// only fetch what is missing
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
let hash = self.get_block_hash(height)?;
entry.insert(hash);
entry.insert(self.get_block_hash(height)?);
}
}

// find the earliest point of agreement between local chain and fetched chain
let earliest_agreement_cp = {
let mut earliest_agreement_cp = Option::<CheckPoint>::None;

let local_tip_height = local_tip.height();
for local_cp in local_tip.iter() {
let local_block = local_cp.block_id();

// the updated hash (block hash at this height after the update), can either be:
// 1. a block that already existed in `fetched_blocks`
// 2. a block that exists locally and at least has a depth of ASSUME_FINAL_DEPTH
// 3. otherwise we can freshly fetch the block from remote, which is safe as it
// is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
// remote tip
let updated_hash = match fetched_blocks.entry(local_block.height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => *entry.insert(
if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
local_block.hash
} else {
self.get_block_hash(local_block.height)?
},
),
};

// since we may introduce blocks below the point of agreement, we cannot break
// here unconditionally - we only break if we guarantee there are no new heights
// below our current local checkpoint
if local_block.hash == updated_hash {
earliest_agreement_cp = Some(local_cp);

let first_new_height = *fetched_blocks
.keys()
.next()
.expect("must have at least one new block");
if first_new_height >= local_block.height {
break;
}
}
// Ensure `fetched_blocks` can create an update that connects with the original chain.
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
if height > new_tip_height {
continue;
}

earliest_agreement_cp
};

let tip = {
// first checkpoint to use for the update chain
let first_cp = match earliest_agreement_cp {
Some(cp) => cp,
None => {
let (&height, &hash) = fetched_blocks
.iter()
.next()
.expect("must have at least one new block");
CheckPoint::new(BlockId { height, hash })
}
let fetched_hash = match fetched_blocks.entry(height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => *entry.insert(self.get_block_hash(height)?),
};
// transform fetched chain into the update chain
fetched_blocks
// we exclude anything at or below the first cp of the update chain otherwise
// building the chain will fail
.split_off(&(first_cp.height() + 1))
.into_iter()
.map(|(height, hash)| BlockId { height, hash })
.fold(first_cp, |prev_cp, block| {
prev_cp.push(block).expect("must extend checkpoint")
})
};

// We have found point of agreement so the update will connect!
if fetched_hash == local_hash {
break;
}
}

Ok(local_chain::Update {
tip,
tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from))
.expect("must be in height order"),
introduce_older_blocks: true,
})
}
Expand Down
2 changes: 0 additions & 2 deletions crates/esplora/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ mod async_ext;
#[cfg(feature = "async")]
pub use async_ext::*;

const ASSUME_FINAL_DEPTH: u32 = 15;

fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeHeightAnchor> {
if let TxStatus {
block_height: Some(height),
Expand Down

0 comments on commit f05e850

Please sign in to comment.