diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index 9ee72b4b6..090a9ca60 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -509,7 +509,7 @@ impl Wallet { where D: PersistBackend, { - let additions = self.indexed_graph.insert_txout(outpoint, &txout); + let additions = self.indexed_graph.insert_txout(outpoint, txout); self.persist.stage(ChangeSet::from(additions)); } diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 7a4b7e4d1..601fb5616 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -3,7 +3,6 @@ use std::collections::{BTreeMap, BTreeSet}; use bdk_bitcoind_rpc::Emitter; use bdk_chain::{ bitcoin::{Address, Amount, BlockHash, Txid}, - indexed_tx_graph::InsertTxItem, keychain::Balance, local_chain::{self, CheckPoint, LocalChain}, Append, BlockId, IndexedTxGraph, SpkTxOutIndex, @@ -180,17 +179,6 @@ fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Up } } -fn block_to_tx_graph_update( - block: &bitcoin::Block, - height: u32, -) -> impl Iterator>> { - let anchor = BlockId { - hash: block.block_hash(), - height, - }; - block.txdata.iter().map(move |tx| (tx, Some(anchor), None)) -} - /// Ensure that blocks are emitted in order even after reorg. /// /// 1. Mine 101 blocks. @@ -321,8 +309,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { while let Some((height, block)) = emitter.next_block()? { let _ = chain.apply_update(block_to_chain_update(&block, height))?; - let indexed_additions = - indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height)); + let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height); assert!(indexed_additions.is_empty()); } @@ -350,8 +337,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { assert!(emitter.next_block()?.is_none()); let mempool_txs = emitter.mempool()?; - let indexed_additions = indexed_tx_graph - .batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time)))); + let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs); assert_eq!( indexed_additions .graph @@ -383,8 +369,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { { let (height, block) = emitter.next_block()?.expect("must get mined block"); let _ = chain.apply_update(block_to_chain_update(&block, height))?; - let indexed_additions = - indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height)); + let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height); assert!(indexed_additions.graph.txs.is_empty()); assert!(indexed_additions.graph.txouts.is_empty()); assert_eq!(indexed_additions.graph.anchors, exp_anchors); diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 065e0892a..e65b6868a 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -72,32 +72,34 @@ impl IndexedTxGraph where I::ChangeSet: Default + Append, { + fn index_tx_graph_changeset( + &mut self, + tx_graph_changeset: &tx_graph::ChangeSet, + ) -> I::ChangeSet { + let mut changeset = I::ChangeSet::default(); + for added_tx in &tx_graph_changeset.txs { + changeset.append(self.index.index_tx(added_tx)); + } + for (&added_outpoint, added_txout) in &tx_graph_changeset.txouts { + changeset.append(self.index.index_txout(added_outpoint, added_txout)); + } + changeset + } + /// Apply an `update` directly. /// /// `update` is a [`TxGraph`] and the resultant changes is returned as [`ChangeSet`]. pub fn apply_update(&mut self, update: TxGraph) -> ChangeSet { let graph = self.graph.apply_update(update); - - let mut indexer = I::ChangeSet::default(); - for added_tx in &graph.txs { - indexer.append(self.index.index_tx(added_tx)); - } - for (&added_outpoint, added_txout) in &graph.txouts { - indexer.append(self.index.index_txout(added_outpoint, added_txout)); - } - + let indexer = self.index_tx_graph_changeset(&graph); ChangeSet { graph, indexer } } /// Insert a floating `txout` of given `outpoint`. - pub fn insert_txout( - &mut self, - outpoint: OutPoint, - txout: &TxOut, - ) -> ChangeSet { - let mut update = TxGraph::::default(); - let _ = update.insert_txout(outpoint, txout.clone()); - self.apply_update(update) + pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet { + let graph = self.graph.insert_txout(outpoint, txout); + let indexer = self.index_tx_graph_changeset(&graph); + ChangeSet { graph, indexer } } /// Insert and index a transaction into the graph. @@ -112,18 +114,19 @@ where ) -> ChangeSet { let txid = tx.txid(); - let mut update = TxGraph::::default(); + let mut graph = tx_graph::ChangeSet::default(); if self.graph.get_tx(txid).is_none() { - let _ = update.insert_tx(tx.clone()); + graph.append(self.graph.insert_tx(tx.clone())); } for anchor in anchors.into_iter() { - let _ = update.insert_anchor(txid, anchor); + graph.append(self.graph.insert_anchor(txid, anchor)); } if let Some(seen_at) = seen_at { - let _ = update.insert_seen_at(txid, seen_at); + graph.append(self.graph.insert_seen_at(txid, seen_at)); } - self.apply_update(update) + let indexer = self.index_tx_graph_changeset(&graph); + ChangeSet { graph, indexer } } /// Batch insert transactions, filtering out those that are irrelevant. @@ -132,7 +135,7 @@ where /// transactions in `txs` will be ignored. `txs` do not need to be in topological order. pub fn batch_insert_relevant<'t>( &mut self, - txs: impl IntoIterator>>, + txs: impl IntoIterator)>, ) -> ChangeSet { // The algorithm below allows for non-topologically ordered transactions by using two loops. // This is achieved by: @@ -140,38 +143,25 @@ where // not store anything about them. // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant` // returns true or not. (in a second loop). - let mut changeset = ChangeSet::::default(); + let txs = txs.into_iter().collect::>(); - let txs = txs - .into_iter() - .inspect(|(tx, _, _)| changeset.indexer.append(self.index.index_tx(tx))) - .collect::>(); + let mut indexer = I::ChangeSet::default(); + for (tx, _) in &txs { + indexer.append(self.index.index_tx(tx)); + } - for (tx, anchors, seen_at) in txs { + let mut graph = tx_graph::ChangeSet::default(); + for (tx, anchors) in txs { if self.index.is_tx_relevant(tx) { - changeset.append(self.insert_tx(tx, anchors, seen_at)); + let txid = tx.txid(); + graph.append(self.graph.insert_tx(tx.clone())); + for anchor in anchors { + graph.append(self.graph.insert_anchor(txid, anchor)); + } } } - changeset - } - - /// Batch insert transactions. - /// - /// All transactions in `txs` will be inserted. To filter out irrelevant transactions, use - /// [`batch_insert_relevant`] instead. - /// - /// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant - pub fn batch_insert<'t>( - &mut self, - txs: impl IntoIterator>>, - ) -> ChangeSet { - let mut changeset = ChangeSet::::default(); - for (tx, anchors, seen_at) in txs { - changeset.indexer.append(self.index.index_tx(tx)); - changeset.append(self.insert_tx(tx, anchors, seen_at)); - } - changeset + ChangeSet { graph, indexer } } /// Batch insert unconfirmed transactions, filtering out those that are irrelevant. @@ -179,38 +169,51 @@ where /// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`. /// Irrelevant tansactions in `txs` will be ignored. /// - /// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp. - /// The *last seen* communicates when the transaction is last seen in the mempool which is used - /// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). + /// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The + /// *last seen* communicates when the transaction is last seen in the mempool which is used for + /// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). pub fn batch_insert_relevant_unconfirmed<'t>( &mut self, - unconfirmed_txs: impl IntoIterator)>, + unconfirmed_txs: impl IntoIterator, ) -> ChangeSet { - self.batch_insert_relevant( - unconfirmed_txs - .into_iter() - .map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)), - ) + // The algorithm below allows for non-topologically ordered transactions by using two loops. + // This is achieved by: + // 1. insert all txs into the index. If they are irrelevant then that's fine it will just + // not store anything about them. + // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant` + // returns true or not. (in a second loop). + let txs = unconfirmed_txs.into_iter().collect::>(); + + let mut indexer = I::ChangeSet::default(); + for (tx, _) in &txs { + indexer.append(self.index.index_tx(tx)); + } + + let graph = self.graph.batch_insert_unconfirmed( + txs.into_iter() + .filter(|(tx, _)| self.index.is_tx_relevant(tx)) + .map(|(tx, seen_at)| (tx.clone(), seen_at)), + ); + + ChangeSet { graph, indexer } } /// Batch insert unconfirmed transactions. /// - /// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp. - /// The *last seen* communicates when the transaction is last seen in the mempool which is used - /// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). + /// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The + /// *last seen* communicates when the transaction is last seen in the mempool which is used for + /// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). /// /// To filter out irrelevant transactions, use [`batch_insert_relevant_unconfirmed`] instead. /// /// [`batch_insert_relevant_unconfirmed`]: IndexedTxGraph::batch_insert_relevant_unconfirmed - pub fn batch_insert_unconfirmed<'t>( + pub fn batch_insert_unconfirmed( &mut self, - unconfirmed_txs: impl IntoIterator)>, + txs: impl IntoIterator, ) -> ChangeSet { - self.batch_insert( - unconfirmed_txs - .into_iter() - .map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)), - ) + let graph = self.graph.batch_insert_unconfirmed(txs); + let indexer = self.index_tx_graph_changeset(&graph); + ChangeSet { graph, indexer } } } @@ -241,7 +244,6 @@ where ( tx, core::iter::once(A::from_block_position(&block, block_id, tx_pos)), - None, ) }); self.batch_insert_relevant(txs) @@ -260,30 +262,17 @@ where hash: block.block_hash(), height, }; - let txs = block.txdata.iter().enumerate().map(|(tx_pos, tx)| { - ( - tx, - core::iter::once(A::from_block_position(&block, block_id, tx_pos)), - None, - ) - }); - self.batch_insert(txs) + let mut graph = tx_graph::ChangeSet::default(); + for (tx_pos, tx) in block.txdata.iter().enumerate() { + let anchor = A::from_block_position(&block, block_id, tx_pos); + graph.append(self.graph.insert_anchor(tx.txid(), anchor)); + graph.append(self.graph.insert_tx(tx.clone())); + } + let indexer = self.index_tx_graph_changeset(&graph); + ChangeSet { graph, indexer } } } -/// A tuple of a transaction, and associated metadata, that are to be inserted into [`IndexedTxGraph`]. -/// -/// This tuple contains fields in the following order: -/// * A reference to the transaction. -/// * A collection of [`Anchor`]s. -/// * An optional last-seen timestamp. -/// -/// This is used as a input item of [`batch_insert_relevant`] and [`batch_insert`]. -/// -/// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant -/// [`batch_insert`]: IndexedTxGraph::batch_insert -pub type InsertTxItem<'t, A> = (&'t Transaction, A, Option); - /// A structure that represents changes to an [`IndexedTxGraph`]. #[derive(Clone, Debug, PartialEq)] #[cfg_attr( diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index edc1e4966..698af76c4 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -451,6 +451,23 @@ impl TxGraph { self.apply_update(update) } + /// Batch insert unconfirmed transactions. + /// + /// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The + /// *last seen* communicates when the transaction is last seen in the mempool which is used for + /// conflict-resolution (refer to [`TxGraph::insert_seen_at`] for details). + pub fn batch_insert_unconfirmed( + &mut self, + txs: impl IntoIterator, + ) -> ChangeSet { + let mut changeset = ChangeSet::::default(); + for (tx, seen_at) in txs { + changeset.append(self.insert_seen_at(tx.txid(), seen_at)); + changeset.append(self.insert_tx(tx)); + } + changeset + } + /// Inserts the given `anchor` into [`TxGraph`]. /// /// The [`ChangeSet`] returned will be empty if graph already knows that `txid` exists in diff --git a/crates/chain/tests/test_indexed_tx_graph.rs b/crates/chain/tests/test_indexed_tx_graph.rs index 5f95e111d..3dc22ef5b 100644 --- a/crates/chain/tests/test_indexed_tx_graph.rs +++ b/crates/chain/tests/test_indexed_tx_graph.rs @@ -74,7 +74,7 @@ fn insert_relevant_txs() { }; assert_eq!( - graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None, None))), + graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None))), changeset, ); @@ -225,11 +225,10 @@ fn test_list_owned_txouts() { anchor_block, confirmation_height: anchor_block.height, }), - None, ) })); - let _ = graph.batch_insert_relevant([&tx4, &tx5].iter().map(|tx| (*tx, None, Some(100)))); + let _ = graph.batch_insert_relevant_unconfirmed([&tx4, &tx5].iter().map(|tx| (*tx, 100))); // A helper lambda to extract and filter data from the graph. let fetch = diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index 6fb557f7a..c9bcc9728 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -212,8 +212,7 @@ fn main() -> anyhow::Result<()> { // mempool let mempool_txs = emitter.mempool()?; - let graph_changeset = graph - .batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time)))); + let graph_changeset = graph.batch_insert_unconfirmed(mempool_txs); db.stage((local_chain::ChangeSet::default(), graph_changeset)); // commit one last time! @@ -291,7 +290,7 @@ fn main() -> anyhow::Result<()> { } Emission::Mempool(mempool_txs) => { let graph_changeset = graph.batch_insert_relevant_unconfirmed( - mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))), + mempool_txs.iter().map(|(tx, time)| (tx, *time)), ); (local_chain::ChangeSet::default(), graph_changeset) }