Skip to content

Commit

Permalink
chain: improvements to IndexedTxGraph and TxGraph APIs
Browse files Browse the repository at this point in the history
For `IndexedTxGraph`:
- Remove `InsertTxItem` type (this is too complex).
    - `batch_insert_relevant` now uses a simple tuple `(&tx, anchors)`.
    - `batch_insert` is now also removed, as the same functionality can be
      done elsewhere.
- Add internal helper method `index_tx_graph_changeset` so we don't need
  to create a seprate `TxGraph` update in each method.
- `batch_insert_<relevant>_unconfirmed` no longer takes in an option of
  last_seen.
- `batch_insert_unconfirmed` no longer takes a reference of a
  transaction (since we apply all transactions anyway, so there is no
  need to clone).

For `TxGraph`:
- Add `batch_insert_unconfirmed` method.
  • Loading branch information
evanlinjin committed Oct 9, 2023
1 parent 150d6f8 commit 4f5695d
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 115 deletions.
2 changes: 1 addition & 1 deletion crates/bdk/src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ impl<D> Wallet<D> {
where
D: PersistBackend<ChangeSet>,
{
let additions = self.indexed_graph.insert_txout(outpoint, &txout);
let additions = self.indexed_graph.insert_txout(outpoint, txout);
self.persist.stage(ChangeSet::from(additions));
}

Expand Down
21 changes: 3 additions & 18 deletions crates/bitcoind_rpc/tests/test_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::{BTreeMap, BTreeSet};
use bdk_bitcoind_rpc::Emitter;
use bdk_chain::{
bitcoin::{Address, Amount, BlockHash, Txid},
indexed_tx_graph::InsertTxItem,
keychain::Balance,
local_chain::{self, CheckPoint, LocalChain},
Append, BlockId, IndexedTxGraph, SpkTxOutIndex,
Expand Down Expand Up @@ -180,17 +179,6 @@ fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Up
}
}

fn block_to_tx_graph_update(
block: &bitcoin::Block,
height: u32,
) -> impl Iterator<Item = InsertTxItem<'_, Option<BlockId>>> {
let anchor = BlockId {
hash: block.block_hash(),
height,
};
block.txdata.iter().map(move |tx| (tx, Some(anchor), None))
}

/// Ensure that blocks are emitted in order even after reorg.
///
/// 1. Mine 101 blocks.
Expand Down Expand Up @@ -321,8 +309,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {

while let Some((height, block)) = emitter.next_block()? {
let _ = chain.apply_update(block_to_chain_update(&block, height))?;
let indexed_additions =
indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height));
let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
assert!(indexed_additions.is_empty());
}

Expand Down Expand Up @@ -350,8 +337,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
assert!(emitter.next_block()?.is_none());

let mempool_txs = emitter.mempool()?;
let indexed_additions = indexed_tx_graph
.batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))));
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
assert_eq!(
indexed_additions
.graph
Expand Down Expand Up @@ -383,8 +369,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
{
let (height, block) = emitter.next_block()?.expect("must get mined block");
let _ = chain.apply_update(block_to_chain_update(&block, height))?;
let indexed_additions =
indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height));
let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
assert!(indexed_additions.graph.txs.is_empty());
assert!(indexed_additions.graph.txouts.is_empty());
assert_eq!(indexed_additions.graph.anchors, exp_anchors);
Expand Down
169 changes: 79 additions & 90 deletions crates/chain/src/indexed_tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,32 +72,34 @@ impl<A: Anchor, I: Indexer> IndexedTxGraph<A, I>
where
I::ChangeSet: Default + Append,
{
fn index_tx_graph_changeset(
&mut self,
tx_graph_changeset: &tx_graph::ChangeSet<A>,
) -> I::ChangeSet {
let mut changeset = I::ChangeSet::default();
for added_tx in &tx_graph_changeset.txs {
changeset.append(self.index.index_tx(added_tx));
}
for (&added_outpoint, added_txout) in &tx_graph_changeset.txouts {
changeset.append(self.index.index_txout(added_outpoint, added_txout));
}
changeset
}

/// Apply an `update` directly.
///
/// `update` is a [`TxGraph<A>`] and the resultant changes is returned as [`ChangeSet`].
pub fn apply_update(&mut self, update: TxGraph<A>) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.apply_update(update);

let mut indexer = I::ChangeSet::default();
for added_tx in &graph.txs {
indexer.append(self.index.index_tx(added_tx));
}
for (&added_outpoint, added_txout) in &graph.txouts {
indexer.append(self.index.index_txout(added_outpoint, added_txout));
}

let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
}

/// Insert a floating `txout` of given `outpoint`.
pub fn insert_txout(
&mut self,
outpoint: OutPoint,
txout: &TxOut,
) -> ChangeSet<A, I::ChangeSet> {
let mut update = TxGraph::<A>::default();
let _ = update.insert_txout(outpoint, txout.clone());
self.apply_update(update)
pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.insert_txout(outpoint, txout);
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
}

/// Insert and index a transaction into the graph.
Expand All @@ -112,18 +114,19 @@ where
) -> ChangeSet<A, I::ChangeSet> {
let txid = tx.txid();

let mut update = TxGraph::<A>::default();
let mut graph = tx_graph::ChangeSet::default();
if self.graph.get_tx(txid).is_none() {
let _ = update.insert_tx(tx.clone());
graph.append(self.graph.insert_tx(tx.clone()));
}
for anchor in anchors.into_iter() {
let _ = update.insert_anchor(txid, anchor);
graph.append(self.graph.insert_anchor(txid, anchor));
}
if let Some(seen_at) = seen_at {
let _ = update.insert_seen_at(txid, seen_at);
graph.append(self.graph.insert_seen_at(txid, seen_at));
}

self.apply_update(update)
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
}

/// Batch insert transactions, filtering out those that are irrelevant.
Expand All @@ -132,85 +135,85 @@ where
/// transactions in `txs` will be ignored. `txs` do not need to be in topological order.
pub fn batch_insert_relevant<'t>(
&mut self,
txs: impl IntoIterator<Item = InsertTxItem<'t, impl IntoIterator<Item = A>>>,
txs: impl IntoIterator<Item = (&'t Transaction, impl IntoIterator<Item = A>)>,
) -> ChangeSet<A, I::ChangeSet> {
// The algorithm below allows for non-topologically ordered transactions by using two loops.
// This is achieved by:
// 1. insert all txs into the index. If they are irrelevant then that's fine it will just
// not store anything about them.
// 2. decide whether to insert them into the graph depending on whether `is_tx_relevant`
// returns true or not. (in a second loop).
let mut changeset = ChangeSet::<A, I::ChangeSet>::default();
let txs = txs.into_iter().collect::<Vec<_>>();

let txs = txs
.into_iter()
.inspect(|(tx, _, _)| changeset.indexer.append(self.index.index_tx(tx)))
.collect::<Vec<_>>();
let mut indexer = I::ChangeSet::default();
for (tx, _) in &txs {
indexer.append(self.index.index_tx(tx));
}

for (tx, anchors, seen_at) in txs {
let mut graph = tx_graph::ChangeSet::default();
for (tx, anchors) in txs {
if self.index.is_tx_relevant(tx) {
changeset.append(self.insert_tx(tx, anchors, seen_at));
let txid = tx.txid();
graph.append(self.graph.insert_tx(tx.clone()));
for anchor in anchors {
graph.append(self.graph.insert_anchor(txid, anchor));
}
}
}

changeset
}

/// Batch insert transactions.
///
/// All transactions in `txs` will be inserted. To filter out irrelevant transactions, use
/// [`batch_insert_relevant`] instead.
///
/// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant
pub fn batch_insert<'t>(
&mut self,
txs: impl IntoIterator<Item = InsertTxItem<'t, impl IntoIterator<Item = A>>>,
) -> ChangeSet<A, I::ChangeSet> {
let mut changeset = ChangeSet::<A, I::ChangeSet>::default();
for (tx, anchors, seen_at) in txs {
changeset.indexer.append(self.index.index_tx(tx));
changeset.append(self.insert_tx(tx, anchors, seen_at));
}
changeset
ChangeSet { graph, indexer }
}

/// Batch insert unconfirmed transactions, filtering out those that are irrelevant.
///
/// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
/// Irrelevant tansactions in `txs` will be ignored.
///
/// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp.
/// The *last seen* communicates when the transaction is last seen in the mempool which is used
/// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
/// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
/// *last seen* communicates when the transaction is last seen in the mempool which is used for
/// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
pub fn batch_insert_relevant_unconfirmed<'t>(
&mut self,
unconfirmed_txs: impl IntoIterator<Item = (&'t Transaction, Option<u64>)>,
unconfirmed_txs: impl IntoIterator<Item = (&'t Transaction, u64)>,
) -> ChangeSet<A, I::ChangeSet> {
self.batch_insert_relevant(
unconfirmed_txs
.into_iter()
.map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)),
)
// The algorithm below allows for non-topologically ordered transactions by using two loops.
// This is achieved by:
// 1. insert all txs into the index. If they are irrelevant then that's fine it will just
// not store anything about them.
// 2. decide whether to insert them into the graph depending on whether `is_tx_relevant`
// returns true or not. (in a second loop).
let txs = unconfirmed_txs.into_iter().collect::<Vec<_>>();

let mut indexer = I::ChangeSet::default();
for (tx, _) in &txs {
indexer.append(self.index.index_tx(tx));
}

let graph = self.graph.batch_insert_unconfirmed(
txs.into_iter()
.filter(|(tx, _)| self.index.is_tx_relevant(tx))
.map(|(tx, seen_at)| (tx.clone(), seen_at)),
);

ChangeSet { graph, indexer }
}

/// Batch insert unconfirmed transactions.
///
/// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp.
/// The *last seen* communicates when the transaction is last seen in the mempool which is used
/// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
/// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
/// *last seen* communicates when the transaction is last seen in the mempool which is used for
/// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
///
/// To filter out irrelevant transactions, use [`batch_insert_relevant_unconfirmed`] instead.
///
/// [`batch_insert_relevant_unconfirmed`]: IndexedTxGraph::batch_insert_relevant_unconfirmed
pub fn batch_insert_unconfirmed<'t>(
pub fn batch_insert_unconfirmed(
&mut self,
unconfirmed_txs: impl IntoIterator<Item = (&'t Transaction, Option<u64>)>,
txs: impl IntoIterator<Item = (Transaction, u64)>,
) -> ChangeSet<A, I::ChangeSet> {
self.batch_insert(
unconfirmed_txs
.into_iter()
.map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)),
)
let graph = self.graph.batch_insert_unconfirmed(txs);
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
}
}

Expand Down Expand Up @@ -241,7 +244,6 @@ where
(
tx,
core::iter::once(A::from_block_position(&block, block_id, tx_pos)),
None,
)
});
self.batch_insert_relevant(txs)
Expand All @@ -260,30 +262,17 @@ where
hash: block.block_hash(),
height,
};
let txs = block.txdata.iter().enumerate().map(|(tx_pos, tx)| {
(
tx,
core::iter::once(A::from_block_position(&block, block_id, tx_pos)),
None,
)
});
self.batch_insert(txs)
let mut graph = tx_graph::ChangeSet::default();
for (tx_pos, tx) in block.txdata.iter().enumerate() {
let anchor = A::from_block_position(&block, block_id, tx_pos);
graph.append(self.graph.insert_anchor(tx.txid(), anchor));
graph.append(self.graph.insert_tx(tx.clone()));
}
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
}
}

/// A tuple of a transaction, and associated metadata, that are to be inserted into [`IndexedTxGraph`].
///
/// This tuple contains fields in the following order:
/// * A reference to the transaction.
/// * A collection of [`Anchor`]s.
/// * An optional last-seen timestamp.
///
/// This is used as a input item of [`batch_insert_relevant`] and [`batch_insert`].
///
/// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant
/// [`batch_insert`]: IndexedTxGraph::batch_insert
pub type InsertTxItem<'t, A> = (&'t Transaction, A, Option<u64>);

/// A structure that represents changes to an [`IndexedTxGraph`].
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(
Expand Down
17 changes: 17 additions & 0 deletions crates/chain/src/tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,23 @@ impl<A: Clone + Ord> TxGraph<A> {
self.apply_update(update)
}

/// Batch insert unconfirmed transactions.
///
/// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
/// *last seen* communicates when the transaction is last seen in the mempool which is used for
/// conflict-resolution (refer to [`TxGraph::insert_seen_at`] for details).
pub fn batch_insert_unconfirmed(
&mut self,
txs: impl IntoIterator<Item = (Transaction, u64)>,
) -> ChangeSet<A> {
let mut changeset = ChangeSet::<A>::default();
for (tx, seen_at) in txs {
changeset.append(self.insert_seen_at(tx.txid(), seen_at));
changeset.append(self.insert_tx(tx));
}
changeset
}

/// Inserts the given `anchor` into [`TxGraph`].
///
/// The [`ChangeSet`] returned will be empty if graph already knows that `txid` exists in
Expand Down
5 changes: 2 additions & 3 deletions crates/chain/tests/test_indexed_tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn insert_relevant_txs() {
};

assert_eq!(
graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None, None))),
graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None))),
changeset,
);

Expand Down Expand Up @@ -225,11 +225,10 @@ fn test_list_owned_txouts() {
anchor_block,
confirmation_height: anchor_block.height,
}),
None,
)
}));

let _ = graph.batch_insert_relevant([&tx4, &tx5].iter().map(|tx| (*tx, None, Some(100))));
let _ = graph.batch_insert_relevant_unconfirmed([&tx4, &tx5].iter().map(|tx| (*tx, 100)));

// A helper lambda to extract and filter data from the graph.
let fetch =
Expand Down
5 changes: 2 additions & 3 deletions example-crates/example_bitcoind_rpc_polling/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ fn main() -> anyhow::Result<()> {

// mempool
let mempool_txs = emitter.mempool()?;
let graph_changeset = graph
.batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))));
let graph_changeset = graph.batch_insert_unconfirmed(mempool_txs);
db.stage((local_chain::ChangeSet::default(), graph_changeset));

// commit one last time!
Expand Down Expand Up @@ -291,7 +290,7 @@ fn main() -> anyhow::Result<()> {
}
Emission::Mempool(mempool_txs) => {
let graph_changeset = graph.batch_insert_relevant_unconfirmed(
mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))),
mempool_txs.iter().map(|(tx, time)| (tx, *time)),
);
(local_chain::ChangeSet::default(), graph_changeset)
}
Expand Down

0 comments on commit 4f5695d

Please sign in to comment.