Skip to content

Commit

Permalink
chain: add batch-insert methods for IndexedTxGraph
Browse files Browse the repository at this point in the history
  • Loading branch information
evanlinjin committed Oct 9, 2023
1 parent 43bc813 commit 240657b
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 31 deletions.
166 changes: 142 additions & 24 deletions crates/chain/src/indexed_tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//! This is essentially a [`TxGraph`] combined with an indexer.
use alloc::vec::Vec;
use bitcoin::{OutPoint, Transaction, TxOut};
use bitcoin::{Block, OutPoint, Transaction, TxOut};

use crate::{
keychain,
tx_graph::{self, TxGraph},
Anchor, Append,
Anchor, AnchorFromBlockPosition, Append, BlockId,
};

/// A struct that combines [`TxGraph`] and an [`Indexer`] implementation.
Expand Down Expand Up @@ -126,17 +126,13 @@ where
self.apply_update(update)
}

/// Insert relevant transactions from the given `txs` iterator.
/// Batch insert transactions, filtering out those that are irrelevant.
///
/// Relevancy is determined by the [`Indexer::is_tx_relevant`] implementation of `I`. Irrelevant
/// transactions in `txs` will be ignored. `txs` do not need to be in topological order.
///
/// `anchors` can be provided to anchor the transactions to blocks. `seen_at` is a unix
/// timestamp of when the transactions are last seen.
pub fn insert_relevant_txs<'t>(
pub fn batch_insert_relevant<'t>(
&mut self,
txs: impl IntoIterator<Item = (&'t Transaction, impl IntoIterator<Item = A>)>,
seen_at: Option<u64>,
txs: impl IntoIterator<Item = InsertTxItem<'t, impl IntoIterator<Item = A>>>,
) -> ChangeSet<A, I::ChangeSet> {
// The algorithm below allows for non-topologically ordered transactions by using two loops.
// This is achieved by:
Expand All @@ -145,27 +141,149 @@ where
// 2. decide whether to insert them into the graph depending on whether `is_tx_relevant`
// returns true or not. (in a second loop).
let mut changeset = ChangeSet::<A, I::ChangeSet>::default();
let mut transactions = Vec::new();
for (tx, anchors) in txs.into_iter() {

let txs = txs
.into_iter()
.inspect(|(tx, _, _)| changeset.indexer.append(self.index.index_tx(tx)))
.collect::<Vec<_>>();

for (tx, anchors, seen_at) in txs {
if self.index.is_tx_relevant(tx) {
changeset.append(self.insert_tx(tx, anchors, seen_at));
}
}

changeset
}

/// Batch insert transactions.
///
/// All transactions in `txs` will be inserted. To filter out irrelevant transactions, use
/// [`batch_insert_relevant`] instead.
///
/// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant
pub fn batch_insert<'t>(
&mut self,
txs: impl IntoIterator<Item = InsertTxItem<'t, impl IntoIterator<Item = A>>>,
) -> ChangeSet<A, I::ChangeSet> {
let mut changeset = ChangeSet::<A, I::ChangeSet>::default();
for (tx, anchors, seen_at) in txs {
changeset.indexer.append(self.index.index_tx(tx));
transactions.push((tx, anchors));
changeset.append(self.insert_tx(tx, anchors, seen_at));
}
changeset.append(
transactions
.into_iter()
.filter_map(|(tx, anchors)| match self.index.is_tx_relevant(tx) {
true => Some(self.insert_tx(tx, anchors, seen_at)),
false => None,
})
.fold(Default::default(), |mut acc, other| {
acc.append(other);
acc
}),
);
changeset
}

/// Batch insert unconfirmed transactions, filtering out those that are irrelevant.
///
/// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
/// Irrelevant tansactions in `txs` will be ignored.
///
/// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp.
/// The *last seen* communicates when the transaction is last seen in the mempool which is used
/// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
pub fn batch_insert_relevant_unconfirmed<'t>(
&mut self,
unconfirmed_txs: impl IntoIterator<Item = (&'t Transaction, Option<u64>)>,
) -> ChangeSet<A, I::ChangeSet> {
self.batch_insert_relevant(
unconfirmed_txs
.into_iter()
.map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)),
)
}

/// Batch insert unconfirmed transactions.
///
/// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp.
/// The *last seen* communicates when the transaction is last seen in the mempool which is used
/// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
///
/// To filter out irrelevant transactions, use [`batch_insert_relevant_unconfirmed`] instead.
///
/// [`batch_insert_relevant_unconfirmed`]: IndexedTxGraph::batch_insert_relevant_unconfirmed
pub fn batch_insert_unconfirmed<'t>(
&mut self,
unconfirmed_txs: impl IntoIterator<Item = (&'t Transaction, Option<u64>)>,
) -> ChangeSet<A, I::ChangeSet> {
self.batch_insert(
unconfirmed_txs
.into_iter()
.map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)),
)
}
}

/// Methods are available if the anchor (`A`) implements [`AnchorFromBlockPosition`].
impl<A: Anchor, I: Indexer> IndexedTxGraph<A, I>
where
I::ChangeSet: Default + Append,
A: AnchorFromBlockPosition,
{
/// Batch insert all transactions of the given `block` of `height`, filtering out those that are
/// irrelevant.
///
/// Each inserted transaction's anchor will be constructed from
/// [`AnchorFromBlockPosition::from_block_position`].
///
/// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
/// Irrelevant tansactions in `txs` will be ignored.
pub fn apply_block_relevant(
&mut self,
block: Block,
height: u32,
) -> ChangeSet<A, I::ChangeSet> {
let block_id = BlockId {
hash: block.block_hash(),
height,
};
let txs = block.txdata.iter().enumerate().map(|(tx_pos, tx)| {
(
tx,
core::iter::once(A::from_block_position(&block, block_id, tx_pos)),
None,
)
});
self.batch_insert_relevant(txs)
}

/// Batch insert all transactions of the given `block` of `height`.
///
/// Each inserted transaction's anchor will be constructed from
/// [`AnchorFromBlockPosition::from_block_position`].
///
/// To only insert relevant transactions, use [`apply_block_relevant`] instead.
///
/// [`apply_block_relevant`]: IndexedTxGraph::apply_block_relevant
pub fn apply_block(&mut self, block: Block, height: u32) -> ChangeSet<A, I::ChangeSet> {
let block_id = BlockId {
hash: block.block_hash(),
height,
};
let txs = block.txdata.iter().enumerate().map(|(tx_pos, tx)| {
(
tx,
core::iter::once(A::from_block_position(&block, block_id, tx_pos)),
None,
)
});
self.batch_insert(txs)
}
}

/// A tuple of a transaction, and associated metadata, that are to be inserted into [`IndexedTxGraph`].
///
/// This tuple contains fields in the following order:
/// * A reference to the transaction.
/// * A collection of [`Anchor`]s.
/// * An optional last-seen timestamp.
///
/// This is used as a input item of [`batch_insert_relevant`] and [`batch_insert`].
///
/// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant
/// [`batch_insert`]: IndexedTxGraph::batch_insert
pub type InsertTxItem<'t, A> = (&'t Transaction, A, Option<u64>);

/// A structure that represents changes to an [`IndexedTxGraph`].
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(
Expand Down
13 changes: 6 additions & 7 deletions crates/chain/tests/test_indexed_tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn insert_relevant_txs() {
};

assert_eq!(
graph.insert_relevant_txs(txs.iter().map(|tx| (tx, None)), None),
graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None, None))),
changeset,
);

Expand Down Expand Up @@ -211,8 +211,8 @@ fn test_list_owned_txouts() {
// Insert transactions into graph with respective anchors
// For unconfirmed txs we pass in `None`.

let _ = graph.insert_relevant_txs(
[&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| {
let _ =
graph.batch_insert_relevant([&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| {
let height = i as u32;
(
*tx,
Expand All @@ -225,12 +225,11 @@ fn test_list_owned_txouts() {
anchor_block,
confirmation_height: anchor_block.height,
}),
None,
)
}),
None,
);
}));

let _ = graph.insert_relevant_txs([&tx4, &tx5].iter().map(|tx| (*tx, None)), Some(100));
let _ = graph.batch_insert_relevant([&tx4, &tx5].iter().map(|tx| (*tx, None, Some(100))));

// A helper lambda to extract and filter data from the graph.
let fetch =
Expand Down

0 comments on commit 240657b

Please sign in to comment.