diff --git a/zebra-chain/src/tests/vectors.rs b/zebra-chain/src/tests/vectors.rs index deb2a507707..ef9d00f6d99 100644 --- a/zebra-chain/src/tests/vectors.rs +++ b/zebra-chain/src/tests/vectors.rs @@ -1,9 +1,15 @@ //! Network methods for fetching blockchain vectors. //! -use std::collections::BTreeMap; +use std::{collections::BTreeMap, ops::RangeBounds}; -use crate::{block::Block, parameters::Network, serialization::ZcashDeserializeInto}; +use crate::{ + amount::Amount, + block::Block, + parameters::Network, + serialization::ZcashDeserializeInto, + transaction::{UnminedTx, VerifiedUnminedTx}, +}; use zebra_test::vectors::{ BLOCK_MAINNET_1046400_BYTES, BLOCK_MAINNET_653599_BYTES, BLOCK_MAINNET_982681_BYTES, @@ -30,6 +36,39 @@ impl Network { } } + /// Returns iterator over verified unmined transactions in the provided block height range. + pub fn unmined_transactions_in_blocks( + &self, + block_height_range: impl RangeBounds, + ) -> impl DoubleEndedIterator { + let blocks = self.block_iter(); + + // Deserialize the blocks that are selected based on the specified `block_height_range`. + let selected_blocks = blocks + .filter(move |(&height, _)| block_height_range.contains(&height)) + .map(|(_, block)| { + block + .zcash_deserialize_into::() + .expect("block test vector is structurally valid") + }); + + // Extract the transactions from the blocks and wrap each one as an unmined transaction. + // Use a fake zero miner fee and sigops, because we don't have the UTXOs to calculate + // the correct fee. + selected_blocks + .flat_map(|block| block.transactions) + .map(UnminedTx::from) + // Skip transactions that fail ZIP-317 mempool checks + .filter_map(|transaction| { + VerifiedUnminedTx::new( + transaction, + Amount::try_from(1_000_000).expect("invalid value"), + 0, + ) + .ok() + }) + } + /// Returns blocks indexed by height in a [`BTreeMap`]. /// /// Returns Mainnet blocks if `self` is set to Mainnet, and Testnet blocks otherwise. diff --git a/zebra-consensus/src/block/tests.rs b/zebra-consensus/src/block/tests.rs index e6eb6f2c4b9..eea5f40015e 100644 --- a/zebra-consensus/src/block/tests.rs +++ b/zebra-consensus/src/block/tests.rs @@ -137,7 +137,7 @@ async fn check_transcripts() -> Result<(), Report> { let network = Network::Mainnet; let state_service = zebra_state::init_test(&network); - let transaction = transaction::Verifier::new(&network, state_service.clone()); + let transaction = transaction::Verifier::new_for_tests(&network, state_service.clone()); let transaction = Buffer::new(BoxService::new(transaction), 1); let block_verifier = Buffer::new( SemanticBlockVerifier::new(&network, state_service.clone(), transaction), diff --git a/zebra-consensus/src/router.rs b/zebra-consensus/src/router.rs index ba42896e56f..38819d0b245 100644 --- a/zebra-consensus/src/router.rs +++ b/zebra-consensus/src/router.rs @@ -21,7 +21,7 @@ use std::{ use futures::{FutureExt, TryFutureExt}; use thiserror::Error; -use tokio::task::JoinHandle; +use tokio::{sync::oneshot, task::JoinHandle}; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; @@ -30,6 +30,7 @@ use zebra_chain::{ parameters::Network, }; +use zebra_node_services::mempool; use zebra_state as zs; use crate::{ @@ -230,11 +231,12 @@ where /// Block and transaction verification requests should be wrapped in a timeout, /// so that out-of-order and invalid requests do not hang indefinitely. /// See the [`router`](`crate::router`) module documentation for details. -#[instrument(skip(state_service))] -pub async fn init( +#[instrument(skip(state_service, mempool))] +pub async fn init( config: Config, network: &Network, mut state_service: S, + mempool: oneshot::Receiver, ) -> ( Buffer, Request>, Buffer< @@ -247,6 +249,11 @@ pub async fn init( where S: Service + Send + Clone + 'static, S::Future: Send + 'static, + Mempool: Service + + Send + + Clone + + 'static, + Mempool::Future: Send + 'static, { // Give other tasks priority before spawning the checkpoint task. tokio::task::yield_now().await; @@ -333,7 +340,7 @@ where // transaction verification - let transaction = transaction::Verifier::new(network, state_service.clone()); + let transaction = transaction::Verifier::new(network, state_service.clone(), mempool); let transaction = Buffer::new(BoxService::new(transaction), VERIFIER_BUFFER_BOUND); // block verification @@ -397,3 +404,36 @@ pub struct BackgroundTaskHandles { /// Finishes when all the checkpoints are verified, or when the state tip is reached. pub state_checkpoint_verify_handle: JoinHandle<()>, } + +/// Calls [`init`] with a closed mempool setup channel for conciseness in tests. +/// +/// See [`init`] for more details. +#[cfg(any(test, feature = "proptest-impl"))] +pub async fn init_test( + config: Config, + network: &Network, + state_service: S, +) -> ( + Buffer, Request>, + Buffer< + BoxService, + transaction::Request, + >, + BackgroundTaskHandles, + Height, +) +where + S: Service + Send + Clone + 'static, + S::Future: Send + 'static, +{ + init( + config.clone(), + network, + state_service.clone(), + oneshot::channel::< + Buffer, mempool::Request>, + >() + .1, + ) + .await +} diff --git a/zebra-consensus/src/router/tests.rs b/zebra-consensus/src/router/tests.rs index 8fe304e3364..063cc7394cf 100644 --- a/zebra-consensus/src/router/tests.rs +++ b/zebra-consensus/src/router/tests.rs @@ -68,7 +68,7 @@ async fn verifiers_from_network( _transaction_verifier, _groth16_download_handle, _max_checkpoint_height, - ) = crate::router::init(Config::default(), &network, state_service.clone()).await; + ) = crate::router::init_test(Config::default(), &network, state_service.clone()).await; // We can drop the download task handle here, because: // - if the download task fails, the tests will panic, and @@ -169,7 +169,7 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> { _transaction_verifier, _groth16_download_handle, _max_checkpoint_height, - ) = super::init(config.clone(), &network, zs::init_test(&network)).await; + ) = super::init_test(config.clone(), &network, zs::init_test(&network)).await; // Add a timeout layer let block_verifier_router = diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index 1c303003615..aac77a055d6 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -6,6 +6,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, + time::Duration, }; use chrono::{DateTime, Utc}; @@ -13,7 +14,13 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, FutureExt, }; -use tower::{timeout::Timeout, Service, ServiceExt}; +use tokio::sync::oneshot; +use tower::{ + buffer::Buffer, + timeout::{error::Elapsed, Timeout}, + util::BoxService, + Service, ServiceExt, +}; use tracing::Instrument; use zebra_chain::{ @@ -26,9 +33,10 @@ use zebra_chain::{ transaction::{ self, HashType, SigHash, Transaction, UnminedTx, UnminedTxId, VerifiedUnminedTx, }, - transparent::{self, OrderedUtxo}, + transparent, }; +use zebra_node_services::mempool; use zebra_script::CachedFfiTransaction; use zebra_state as zs; @@ -52,6 +60,23 @@ mod tests; /// chain in the correct order.) const UTXO_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(6 * 60); +/// A timeout applied to output lookup requests sent to the mempool. This is shorter than the +/// timeout for the state UTXO lookups because a block is likely to be mined every 75 seconds +/// after Blossom is active, changing the best chain tip and requiring re-verification of transactions +/// in the mempool. +/// +/// This is how long Zebra will wait for an output to be added to the mempool before verification +/// of the transaction that spends it will fail. +const MEMPOOL_OUTPUT_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); + +/// How long to wait after responding to a mempool request with a transaction that creates new +/// transparent outputs before polling the mempool service so that it will try adding the verified +/// transaction and responding to any potential `AwaitOutput` requests. +/// +/// This should be long enough for the mempool service's `Downloads` to finish processing the +/// response from the transaction verifier. +const POLL_MEMPOOL_DELAY: std::time::Duration = Duration::from_millis(50); + /// Asynchronous transaction verification. /// /// # Correctness @@ -59,24 +84,55 @@ const UTXO_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs( /// Transaction verification requests should be wrapped in a timeout, so that /// out-of-order and invalid requests do not hang indefinitely. See the [`router`](`crate::router`) /// module documentation for details. -#[derive(Debug, Clone)] -pub struct Verifier { +pub struct Verifier { network: Network, state: Timeout, + // TODO: Use an enum so that this can either be Pending(oneshot::Receiver) or Initialized(MempoolService) + mempool: Option>, script_verifier: script::Verifier, + mempool_setup_rx: oneshot::Receiver, } -impl Verifier +impl Verifier where ZS: Service + Send + Clone + 'static, ZS::Future: Send + 'static, + Mempool: Service + + Send + + Clone + + 'static, + Mempool::Future: Send + 'static, { /// Create a new transaction verifier. - pub fn new(network: &Network, state: ZS) -> Self { + pub fn new(network: &Network, state: ZS, mempool_setup_rx: oneshot::Receiver) -> Self { Self { network: network.clone(), state: Timeout::new(state, UTXO_LOOKUP_TIMEOUT), + mempool: None, script_verifier: script::Verifier, + mempool_setup_rx, + } + } +} + +impl + Verifier< + ZS, + Buffer, mempool::Request>, + > +where + ZS: Service + Send + Clone + 'static, + ZS::Future: Send + 'static, +{ + /// Create a new transaction verifier with a closed channel receiver for mempool setup for tests. + #[cfg(test)] + pub fn new_for_tests(network: &Network, state: ZS) -> Self { + Self { + network: network.clone(), + state: Timeout::new(state, UTXO_LOOKUP_TIMEOUT), + mempool: None, + script_verifier: script::Verifier, + mempool_setup_rx: oneshot::channel().1, } } } @@ -156,12 +212,24 @@ pub enum Response { /// [`Response::Mempool`] responses are uniquely identified by the /// [`UnminedTxId`] variant for their transaction version. transaction: VerifiedUnminedTx, + + /// A list of spent [`transparent::OutPoint`]s that were found in + /// the mempool's list of `created_outputs`. + /// + /// Used by the mempool to determine dependencies between transactions + /// in the mempool and to avoid adding transactions with missing spends + /// to its verified set. + spent_mempool_outpoints: Vec, }, } +#[cfg(any(test, feature = "proptest-impl"))] impl From for Response { fn from(transaction: VerifiedUnminedTx) -> Self { - Response::Mempool { transaction } + Response::Mempool { + transaction, + spent_mempool_outpoints: Vec::new(), + } } } @@ -228,14 +296,6 @@ impl Request { } impl Response { - /// The verified mempool transaction, if this is a mempool response. - pub fn into_mempool_transaction(self) -> Option { - match self { - Response::Block { .. } => None, - Response::Mempool { transaction, .. } => Some(transaction), - } - } - /// The unmined transaction ID for the transaction in this response. pub fn tx_id(&self) -> UnminedTxId { match self { @@ -276,10 +336,15 @@ impl Response { } } -impl Service for Verifier +impl Service for Verifier where ZS: Service + Send + Clone + 'static, ZS::Future: Send + 'static, + Mempool: Service + + Send + + Clone + + 'static, + Mempool::Future: Send + 'static, { type Response = Response; type Error = TransactionError; @@ -287,6 +352,14 @@ where Pin> + Send + 'static>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + // Note: The block verifier expects the transaction verifier to always be ready. + + if self.mempool.is_none() { + if let Ok(mempool) = self.mempool_setup_rx.try_recv() { + self.mempool = Some(Timeout::new(mempool, MEMPOOL_OUTPUT_LOOKUP_TIMEOUT)); + } + } + Poll::Ready(Ok(())) } @@ -295,6 +368,7 @@ where let script_verifier = self.script_verifier; let network = self.network.clone(); let state = self.state.clone(); + let mempool = self.mempool.clone(); let tx = req.transaction(); let tx_id = req.tx_id(); @@ -370,8 +444,8 @@ where // Load spent UTXOs from state. // The UTXOs are required for almost all the async checks. let load_spent_utxos_fut = - Self::spent_utxos(tx.clone(), req.known_utxos(), req.is_mempool(), state.clone()); - let (spent_utxos, spent_outputs) = load_spent_utxos_fut.await?; + Self::spent_utxos(tx.clone(), req.clone(), state.clone(), mempool.clone(),); + let (spent_utxos, spent_outputs, spent_mempool_outpoints) = load_spent_utxos_fut.await?; // WONTFIX: Return an error for Request::Block as well to replace this check in // the state once #2336 has been implemented? @@ -473,7 +547,22 @@ where ), legacy_sigop_count, )?; - Response::Mempool { transaction } + + if let Some(mut mempool) = mempool { + if !transaction.transaction.transaction.outputs().is_empty() { + tokio::spawn(async move { + tokio::time::sleep(POLL_MEMPOOL_DELAY).await; + let _ = mempool + .ready() + .await + .expect("mempool poll_ready() method should not return an error") + .call(mempool::Request::CheckForVerifiedTransactions) + .await; + }); + } + } + + Response::Mempool { transaction, spent_mempool_outpoints } }, }; @@ -488,10 +577,15 @@ where } } -impl Verifier +impl Verifier where ZS: Service + Send + Clone + 'static, ZS::Future: Send + 'static, + Mempool: Service + + Send + + Clone + + 'static, + Mempool::Future: Send + 'static, { /// Fetches the median-time-past of the *next* block after the best state tip. /// @@ -514,33 +608,42 @@ where } } - /// Wait for the UTXOs that are being spent by the given transaction. + /// Waits for the UTXOs that are being spent by the given transaction to arrive in + /// the state for [`Block`](Request::Block) requests. /// - /// `known_utxos` are additional UTXOs known at the time of validation (i.e. - /// from previous transactions in the block). + /// Looks up UTXOs that are being spent by the given transaction in the state or waits + /// for them to be added to the mempool for [`Mempool`](Request::Mempool) requests. /// - /// Returns a tuple with a OutPoint -> Utxo map, and a vector of Outputs - /// in the same order as the matching inputs in the transaction. + /// Returns a triple containing: + /// - `OutPoint` -> `Utxo` map, + /// - vec of `Output`s in the same order as the matching inputs in the `tx`, + /// - vec of `Outpoint`s spent by a mempool `tx` that were not found in the best chain's utxo set. async fn spent_utxos( tx: Arc, - known_utxos: Arc>, - is_mempool: bool, + req: Request, state: Timeout, + mempool: Option>, ) -> Result< ( HashMap, Vec, + Vec, ), TransactionError, > { + let is_mempool = req.is_mempool(); + // Additional UTXOs known at the time of validation, + // i.e., from previous transactions in the block. + let known_utxos = req.known_utxos(); + let inputs = tx.inputs(); let mut spent_utxos = HashMap::new(); let mut spent_outputs = Vec::new(); + let mut spent_mempool_outpoints = Vec::new(); + for input in inputs { if let transparent::Input::PrevOut { outpoint, .. } = input { tracing::trace!("awaiting outpoint lookup"); - // Currently, Zebra only supports known UTXOs in block transactions. - // But it might support them in the mempool in future. let utxo = if let Some(output) = known_utxos.get(outpoint) { tracing::trace!("UXTO in known_utxos, discarding query"); output.utxo.clone() @@ -548,11 +651,17 @@ where let query = state .clone() .oneshot(zs::Request::UnspentBestChainUtxo(*outpoint)); - if let zebra_state::Response::UnspentBestChainUtxo(utxo) = query.await? { - utxo.ok_or(TransactionError::TransparentInputNotFound)? - } else { + + let zebra_state::Response::UnspentBestChainUtxo(utxo) = query.await? else { unreachable!("UnspentBestChainUtxo always responds with Option") - } + }; + + let Some(utxo) = utxo else { + spent_mempool_outpoints.push(*outpoint); + continue; + }; + + utxo } else { let query = state .clone() @@ -570,7 +679,41 @@ where continue; } } - Ok((spent_utxos, spent_outputs)) + + if let Some(mempool) = mempool { + for &spent_mempool_outpoint in &spent_mempool_outpoints { + let query = mempool + .clone() + .oneshot(mempool::Request::AwaitOutput(spent_mempool_outpoint)); + + let output = match query.await { + Ok(mempool::Response::UnspentOutput(output)) => output, + Ok(_) => unreachable!("UnspentOutput always responds with UnspentOutput"), + Err(err) => { + return match err.downcast::() { + Ok(_) => Err(TransactionError::TransparentInputNotFound), + Err(err) => Err(err.into()), + }; + } + }; + + spent_outputs.push(output.clone()); + spent_utxos.insert( + spent_mempool_outpoint, + // Assume the Utxo height will be next height after the best chain tip height + // + // # Correctness + // + // If the tip height changes while an umined transaction is being verified, + // the transaction must be re-verified before being added to the mempool. + transparent::Utxo::new(output, req.height(), false), + ); + } + } else if !spent_mempool_outpoints.is_empty() { + return Err(TransactionError::TransparentInputNotFound); + } + + Ok((spent_utxos, spent_outputs, spent_mempool_outpoints)) } /// Accepts `request`, a transaction verifier [`&Request`](Request), diff --git a/zebra-consensus/src/transaction/tests.rs b/zebra-consensus/src/transaction/tests.rs index 0a4c21bb039..d42bbb8594c 100644 --- a/zebra-consensus/src/transaction/tests.rs +++ b/zebra-consensus/src/transaction/tests.rs @@ -7,7 +7,7 @@ use std::{collections::HashMap, sync::Arc}; use chrono::{DateTime, TimeZone, Utc}; use color_eyre::eyre::Report; use halo2::pasta::{group::ff::PrimeField, pallas}; -use tower::{service_fn, ServiceExt}; +use tower::{buffer::Buffer, service_fn, ServiceExt}; use zebra_chain::{ amount::{Amount, NonNegative}, @@ -28,10 +28,11 @@ use zebra_chain::{ transparent::{self, CoinbaseData}, }; +use zebra_node_services::mempool; use zebra_state::ValidateContextError; use zebra_test::mock_service::MockService; -use crate::error::TransactionError; +use crate::{error::TransactionError, transaction::POLL_MEMPOOL_DELAY}; use super::{check, Request, Verifier}; @@ -181,7 +182,7 @@ fn v5_transaction_with_no_inputs_fails_validation() { #[tokio::test] async fn mempool_request_with_missing_input_is_rejected() { let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); - let verifier = Verifier::new(&Network::Mainnet, state.clone()); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state.clone()); let (height, tx) = transactions_from_blocks(zebra_test::vectors::MAINNET_BLOCKS.iter()) .find(|(_, tx)| !(tx.is_coinbase() || tx.inputs().is_empty())) @@ -230,7 +231,7 @@ async fn mempool_request_with_missing_input_is_rejected() { #[tokio::test] async fn mempool_request_with_present_input_is_accepted() { let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); - let verifier = Verifier::new(&Network::Mainnet, state.clone()); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state.clone()); let height = NetworkUpgrade::Canopy .activation_height(&Network::Mainnet) @@ -297,7 +298,7 @@ async fn mempool_request_with_present_input_is_accepted() { #[tokio::test] async fn mempool_request_with_invalid_lock_time_is_rejected() { let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); - let verifier = Verifier::new(&Network::Mainnet, state.clone()); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state.clone()); let height = NetworkUpgrade::Canopy .activation_height(&Network::Mainnet) @@ -376,7 +377,7 @@ async fn mempool_request_with_invalid_lock_time_is_rejected() { #[tokio::test] async fn mempool_request_with_unlocked_lock_time_is_accepted() { let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); - let verifier = Verifier::new(&Network::Mainnet, state.clone()); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state.clone()); let height = NetworkUpgrade::Canopy .activation_height(&Network::Mainnet) @@ -443,7 +444,7 @@ async fn mempool_request_with_unlocked_lock_time_is_accepted() { #[tokio::test] async fn mempool_request_with_lock_time_max_sequence_number_is_accepted() { let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); - let verifier = Verifier::new(&Network::Mainnet, state.clone()); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state.clone()); let height = NetworkUpgrade::Canopy .activation_height(&Network::Mainnet) @@ -513,7 +514,7 @@ async fn mempool_request_with_lock_time_max_sequence_number_is_accepted() { #[tokio::test] async fn mempool_request_with_past_lock_time_is_accepted() { let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); - let verifier = Verifier::new(&Network::Mainnet, state.clone()); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state.clone()); let height = NetworkUpgrade::Canopy .activation_height(&Network::Mainnet) @@ -585,6 +586,123 @@ async fn mempool_request_with_past_lock_time_is_accepted() { ); } +#[tokio::test] +async fn mempool_request_with_unmined_output_spends_is_accepted() { + let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); + let mempool: MockService<_, _, _, _> = MockService::build().for_prop_tests(); + let (mempool_setup_tx, mempool_setup_rx) = tokio::sync::oneshot::channel(); + let verifier = Verifier::new(&Network::Mainnet, state.clone(), mempool_setup_rx); + mempool_setup_tx + .send(mempool.clone()) + .ok() + .expect("send should succeed"); + + let height = NetworkUpgrade::Canopy + .activation_height(&Network::Mainnet) + .expect("Canopy activation height is specified"); + let fund_height = (height - 1).expect("fake source fund block height is too small"); + let (input, output, known_utxos) = mock_transparent_transfer( + fund_height, + true, + 0, + Amount::try_from(10001).expect("invalid value"), + ); + + // Create a non-coinbase V4 tx with the last valid expiry height. + let tx = Transaction::V4 { + inputs: vec![input], + outputs: vec![output], + lock_time: LockTime::min_lock_time_timestamp(), + expiry_height: height, + joinsplit_data: None, + sapling_shielded_data: None, + }; + + let input_outpoint = match tx.inputs()[0] { + transparent::Input::PrevOut { outpoint, .. } => outpoint, + transparent::Input::Coinbase { .. } => panic!("requires a non-coinbase transaction"), + }; + + tokio::spawn(async move { + state + .expect_request(zebra_state::Request::BestChainNextMedianTimePast) + .await + .expect("verifier should call mock state service with correct request") + .respond(zebra_state::Response::BestChainNextMedianTimePast( + DateTime32::MAX, + )); + + state + .expect_request(zebra_state::Request::UnspentBestChainUtxo(input_outpoint)) + .await + .expect("verifier should call mock state service with correct request") + .respond(zebra_state::Response::UnspentBestChainUtxo(None)); + + state + .expect_request_that(|req| { + matches!( + req, + zebra_state::Request::CheckBestChainTipNullifiersAndAnchors(_) + ) + }) + .await + .expect("verifier should call mock state service with correct request") + .respond(zebra_state::Response::ValidBestChainTipNullifiersAndAnchors); + }); + + let mut mempool_clone = mempool.clone(); + tokio::spawn(async move { + mempool_clone + .expect_request(mempool::Request::AwaitOutput(input_outpoint)) + .await + .expect("verifier should call mock state service with correct request") + .respond(mempool::Response::UnspentOutput( + known_utxos + .get(&input_outpoint) + .expect("input outpoint should exist in known_utxos") + .utxo + .output + .clone(), + )); + }); + + let verifier_response = verifier + .oneshot(Request::Mempool { + transaction: tx.into(), + height, + }) + .await; + + assert!( + verifier_response.is_ok(), + "expected successful verification, got: {verifier_response:?}" + ); + + let crate::transaction::Response::Mempool { + transaction: _, + spent_mempool_outpoints, + } = verifier_response.expect("already checked that response is ok") + else { + panic!("unexpected response variant from transaction verifier for Mempool request") + }; + + assert_eq!( + spent_mempool_outpoints, + vec![input_outpoint], + "spent_mempool_outpoints in tx verifier response should match input_outpoint" + ); + + tokio::time::sleep(POLL_MEMPOOL_DELAY * 2).await; + assert_eq!( + mempool.poll_count(), + 2, + "the mempool service should have been polled twice, \ + first before being called with an AwaitOutput request, \ + then again shortly after a mempool transaction with transparent outputs \ + is successfully verified" + ); +} + /// Tests that calls to the transaction verifier with a mempool request that spends /// immature coinbase outputs will return an error. #[tokio::test] @@ -592,7 +710,7 @@ async fn mempool_request_with_immature_spend_is_rejected() { let _init_guard = zebra_test::init(); let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); - let verifier = Verifier::new(&Network::Mainnet, state.clone()); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state.clone()); let height = NetworkUpgrade::Canopy .activation_height(&Network::Mainnet) @@ -695,7 +813,7 @@ async fn state_error_converted_correctly() { use zebra_state::DuplicateNullifierError; let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); - let verifier = Verifier::new(&Network::Mainnet, state.clone()); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state.clone()); let height = NetworkUpgrade::Canopy .activation_height(&Network::Mainnet) @@ -856,7 +974,7 @@ async fn v5_transaction_is_rejected_before_nu5_activation() { for network in Network::iter() { let state_service = service_fn(|_| async { unreachable!("Service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let transaction = fake_v5_transactions_for_network(&network, network.block_iter()) .next_back() @@ -903,7 +1021,7 @@ fn v5_transaction_is_accepted_after_nu5_activation_for_network(network: Network) let blocks = network.block_iter(); let state_service = service_fn(|_| async { unreachable!("Service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let mut transaction = fake_v5_transactions_for_network(&network, blocks) .next_back() @@ -975,7 +1093,7 @@ async fn v4_transaction_with_transparent_transfer_is_accepted() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let result = verifier .oneshot(Request::Block { @@ -998,7 +1116,7 @@ async fn v4_transaction_with_transparent_transfer_is_accepted() { async fn v4_transaction_with_last_valid_expiry_height() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&Network::Mainnet, state_service); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state_service); let block_height = NetworkUpgrade::Canopy .activation_height(&Network::Mainnet) @@ -1045,7 +1163,7 @@ async fn v4_transaction_with_last_valid_expiry_height() { async fn v4_coinbase_transaction_with_low_expiry_height() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&Network::Mainnet, state_service); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state_service); let block_height = NetworkUpgrade::Canopy .activation_height(&Network::Mainnet) @@ -1086,7 +1204,7 @@ async fn v4_coinbase_transaction_with_low_expiry_height() { async fn v4_transaction_with_too_low_expiry_height() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&Network::Mainnet, state_service); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state_service); let block_height = NetworkUpgrade::Canopy .activation_height(&Network::Mainnet) @@ -1138,7 +1256,7 @@ async fn v4_transaction_with_too_low_expiry_height() { async fn v4_transaction_with_exceeding_expiry_height() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&Network::Mainnet, state_service); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state_service); let block_height = block::Height::MAX; @@ -1189,7 +1307,7 @@ async fn v4_transaction_with_exceeding_expiry_height() { async fn v4_coinbase_transaction_with_exceeding_expiry_height() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&Network::Mainnet, state_service); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state_service); // Use an arbitrary pre-NU5 block height. // It can't be NU5-onward because the expiry height limit is not enforced @@ -1265,7 +1383,7 @@ async fn v4_coinbase_transaction_is_accepted() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let result = verifier .oneshot(Request::Block { @@ -1320,7 +1438,7 @@ async fn v4_transaction_with_transparent_transfer_is_rejected_by_the_script() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let result = verifier .oneshot(Request::Block { @@ -1375,7 +1493,7 @@ async fn v4_transaction_with_conflicting_transparent_spend_is_rejected() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let result = verifier .oneshot(Request::Block { @@ -1446,7 +1564,7 @@ fn v4_transaction_with_conflicting_sprout_nullifier_inside_joinsplit_is_rejected let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let result = verifier .oneshot(Request::Block { @@ -1522,7 +1640,7 @@ fn v4_transaction_with_conflicting_sprout_nullifier_across_joinsplits_is_rejecte let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let result = verifier .oneshot(Request::Block { @@ -1581,7 +1699,7 @@ async fn v5_transaction_with_transparent_transfer_is_accepted() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let result = verifier .oneshot(Request::Block { @@ -1605,7 +1723,7 @@ async fn v5_transaction_with_last_valid_expiry_height() { let network = Network::new_default_testnet(); let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let block_height = NetworkUpgrade::Nu5 .activation_height(&network) @@ -1651,7 +1769,8 @@ async fn v5_coinbase_transaction_expiry_height() { let network = Network::new_default_testnet(); let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); + let verifier = Buffer::new(verifier, 10); let block_height = NetworkUpgrade::Nu5 .activation_height(&network) @@ -1701,7 +1820,11 @@ async fn v5_coinbase_transaction_expiry_height() { height: block_height, time: DateTime::::MAX_UTC, }) - .await; + .await + .map_err(|err| { + *err.downcast() + .expect("error type should be TransactionError") + }); assert_eq!( result, @@ -1726,7 +1849,11 @@ async fn v5_coinbase_transaction_expiry_height() { height: block_height, time: DateTime::::MAX_UTC, }) - .await; + .await + .map_err(|err| { + *err.downcast() + .expect("error type should be TransactionError") + }); assert_eq!( result, @@ -1768,7 +1895,7 @@ async fn v5_transaction_with_too_low_expiry_height() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let block_height = NetworkUpgrade::Nu5 .activation_height(&network) @@ -1820,7 +1947,7 @@ async fn v5_transaction_with_too_low_expiry_height() { async fn v5_transaction_with_exceeding_expiry_height() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&Network::Mainnet, state_service); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state_service); let block_height = block::Height::MAX; @@ -1898,7 +2025,7 @@ async fn v5_coinbase_transaction_is_accepted() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let result = verifier .oneshot(Request::Block { @@ -1955,7 +2082,7 @@ async fn v5_transaction_with_transparent_transfer_is_rejected_by_the_script() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let result = verifier .oneshot(Request::Block { @@ -2012,7 +2139,7 @@ async fn v5_transaction_with_conflicting_transparent_spend_is_rejected() { let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); let result = verifier .oneshot(Request::Block { @@ -2055,11 +2182,10 @@ fn v4_with_signed_sprout_transfer_is_accepted() { // Initialize the verifier let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); // Test the transaction verifier let result = verifier - .clone() .oneshot(Request::Block { transaction, known_utxos: Arc::new(HashMap::new()), @@ -2135,7 +2261,8 @@ async fn v4_with_joinsplit_is_rejected_for_modification( // Initialize the verifier let state_service = service_fn(|_| async { unreachable!("State service should not be called.") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); + let verifier = Buffer::new(verifier, 10); // Test the transaction verifier. // @@ -2154,7 +2281,11 @@ async fn v4_with_joinsplit_is_rejected_for_modification( height, time: DateTime::::MAX_UTC, }) - .await; + .await + .map_err(|err| { + *err.downcast() + .expect("error type should be TransactionError") + }); if result == expected_error || i >= 100 { break result; @@ -2186,11 +2317,10 @@ fn v4_with_sapling_spends() { // Initialize the verifier let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); // Test the transaction verifier let result = verifier - .clone() .oneshot(Request::Block { transaction, known_utxos: Arc::new(HashMap::new()), @@ -2229,11 +2359,10 @@ fn v4_with_duplicate_sapling_spends() { // Initialize the verifier let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); // Test the transaction verifier let result = verifier - .clone() .oneshot(Request::Block { transaction, known_utxos: Arc::new(HashMap::new()), @@ -2274,11 +2403,10 @@ fn v4_with_sapling_outputs_and_no_spends() { // Initialize the verifier let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); // Test the transaction verifier let result = verifier - .clone() .oneshot(Request::Block { transaction, known_utxos: Arc::new(HashMap::new()), @@ -2323,11 +2451,10 @@ fn v5_with_sapling_spends() { // Initialize the verifier let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); // Test the transaction verifier let result = verifier - .clone() .oneshot(Request::Block { transaction: Arc::new(transaction), known_utxos: Arc::new(HashMap::new()), @@ -2367,11 +2494,10 @@ fn v5_with_duplicate_sapling_spends() { // Initialize the verifier let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); // Test the transaction verifier let result = verifier - .clone() .oneshot(Request::Block { transaction: Arc::new(transaction), known_utxos: Arc::new(HashMap::new()), @@ -2430,11 +2556,10 @@ fn v5_with_duplicate_orchard_action() { // Initialize the verifier let state_service = service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(&network, state_service); + let verifier = Verifier::new_for_tests(&network, state_service); // Test the transaction verifier let result = verifier - .clone() .oneshot(Request::Block { transaction: Arc::new(transaction), known_utxos: Arc::new(HashMap::new()), @@ -2933,7 +3058,7 @@ fn shielded_outputs_are_not_decryptable_for_fake_v5_blocks() { #[tokio::test] async fn mempool_zip317_error() { let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); - let verifier = Verifier::new(&Network::Mainnet, state.clone()); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state.clone()); let height = NetworkUpgrade::Nu5 .activation_height(&Network::Mainnet) @@ -3005,7 +3130,7 @@ async fn mempool_zip317_error() { #[tokio::test] async fn mempool_zip317_ok() { let mut state: MockService<_, _, _, _> = MockService::build().for_prop_tests(); - let verifier = Verifier::new(&Network::Mainnet, state.clone()); + let verifier = Verifier::new_for_tests(&Network::Mainnet, state.clone()); let height = NetworkUpgrade::Nu5 .activation_height(&Network::Mainnet) diff --git a/zebra-consensus/src/transaction/tests/prop.rs b/zebra-consensus/src/transaction/tests/prop.rs index f45b4731de0..856742e5d74 100644 --- a/zebra-consensus/src/transaction/tests/prop.rs +++ b/zebra-consensus/src/transaction/tests/prop.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::Arc}; use chrono::{DateTime, Duration, Utc}; use proptest::{collection::vec, prelude::*}; -use tower::ServiceExt; +use tower::{buffer::Buffer, ServiceExt}; use zebra_chain::{ amount::Amount, @@ -450,7 +450,8 @@ fn validate( // Initialize the verifier let state_service = tower::service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = transaction::Verifier::new(&network, state_service); + let verifier = transaction::Verifier::new_for_tests(&network, state_service); + let verifier = Buffer::new(verifier, 10); // Test the transaction verifier verifier @@ -462,5 +463,9 @@ fn validate( time: block_time, }) .await + .map_err(|err| { + *err.downcast() + .expect("error type should be TransactionError") + }) }) } diff --git a/zebra-node-services/src/mempool.rs b/zebra-node-services/src/mempool.rs index fbaaf029c75..10f51cf4a30 100644 --- a/zebra-node-services/src/mempool.rs +++ b/zebra-node-services/src/mempool.rs @@ -5,7 +5,10 @@ use std::collections::HashSet; use tokio::sync::oneshot; -use zebra_chain::transaction::{self, UnminedTx, UnminedTxId}; +use zebra_chain::{ + transaction::{self, UnminedTx, UnminedTxId}, + transparent, +}; #[cfg(feature = "getblocktemplate-rpcs")] use zebra_chain::transaction::VerifiedUnminedTx; @@ -14,6 +17,10 @@ use crate::BoxError; mod gossip; +mod transaction_dependencies; + +pub use transaction_dependencies::TransactionDependencies; + pub use self::gossip::Gossip; /// A mempool service request. @@ -39,6 +46,21 @@ pub enum Request { /// the [`AuthDigest`](zebra_chain::transaction::AuthDigest). TransactionsByMinedId(HashSet), + /// Request a [`transparent::Output`] identified by the given [`OutPoint`](transparent::OutPoint), + /// waiting until it becomes available if it is unknown. + /// + /// This request is purely informational, and there are no guarantees about + /// whether the UTXO remains unspent or is on the best chain, or any chain. + /// Its purpose is to allow orphaned mempool transaction verification. + /// + /// # Correctness + /// + /// Output requests should be wrapped in a timeout, so that + /// out-of-order and invalid requests do not hang indefinitely. + /// + /// Outdated requests are pruned on a regular basis. + AwaitOutput(transparent::OutPoint), + /// Get all the [`VerifiedUnminedTx`] in the mempool. /// /// Equivalent to `TransactionsById(TransactionIds)`, @@ -99,6 +121,9 @@ pub enum Response { /// different transactions with different mined IDs. Transactions(Vec), + /// Response to [`Request::AwaitOutput`] with the transparent output + UnspentOutput(transparent::Output), + /// Returns all [`VerifiedUnminedTx`] in the mempool. // // TODO: make the Transactions response return VerifiedUnminedTx, @@ -108,6 +133,9 @@ pub enum Response { /// All [`VerifiedUnminedTx`]s in the mempool transactions: Vec, + /// All transaction dependencies in the mempool + transaction_dependencies: TransactionDependencies, + /// Last seen chain tip hash by mempool service last_seen_tip_hash: zebra_chain::block::Hash, }, diff --git a/zebra-node-services/src/mempool/transaction_dependencies.rs b/zebra-node-services/src/mempool/transaction_dependencies.rs new file mode 100644 index 00000000000..2b333060b77 --- /dev/null +++ b/zebra-node-services/src/mempool/transaction_dependencies.rs @@ -0,0 +1,124 @@ +//! Representation of mempool transactions' dependencies on other transactions in the mempool. + +use std::collections::{HashMap, HashSet}; + +use zebra_chain::{transaction, transparent}; + +/// Representation of mempool transactions' dependencies on other transactions in the mempool. +#[derive(Default, Debug, Clone)] +pub struct TransactionDependencies { + /// Lists of mempool transaction ids that create UTXOs spent by + /// a mempool transaction. Used during block template construction + /// to exclude transactions from block templates unless all of the + /// transactions they depend on have been included. + dependencies: HashMap>, + + /// Lists of transaction ids in the mempool that spend UTXOs created + /// by a transaction in the mempool, e.g. tx1 -> set(tx2, tx3, tx4) where + /// tx2, tx3, and tx4 spend outputs created by tx1. + dependents: HashMap>, +} + +impl TransactionDependencies { + /// Adds a transaction that spends outputs created by other transactions in the mempool + /// as a dependent of those transactions, and adds the transactions that created the outputs + /// spent by the dependent transaction as dependencies of the dependent transaction. + /// + /// # Correctness + /// + /// It's the caller's responsibility to ensure that there are no cyclical dependencies. + /// + /// The transaction verifier will wait until the spent output of a transaction has been added to the verified set, + /// so its `AwaitOutput` requests will timeout if there is a cyclical dependency. + pub fn add( + &mut self, + dependent: transaction::Hash, + spent_mempool_outpoints: Vec, + ) { + for &spent_mempool_outpoint in &spent_mempool_outpoints { + self.dependents + .entry(spent_mempool_outpoint.hash) + .or_default() + .insert(dependent); + } + + // Only add an entries to `dependencies` for transactions that spend unmined outputs so it + // can be used to handle transactions with dependencies differently during block production. + if !spent_mempool_outpoints.is_empty() { + self.dependencies.insert( + dependent, + spent_mempool_outpoints + .into_iter() + .map(|outpoint| outpoint.hash) + .collect(), + ); + } + } + + /// Removes all dependents for a list of mined transaction ids and removes the mined transaction ids + /// from the dependencies of their dependents. + pub fn clear_mined_dependencies(&mut self, mined_ids: &HashSet) { + for mined_tx_id in mined_ids { + for dependent_id in self.dependents.remove(mined_tx_id).unwrap_or_default() { + let Some(dependencies) = self.dependencies.get_mut(&dependent_id) else { + // TODO: Move this struct to zebra-chain and log a warning here. + continue; + }; + + // TODO: Move this struct to zebra-chain and log a warning here if the dependency was not found. + let _ = dependencies.remove(&dependent_id); + } + } + } + + /// Removes the hash of a transaction in the mempool and the hashes of any transactions + /// that are tracked as being directly or indirectly dependent on that transaction from + /// this [`TransactionDependencies`]. + /// + /// Returns a list of transaction hashes that were being tracked as dependents of the + /// provided transaction hash. + pub fn remove_all(&mut self, &tx_hash: &transaction::Hash) -> HashSet { + let mut all_dependents = HashSet::new(); + let mut current_level_dependents: HashSet<_> = [tx_hash].into(); + + while !current_level_dependents.is_empty() { + current_level_dependents = current_level_dependents + .iter() + .flat_map(|dep| { + self.dependencies.remove(dep); + self.dependents.remove(dep).unwrap_or_default() + }) + .collect(); + + all_dependents.extend(¤t_level_dependents); + } + + all_dependents + } + + /// Returns a list of hashes of transactions that directly depend on the transaction for `tx_hash`. + pub fn direct_dependents(&self, tx_hash: &transaction::Hash) -> HashSet { + self.dependents.get(tx_hash).cloned().unwrap_or_default() + } + + /// Returns a list of hashes of transactions that are direct dependencies of the transaction for `tx_hash`. + pub fn direct_dependencies(&self, tx_hash: &transaction::Hash) -> HashSet { + self.dependencies.get(tx_hash).cloned().unwrap_or_default() + } + + /// Clear the maps of transaction dependencies. + pub fn clear(&mut self) { + self.dependencies.clear(); + self.dependents.clear(); + } + + /// Returns the map of transaction's dependencies + pub fn dependencies(&self) -> &HashMap> { + &self.dependencies + } + + /// Returns the map of transaction's dependents + pub fn dependents(&self) -> &HashMap> { + &self.dependents + } +} diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index 736021c7668..268676beb27 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -965,6 +965,7 @@ where #[cfg(feature = "getblocktemplate-rpcs")] mempool::Response::FullTransactions { mut transactions, + transaction_dependencies: _, last_seen_tip_hash: _, } => { // Sort transactions in descending order by fee/size, using hash in serialized byte order as a tie-breaker diff --git a/zebra-rpc/src/methods/get_block_template_rpcs.rs b/zebra-rpc/src/methods/get_block_template_rpcs.rs index 2d50552cfec..aed926b3635 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs.rs @@ -648,7 +648,13 @@ where // The loop returns the server long poll ID, // which should be different to the client long poll ID. - let (server_long_poll_id, chain_tip_and_local_time, mempool_txs, submit_old) = loop { + let ( + server_long_poll_id, + chain_tip_and_local_time, + mempool_txs, + mempool_tx_deps, + submit_old, + ) = loop { // Check if we are synced to the tip. // The result of this check can change during long polling. // @@ -688,12 +694,13 @@ where // // Optional TODO: // - add a `MempoolChange` type with an `async changed()` method (like `ChainTip`) - let Some(mempool_txs) = fetch_mempool_transactions(mempool.clone(), tip_hash) - .await? - // If the mempool and state responses are out of sync: - // - if we are not long polling, omit mempool transactions from the template, - // - if we are long polling, continue to the next iteration of the loop to make fresh state and mempool requests. - .or_else(|| client_long_poll_id.is_none().then(Vec::new)) + let Some((mempool_txs, mempool_tx_deps)) = + fetch_mempool_transactions(mempool.clone(), tip_hash) + .await? + // If the mempool and state responses are out of sync: + // - if we are not long polling, omit mempool transactions from the template, + // - if we are long polling, continue to the next iteration of the loop to make fresh state and mempool requests. + .or_else(|| client_long_poll_id.is_none().then(Default::default)) else { continue; }; @@ -728,6 +735,7 @@ where server_long_poll_id, chain_tip_and_local_time, mempool_txs, + mempool_tx_deps, submit_old, ); } @@ -888,15 +896,15 @@ where next_block_height, &miner_address, mempool_txs, + mempool_tx_deps, debug_like_zcashd, extra_coinbase_data.clone(), - ) - .await; + ); tracing::debug!( selected_mempool_tx_hashes = ?mempool_txs .iter() - .map(|tx| tx.transaction.id.mined_id()) + .map(|#[cfg(not(test))] tx, #[cfg(test)] (_, tx)| tx.transaction.id.mined_id()) .collect::>(), "selected transactions for the template from the mempool" ); diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/get_block_template.rs b/zebra-rpc/src/methods/get_block_template_rpcs/get_block_template.rs index 8e9578180be..7ab1a48e20a 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/get_block_template.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/get_block_template.rs @@ -22,7 +22,7 @@ use zebra_chain::{ use zebra_consensus::{ block_subsidy, funding_stream_address, funding_stream_values, miner_subsidy, }; -use zebra_node_services::mempool; +use zebra_node_services::mempool::{self, TransactionDependencies}; use zebra_state::GetBlockTemplateChainInfo; use crate::methods::{ @@ -253,7 +253,7 @@ where pub async fn fetch_mempool_transactions( mempool: Mempool, chain_tip_hash: block::Hash, -) -> Result>> +) -> Result, TransactionDependencies)>> where Mempool: Service< mempool::Request, @@ -271,8 +271,11 @@ where data: None, })?; + // TODO: Order transactions in block templates based on their dependencies + let mempool::Response::FullTransactions { transactions, + transaction_dependencies, last_seen_tip_hash, } = response else { @@ -280,7 +283,7 @@ where }; // Check that the mempool and state were in sync when we made the requests - Ok((last_seen_tip_hash == chain_tip_hash).then_some(transactions)) + Ok((last_seen_tip_hash == chain_tip_hash).then_some((transactions, transaction_dependencies))) } // - Response processing diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template.rs b/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template.rs index d7c31e11a81..879425bb667 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template.rs @@ -35,6 +35,11 @@ pub mod proposal; pub use parameters::{GetBlockTemplateCapability, GetBlockTemplateRequestMode, JsonParameters}; pub use proposal::{proposal_block_from_template, ProposalResponse}; +/// An alias to indicate that a usize value represents the depth of in-block dependencies of a transaction. +/// +/// See the `dependencies_depth()` function in [`zip317`](super::super::zip317) for more details. +pub type InBlockTxDependenciesDepth = usize; + /// A serialized `getblocktemplate` RPC response in template mode. #[derive(Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] pub struct GetBlockTemplate { @@ -227,7 +232,8 @@ impl GetBlockTemplate { miner_address: &transparent::Address, chain_tip_and_local_time: &GetBlockTemplateChainInfo, long_poll_id: LongPollId, - mempool_txs: Vec, + #[cfg(not(test))] mempool_txs: Vec, + #[cfg(test)] mempool_txs: Vec<(InBlockTxDependenciesDepth, VerifiedUnminedTx)>, submit_old: Option, like_zcashd: bool, extra_coinbase_data: Vec, @@ -237,28 +243,45 @@ impl GetBlockTemplate { (chain_tip_and_local_time.tip_height + 1).expect("tip is far below Height::MAX"); // Convert transactions into TransactionTemplates - let mut mempool_txs_with_templates: Vec<( - TransactionTemplate, - VerifiedUnminedTx, - )> = mempool_txs - .into_iter() - .map(|tx| ((&tx).into(), tx)) - .collect(); + #[cfg(not(test))] + let (mempool_tx_templates, mempool_txs): (Vec<_>, Vec<_>) = + mempool_txs.into_iter().map(|tx| ((&tx).into(), tx)).unzip(); // Transaction selection returns transactions in an arbitrary order, // but Zebra's snapshot tests expect the same order every time. - if like_zcashd { - // Sort in serialized data order, excluding the length byte. - // `zcashd` sometimes seems to do this, but other times the order is arbitrary. - mempool_txs_with_templates.sort_by_key(|(tx_template, _tx)| tx_template.data.clone()); - } else { - // Sort by hash, this is faster. + // + // # Correctness + // + // Transactions that spend outputs created in the same block must appear + // after the transactions that create those outputs. + #[cfg(test)] + let (mempool_tx_templates, mempool_txs): (Vec<_>, Vec<_>) = { + let mut mempool_txs_with_templates: Vec<( + InBlockTxDependenciesDepth, + TransactionTemplate, + VerifiedUnminedTx, + )> = mempool_txs + .into_iter() + .map(|(min_tx_index, tx)| (min_tx_index, (&tx).into(), tx)) + .collect(); + + if like_zcashd { + // Sort in serialized data order, excluding the length byte. + // `zcashd` sometimes seems to do this, but other times the order is arbitrary. + mempool_txs_with_templates.sort_by_key(|(min_tx_index, tx_template, _tx)| { + (*min_tx_index, tx_template.data.clone()) + }); + } else { + // Sort by hash, this is faster. + mempool_txs_with_templates.sort_by_key(|(min_tx_index, tx_template, _tx)| { + (*min_tx_index, tx_template.hash.bytes_in_display_order()) + }); + } mempool_txs_with_templates - .sort_by_key(|(tx_template, _tx)| tx_template.hash.bytes_in_display_order()); - } - - let (mempool_tx_templates, mempool_txs): (Vec<_>, Vec<_>) = - mempool_txs_with_templates.into_iter().unzip(); + .into_iter() + .map(|(_, template, tx)| (template, tx)) + .unzip() + }; // Generate the coinbase transaction and default roots // diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/types/long_poll.rs b/zebra-rpc/src/methods/get_block_template_rpcs/types/long_poll.rs index 8817a8c12c0..08439df2fcf 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/types/long_poll.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/types/long_poll.rs @@ -71,14 +71,18 @@ impl LongPollInput { max_time: DateTime32, mempool_tx_ids: impl IntoIterator, ) -> Self { - let mempool_transaction_mined_ids = + let mut tx_mined_ids: Vec = mempool_tx_ids.into_iter().map(|id| id.mined_id()).collect(); + // The mempool returns unordered transactions, we need to sort them here so + // that the longpollid doesn't change unexpectedly. + tx_mined_ids.sort(); + LongPollInput { tip_height, tip_hash, max_time, - mempool_transaction_mined_ids, + mempool_transaction_mined_ids: tx_mined_ids.into(), } } @@ -293,3 +297,28 @@ impl TryFrom for LongPollId { s.parse() } } + +/// Check that [`LongPollInput::new`] will sort mempool transaction ids. +/// +/// The mempool does not currently guarantee the order in which it will return transactions and +/// may return the same items in a different order, while the long poll id should be the same if +/// its other components are equal and no transactions have been added or removed in the mempool. +#[test] +fn long_poll_input_mempool_tx_ids_are_sorted() { + let mempool_tx_ids = || { + (0..10) + .map(|i| transaction::Hash::from([i; 32])) + .map(UnminedTxId::Legacy) + }; + + assert_eq!( + LongPollInput::new(Height::MIN, Default::default(), 0.into(), mempool_tx_ids()), + LongPollInput::new( + Height::MIN, + Default::default(), + 0.into(), + mempool_tx_ids().rev() + ), + "long poll input should sort mempool tx ids" + ); +} diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/zip317.rs b/zebra-rpc/src/methods/get_block_template_rpcs/zip317.rs index 3f0979dc266..75ae9575d62 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/zip317.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/zip317.rs @@ -6,6 +6,8 @@ //! > when computing `size_target`, since there is no consensus requirement for this to be //! > exactly the same between implementations. +use std::collections::{HashMap, HashSet}; + use rand::{ distributions::{Distribution, WeightedIndex}, prelude::thread_rng, @@ -15,15 +17,30 @@ use zebra_chain::{ amount::NegativeOrZero, block::{Height, MAX_BLOCK_BYTES}, parameters::Network, - transaction::{zip317::BLOCK_UNPAID_ACTION_LIMIT, VerifiedUnminedTx}, + transaction::{self, zip317::BLOCK_UNPAID_ACTION_LIMIT, VerifiedUnminedTx}, transparent, }; use zebra_consensus::MAX_BLOCK_SIGOPS; +use zebra_node_services::mempool::TransactionDependencies; use crate::methods::get_block_template_rpcs::{ get_block_template::generate_coinbase_transaction, types::transaction::TransactionTemplate, }; +#[cfg(test)] +use super::get_block_template::InBlockTxDependenciesDepth; + +#[cfg(test)] +mod tests; + +/// Used in the return type of [`select_mempool_transactions()`] for test compilations. +#[cfg(test)] +type SelectedMempoolTx = (InBlockTxDependenciesDepth, VerifiedUnminedTx); + +/// Used in the return type of [`select_mempool_transactions()`] for non-test compilations. +#[cfg(not(test))] +type SelectedMempoolTx = VerifiedUnminedTx; + /// Selects mempool transactions for block production according to [ZIP-317], /// using a fake coinbase transaction and the mempool. /// @@ -36,14 +53,15 @@ use crate::methods::get_block_template_rpcs::{ /// Returns selected transactions from `mempool_txs`. /// /// [ZIP-317]: https://zips.z.cash/zip-0317#block-production -pub async fn select_mempool_transactions( +pub fn select_mempool_transactions( network: &Network, next_block_height: Height, miner_address: &transparent::Address, mempool_txs: Vec, + mempool_tx_deps: TransactionDependencies, like_zcashd: bool, extra_coinbase_data: Vec, -) -> Vec { +) -> Vec { // Use a fake coinbase transaction to break the dependency between transaction // selection, the miner fee, and the fee payment in the coinbase transaction. let fake_coinbase_tx = fake_coinbase_transaction( @@ -54,9 +72,16 @@ pub async fn select_mempool_transactions( extra_coinbase_data, ); + let tx_dependencies = mempool_tx_deps.dependencies(); + let (independent_mempool_txs, mut dependent_mempool_txs): (HashMap<_, _>, HashMap<_, _>) = + mempool_txs + .into_iter() + .map(|tx| (tx.transaction.id.mined_id(), tx)) + .partition(|(tx_id, _tx)| !tx_dependencies.contains_key(tx_id)); + // Setup the transaction lists. - let (mut conventional_fee_txs, mut low_fee_txs): (Vec<_>, Vec<_>) = mempool_txs - .into_iter() + let (mut conventional_fee_txs, mut low_fee_txs): (Vec<_>, Vec<_>) = independent_mempool_txs + .into_values() .partition(VerifiedUnminedTx::pays_conventional_fee); let mut selected_txs = Vec::new(); @@ -77,8 +102,10 @@ pub async fn select_mempool_transactions( while let Some(tx_weights) = conventional_fee_tx_weights { conventional_fee_tx_weights = checked_add_transaction_weighted_random( &mut conventional_fee_txs, + &mut dependent_mempool_txs, tx_weights, &mut selected_txs, + &mempool_tx_deps, &mut remaining_block_bytes, &mut remaining_block_sigops, // The number of unpaid actions is always zero for transactions that pay the @@ -93,8 +120,10 @@ pub async fn select_mempool_transactions( while let Some(tx_weights) = low_fee_tx_weights { low_fee_tx_weights = checked_add_transaction_weighted_random( &mut low_fee_txs, + &mut dependent_mempool_txs, tx_weights, &mut selected_txs, + &mempool_tx_deps, &mut remaining_block_bytes, &mut remaining_block_sigops, &mut remaining_block_unpaid_actions, @@ -158,6 +187,59 @@ fn setup_fee_weighted_index(transactions: &[VerifiedUnminedTx]) -> Option>, + selected_txs: &Vec, +) -> bool { + let Some(deps) = candidate_tx_deps else { + return true; + }; + + if selected_txs.len() < deps.len() { + return false; + } + + let mut num_available_deps = 0; + for tx in selected_txs { + #[cfg(test)] + let (_, tx) = tx; + if deps.contains(&tx.transaction.id.mined_id()) { + num_available_deps += 1; + } else { + continue; + } + + if num_available_deps == deps.len() { + return true; + } + } + + false +} + +/// Returns the depth of a transaction's dependencies in the block for a candidate +/// transaction with the provided dependencies. +#[cfg(test)] +fn dependencies_depth( + dependent_tx_id: &transaction::Hash, + mempool_tx_deps: &TransactionDependencies, +) -> InBlockTxDependenciesDepth { + let mut current_level = 0; + let mut current_level_deps = mempool_tx_deps.direct_dependencies(dependent_tx_id); + while !current_level_deps.is_empty() { + current_level += 1; + current_level_deps = current_level_deps + .iter() + .flat_map(|dep_id| mempool_tx_deps.direct_dependencies(dep_id)) + .collect(); + } + + current_level +} + /// Chooses a random transaction from `txs` using the weighted index `tx_weights`, /// and tries to add it to `selected_txs`. /// @@ -168,10 +250,14 @@ fn setup_fee_weighted_index(transactions: &[VerifiedUnminedTx]) -> Option, + dependent_txs: &mut HashMap, tx_weights: WeightedIndex, - selected_txs: &mut Vec, + selected_txs: &mut Vec, + mempool_tx_deps: &TransactionDependencies, remaining_block_bytes: &mut usize, remaining_block_sigops: &mut u64, remaining_block_unpaid_actions: &mut u32, @@ -181,30 +267,124 @@ fn checked_add_transaction_weighted_random( let (new_tx_weights, candidate_tx) = choose_transaction_weighted_random(candidate_txs, tx_weights); - // > If the block template with this transaction included - // > would be within the block size limit and block sigop limit, - // > and block_unpaid_actions <= block_unpaid_action_limit, - // > add the transaction to the block template - // - // Unpaid actions are always zero for transactions that pay the conventional fee, - // so the unpaid action check always passes for those transactions. - if candidate_tx.transaction.size <= *remaining_block_bytes - && candidate_tx.legacy_sigop_count <= *remaining_block_sigops - && candidate_tx.unpaid_actions <= *remaining_block_unpaid_actions - { - selected_txs.push(candidate_tx.clone()); + if !candidate_tx.try_update_block_template_limits( + remaining_block_bytes, + remaining_block_sigops, + remaining_block_unpaid_actions, + ) { + return new_tx_weights; + } - *remaining_block_bytes -= candidate_tx.transaction.size; - *remaining_block_sigops -= candidate_tx.legacy_sigop_count; + let tx_dependencies = mempool_tx_deps.dependencies(); + let selected_tx_id = &candidate_tx.transaction.id.mined_id(); + debug_assert!( + !tx_dependencies.contains_key(selected_tx_id), + "all candidate transactions should be independent" + ); - // Unpaid actions are always zero for transactions that pay the conventional fee, - // so this limit always remains the same after they are added. - *remaining_block_unpaid_actions -= candidate_tx.unpaid_actions; + #[cfg(not(test))] + selected_txs.push(candidate_tx); + + #[cfg(test)] + selected_txs.push((0, candidate_tx)); + + // Try adding any dependent transactions if all of their dependencies have been selected. + + let mut current_level_dependents = mempool_tx_deps.direct_dependents(selected_tx_id); + while !current_level_dependents.is_empty() { + let mut next_level_dependents = HashSet::new(); + + for dependent_tx_id in ¤t_level_dependents { + // ## Note + // + // A necessary condition for adding the dependent tx is that it spends unmined outputs coming only from + // the selected txs, which come from the mempool. If the tx also spends in-chain outputs, it won't + // be added. This behavior is not specified by consensus rules and can be changed at any time, + // meaning that such txs could be added. + if has_direct_dependencies(tx_dependencies.get(dependent_tx_id), selected_txs) { + let Some(candidate_tx) = dependent_txs.remove(dependent_tx_id) else { + continue; + }; + + // Transactions that don't pay the conventional fee should not have + // the same probability of being included as their dependencies. + if !candidate_tx.pays_conventional_fee() { + continue; + } + + if !candidate_tx.try_update_block_template_limits( + remaining_block_bytes, + remaining_block_sigops, + remaining_block_unpaid_actions, + ) { + continue; + } + + #[cfg(not(test))] + selected_txs.push(candidate_tx); + + #[cfg(test)] + selected_txs.push(( + dependencies_depth(dependent_tx_id, mempool_tx_deps), + candidate_tx, + )); + + next_level_dependents.extend(mempool_tx_deps.direct_dependents(dependent_tx_id)); + } + } + + current_level_dependents = next_level_dependents; } new_tx_weights } +trait TryUpdateBlockLimits { + /// Checks if a transaction fits within the provided remaining block bytes, + /// sigops, and unpaid actions limits. + /// + /// Updates the limits and returns true if the transaction does fit, or + /// returns false otherwise. + fn try_update_block_template_limits( + &self, + remaining_block_bytes: &mut usize, + remaining_block_sigops: &mut u64, + remaining_block_unpaid_actions: &mut u32, + ) -> bool; +} + +impl TryUpdateBlockLimits for VerifiedUnminedTx { + fn try_update_block_template_limits( + &self, + remaining_block_bytes: &mut usize, + remaining_block_sigops: &mut u64, + remaining_block_unpaid_actions: &mut u32, + ) -> bool { + // > If the block template with this transaction included + // > would be within the block size limit and block sigop limit, + // > and block_unpaid_actions <= block_unpaid_action_limit, + // > add the transaction to the block template + // + // Unpaid actions are always zero for transactions that pay the conventional fee, + // so the unpaid action check always passes for those transactions. + if self.transaction.size <= *remaining_block_bytes + && self.legacy_sigop_count <= *remaining_block_sigops + && self.unpaid_actions <= *remaining_block_unpaid_actions + { + *remaining_block_bytes -= self.transaction.size; + *remaining_block_sigops -= self.legacy_sigop_count; + + // Unpaid actions are always zero for transactions that pay the conventional fee, + // so this limit always remains the same after they are added. + *remaining_block_unpaid_actions -= self.unpaid_actions; + + true + } else { + false + } + } +} + /// Choose a transaction from `transactions`, using the previously set up `weighted_index`. /// /// If some transactions have not yet been chosen, returns the weighted index and the transaction. diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/zip317/tests.rs b/zebra-rpc/src/methods/get_block_template_rpcs/zip317/tests.rs new file mode 100644 index 00000000000..a132d855937 --- /dev/null +++ b/zebra-rpc/src/methods/get_block_template_rpcs/zip317/tests.rs @@ -0,0 +1,116 @@ +//! Tests for ZIP-317 transaction selection for block template production + +use zebra_chain::{ + block::Height, + parameters::Network, + transaction, + transparent::{self, OutPoint}, +}; +use zebra_node_services::mempool::TransactionDependencies; + +use super::select_mempool_transactions; + +#[test] +fn excludes_tx_with_unselected_dependencies() { + let network = Network::Mainnet; + let next_block_height = Height(1_000_000); + let miner_address = transparent::Address::from_pub_key_hash(network.kind(), [0; 20]); + let unmined_tx = network + .unmined_transactions_in_blocks(..) + .next() + .expect("should not be empty"); + + let mut mempool_tx_deps = TransactionDependencies::default(); + mempool_tx_deps.add( + unmined_tx.transaction.id.mined_id(), + vec![OutPoint::from_usize(transaction::Hash([0; 32]), 0)], + ); + + let like_zcashd = true; + let extra_coinbase_data = Vec::new(); + + assert_eq!( + select_mempool_transactions( + &network, + next_block_height, + &miner_address, + vec![unmined_tx], + mempool_tx_deps, + like_zcashd, + extra_coinbase_data, + ), + vec![], + "should not select any transactions when dependencies are unavailable" + ); +} + +#[test] +fn includes_tx_with_selected_dependencies() { + let network = Network::Mainnet; + let next_block_height = Height(1_000_000); + let miner_address = transparent::Address::from_pub_key_hash(network.kind(), [0; 20]); + let unmined_txs: Vec<_> = network.unmined_transactions_in_blocks(..).take(3).collect(); + + let dependent_tx1 = unmined_txs.first().expect("should have 3 txns"); + let dependent_tx2 = unmined_txs.get(1).expect("should have 3 txns"); + let independent_tx_id = unmined_txs + .get(2) + .expect("should have 3 txns") + .transaction + .id + .mined_id(); + + let mut mempool_tx_deps = TransactionDependencies::default(); + mempool_tx_deps.add( + dependent_tx1.transaction.id.mined_id(), + vec![OutPoint::from_usize(independent_tx_id, 0)], + ); + mempool_tx_deps.add( + dependent_tx2.transaction.id.mined_id(), + vec![ + OutPoint::from_usize(independent_tx_id, 0), + OutPoint::from_usize(transaction::Hash([0; 32]), 0), + ], + ); + + let like_zcashd = true; + let extra_coinbase_data = Vec::new(); + + let selected_txs = select_mempool_transactions( + &network, + next_block_height, + &miner_address, + unmined_txs.clone(), + mempool_tx_deps.clone(), + like_zcashd, + extra_coinbase_data, + ); + + assert_eq!( + selected_txs.len(), + 2, + "should select the independent transaction and 1 of the dependent txs, selected: {selected_txs:?}" + ); + + let selected_tx_by_id = |id| { + selected_txs + .iter() + .find(|(_, tx)| tx.transaction.id.mined_id() == id) + }; + + let (dependency_depth, _) = + selected_tx_by_id(independent_tx_id).expect("should select the independent tx"); + + assert_eq!( + *dependency_depth, 0, + "should return a dependency depth of 0 for the independent tx" + ); + + let (dependency_depth, _) = selected_tx_by_id(dependent_tx1.transaction.id.mined_id()) + .expect("should select dependent_tx1"); + + assert_eq!( + *dependency_depth, 1, + "should return a dependency depth of 1 for the dependent tx" + ); +} diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index 409a6aefe52..726ddca159a 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -424,6 +424,7 @@ proptest! { .await? .respond(mempool::Response::FullTransactions { transactions, + transaction_dependencies: Default::default(), last_seen_tip_hash: [0; 32].into(), }); diff --git a/zebra-rpc/src/methods/tests/snapshot.rs b/zebra-rpc/src/methods/tests/snapshot.rs index f4d7804088e..c0cda974ede 100644 --- a/zebra-rpc/src/methods/tests/snapshot.rs +++ b/zebra-rpc/src/methods/tests/snapshot.rs @@ -356,6 +356,7 @@ async fn test_rpc_response_data_for_network(network: &Network) { .map(|responder| { responder.respond(mempool::Response::FullTransactions { transactions: vec![], + transaction_dependencies: Default::default(), last_seen_tip_hash: blocks[blocks.len() - 1].hash(), }); }); diff --git a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs index 8afb7dd312d..b2e012c7bcd 100644 --- a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs +++ b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs @@ -86,8 +86,12 @@ pub async fn test_responses( _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), network, state.clone()) - .await; + ) = zebra_consensus::router::init_test( + zebra_consensus::Config::default(), + network, + state.clone(), + ) + .await; let mut mock_sync_status = MockSyncStatus::default(); mock_sync_status.set_is_close_to_tip(true); @@ -261,6 +265,7 @@ pub async fn test_responses( .await .respond(mempool::Response::FullTransactions { transactions: vec![], + transaction_dependencies: Default::default(), // tip hash needs to match chain info for long poll requests last_seen_tip_hash: fake_tip_hash, }); diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 5b5a21e23d0..b82ac588d5c 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -921,8 +921,12 @@ async fn rpc_getblockcount() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone()) - .await; + ) = zebra_consensus::router::init_test( + zebra_consensus::Config::default(), + &Mainnet, + state.clone(), + ) + .await; // Init RPC let get_block_template_rpc = GetBlockTemplateRpcImpl::new( @@ -966,8 +970,12 @@ async fn rpc_getblockcount_empty_state() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone()) - .await; + ) = zebra_consensus::router::init_test( + zebra_consensus::Config::default(), + &Mainnet, + state.clone(), + ) + .await; // Init RPC let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new( @@ -1013,8 +1021,12 @@ async fn rpc_getpeerinfo() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &network, state.clone()) - .await; + ) = zebra_consensus::router::init_test( + zebra_consensus::Config::default(), + &network, + state.clone(), + ) + .await; let mock_peer_address = zebra_network::types::MetaAddr::new_initial_peer( std::net::SocketAddr::new( @@ -1083,8 +1095,12 @@ async fn rpc_getblockhash() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone()) - .await; + ) = zebra_consensus::router::init_test( + zebra_consensus::Config::default(), + &Mainnet, + state.clone(), + ) + .await; // Init RPC let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new( @@ -1348,6 +1364,7 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) { .await .respond(mempool::Response::FullTransactions { transactions, + transaction_dependencies: Default::default(), last_seen_tip_hash, }); } @@ -1569,8 +1586,12 @@ async fn rpc_submitblock_errors() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone()) - .await; + ) = zebra_consensus::router::init_test( + zebra_consensus::Config::default(), + &Mainnet, + state.clone(), + ) + .await; // Init RPC let get_block_template_rpc = GetBlockTemplateRpcImpl::new( diff --git a/zebra-test/src/mock_service.rs b/zebra-test/src/mock_service.rs index 7ab0d1f613b..cf5c2da0db7 100644 --- a/zebra-test/src/mock_service.rs +++ b/zebra-test/src/mock_service.rs @@ -43,7 +43,10 @@ use std::{ fmt::Debug, marker::PhantomData, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, task::{Context, Poll}, time::Duration, }; @@ -111,6 +114,7 @@ type ProxyItem = pub struct MockService { receiver: broadcast::Receiver>, sender: broadcast::Sender>, + poll_count: Arc, max_request_delay: Duration, _assertion_type: PhantomData, } @@ -155,6 +159,7 @@ where type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, _context: &mut Context) -> Poll> { + self.poll_count.fetch_add(1, Ordering::SeqCst); Poll::Ready(Ok(())) } @@ -271,6 +276,7 @@ impl MockServiceBuilder { MockService { receiver, sender, + poll_count: Arc::new(AtomicUsize::new(0)), max_request_delay: self.max_request_delay.unwrap_or(DEFAULT_MAX_REQUEST_DELAY), _assertion_type: PhantomData, } @@ -454,6 +460,13 @@ impl MockService usize { + self.poll_count.load(Ordering::SeqCst) + } } /// Implementation of [`MockService`] methods that use [`mod@proptest`] assertions. @@ -667,6 +680,13 @@ impl MockService usize { + self.poll_count.load(Ordering::SeqCst) + } } /// Code that is independent of the assertions used in [`MockService`]. @@ -708,6 +728,7 @@ impl Clone MockService { receiver: self.sender.subscribe(), sender: self.sender.clone(), + poll_count: self.poll_count.clone(), max_request_delay: self.max_request_delay, _assertion_type: PhantomData, } diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 887f1cc0242..2f8a1563b8a 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -179,11 +179,13 @@ impl StartCmd { .await; info!("initializing verifiers"); + let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel(); let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) = zebra_consensus::router::init( config.consensus.clone(), &config.network.network, state.clone(), + tx_verifier_setup_rx, ) .await; @@ -212,6 +214,10 @@ impl StartCmd { .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY) .service(mempool); + if tx_verifier_setup_tx.send(mempool.clone()).is_err() { + warn!("error setting up the transaction verifier with a handle to the mempool service"); + }; + info!("fully initializing inbound peer request handler"); // Fully start the inbound service as soon as possible let setup_data = InboundSetupData { diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs index 3ca30c5759a..176ec8c1c57 100644 --- a/zebrad/src/components/inbound/tests/fake_peer_set.rs +++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs @@ -31,8 +31,8 @@ use crate::{ components::{ inbound::{downloads::MAX_INBOUND_CONCURRENCY, Inbound, InboundSetupData}, mempool::{ - gossip_mempool_transaction_id, unmined_transactions_in_blocks, Config as MempoolConfig, - Mempool, MempoolError, SameEffectsChainRejectionError, UnboxMempoolError, + gossip_mempool_transaction_id, Config as MempoolConfig, Mempool, MempoolError, + SameEffectsChainRejectionError, UnboxMempoolError, }, sync::{self, BlockGossipError, SyncStatus, PEER_GOSSIP_DELAY}, }, @@ -785,7 +785,7 @@ async fn caches_getaddr_response() { _transaction_verifier, _groth16_download_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init( + ) = zebra_consensus::router::init_test( consensus_config.clone(), &network, state_service.clone(), @@ -894,8 +894,12 @@ async fn setup( // Download task panics and timeouts are propagated to the tests that use Groth16 verifiers. let (block_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) = - zebra_consensus::router::init(consensus_config.clone(), &network, state_service.clone()) - .await; + zebra_consensus::router::init_test( + consensus_config.clone(), + &network, + state_service.clone(), + ) + .await; let mut peer_set = MockService::build() .with_max_request_delay(MAX_PEER_SET_REQUEST_DELAY) @@ -1050,14 +1054,15 @@ fn add_some_stuff_to_mempool( network: Network, ) -> Vec { // get the genesis block coinbase transaction from the Zcash blockchain. - let genesis_transactions: Vec<_> = unmined_transactions_in_blocks(..=0, &network) + let genesis_transactions: Vec<_> = network + .unmined_transactions_in_blocks(..=0) .take(1) .collect(); // Insert the genesis block coinbase transaction into the mempool storage. mempool_service .storage() - .insert(genesis_transactions[0].clone()) + .insert(genesis_transactions[0].clone(), Vec::new()) .unwrap(); genesis_transactions diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 05732ddaac2..b94ad0b09b8 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -50,6 +50,7 @@ mod crawler; pub mod downloads; mod error; pub mod gossip; +mod pending_outputs; mod queue_checker; mod storage; @@ -68,7 +69,7 @@ pub use storage::{ }; #[cfg(test)] -pub use self::{storage::tests::unmined_transactions_in_blocks, tests::UnboxMempoolError}; +pub use self::tests::UnboxMempoolError; use downloads::{ Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT, @@ -132,7 +133,10 @@ impl ActiveState { } => { let mut transactions = Vec::new(); - let storage = storage.transactions().map(|tx| tx.clone().into()); + let storage = storage + .transactions() + .values() + .map(|tx| tx.transaction.clone().into()); transactions.extend(storage); let pending = tx_downloads.transaction_requests().cloned(); @@ -387,10 +391,11 @@ impl Mempool { /// Remove expired transaction ids from a given list of inserted ones. fn remove_expired_from_peer_list( send_to_peers_ids: &HashSet, - expired_transactions: &HashSet, + expired_transactions: &HashSet, ) -> HashSet { send_to_peers_ids - .difference(expired_transactions) + .iter() + .filter(|id| !expired_transactions.contains(&id.mined_id())) .copied() .collect() } @@ -585,7 +590,7 @@ impl Service for Mempool { pin!(tx_downloads.timeout(RATE_LIMIT_DELAY)).poll_next(cx) { match r { - Ok(Ok((tx, expected_tip_height))) => { + Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height))) => { // # Correctness: // // It's okay to use tip height here instead of the tip hash since @@ -593,7 +598,7 @@ impl Service for Mempool { // the best chain changes (which is the only way to stay at the same height), and the // mempool re-verifies all pending tx_downloads when there's a `TipAction::Reset`. if best_tip_height == expected_tip_height { - let insert_result = storage.insert(tx.clone()); + let insert_result = storage.insert(tx.clone(), spent_mempool_outpoints); tracing::trace!( ?insert_result, @@ -612,11 +617,11 @@ impl Service for Mempool { .download_if_needed_and_verify(tx.transaction.into(), None); } } - Ok(Err((txid, error))) => { - tracing::debug!(?txid, ?error, "mempool transaction failed to verify"); + Ok(Err((tx_id, error))) => { + tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify"); metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1); - storage.reject_if_needed(txid, error); + storage.reject_if_needed(tx_id, error); } Err(_elapsed) => { // A timeout happens when the stream hangs waiting for another service, @@ -638,6 +643,7 @@ impl Service for Mempool { // with the same mined IDs as recently mined transactions. let mined_ids = block.transaction_hashes.iter().cloned().collect(); tx_downloads.cancel(&mined_ids); + storage.clear_mined_dependencies(&mined_ids); storage.reject_and_remove_same_effects(&mined_ids, block.transactions); // Clear any transaction rejections if they might have become valid after @@ -728,16 +734,32 @@ impl Service for Mempool { async move { Ok(Response::Transactions(res)) }.boxed() } + Request::AwaitOutput(outpoint) => { + trace!(?req, "got mempool request"); + + let response_fut = storage.pending_outputs.queue(outpoint); + + if let Some(output) = storage.created_output(&outpoint) { + storage.pending_outputs.respond(&outpoint, output) + } + + trace!("answered mempool request"); + + response_fut.boxed() + } + #[cfg(feature = "getblocktemplate-rpcs")] Request::FullTransactions => { trace!(?req, "got mempool request"); - let transactions: Vec<_> = storage.full_transactions().cloned().collect(); + let transactions: Vec<_> = storage.transactions().values().cloned().collect(); + let transaction_dependencies = storage.transaction_dependencies().clone(); trace!(?req, transactions_count = ?transactions.len(), "answered mempool request"); let response = Response::FullTransactions { transactions, + transaction_dependencies, last_seen_tip_hash: *last_seen_tip_hash, }; @@ -806,6 +828,13 @@ impl Service for Mempool { Request::TransactionsById(_) => Response::Transactions(Default::default()), Request::TransactionsByMinedId(_) => Response::Transactions(Default::default()), + Request::AwaitOutput(_) => { + return async move { + Err("mempool is not active: wait for Zebra to sync to the tip".into()) + } + .boxed() + } + #[cfg(feature = "getblocktemplate-rpcs")] Request::FullTransactions => { return async move { diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index eeda6bd9567..45fd44a7c05 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -47,6 +47,7 @@ use tracing_futures::Instrument; use zebra_chain::{ block::Height, transaction::{self, UnminedTxId, VerifiedUnminedTx}, + transparent, }; use zebra_consensus::transaction as tx; use zebra_network as zn; @@ -153,7 +154,11 @@ where pending: FuturesUnordered< JoinHandle< Result< - (VerifiedUnminedTx, Option), + ( + VerifiedUnminedTx, + Vec, + Option, + ), (TransactionDownloadVerifyError, UnminedTxId), >, >, @@ -173,8 +178,14 @@ where ZS: Service + Send + Clone + 'static, ZS::Future: Send, { - type Item = - Result<(VerifiedUnminedTx, Option), (UnminedTxId, TransactionDownloadVerifyError)>; + type Item = Result< + ( + VerifiedUnminedTx, + Vec, + Option, + ), + (UnminedTxId, TransactionDownloadVerifyError), + >; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); @@ -189,9 +200,9 @@ where // TODO: this would be cleaner with poll_map (#2693) if let Some(join_result) = ready!(this.pending.poll_next(cx)) { match join_result.expect("transaction download and verify tasks must not panic") { - Ok((tx, tip_height)) => { + Ok((tx, spent_mempool_outpoints, tip_height)) => { this.cancel_handles.remove(&tx.transaction.id); - Poll::Ready(Some(Ok((tx, tip_height)))) + Poll::Ready(Some(Ok((tx, spent_mempool_outpoints, tip_height)))) } Err((e, hash)) => { this.cancel_handles.remove(&hash); @@ -347,8 +358,11 @@ where height: next_height, }) .map_ok(|rsp| { - (rsp.into_mempool_transaction() - .expect("unexpected non-mempool response to mempool request"), tip_height) + let tx::Response::Mempool { transaction, spent_mempool_outpoints } = rsp else { + panic!("unexpected non-mempool response to mempool request") + }; + + (transaction, spent_mempool_outpoints, tip_height) }) .await; @@ -357,12 +371,12 @@ where result.map_err(|e| TransactionDownloadVerifyError::Invalid(e.into())) } - .map_ok(|(tx, tip_height)| { + .map_ok(|(tx, spent_mempool_outpoints, tip_height)| { metrics::counter!( "mempool.verified.transactions.total", "version" => format!("{}", tx.transaction.transaction.version()), ).increment(1); - (tx, tip_height) + (tx, spent_mempool_outpoints, tip_height) }) // Tack the hash onto the error so we can remove the cancel handle // on failure as well as on success. @@ -387,6 +401,7 @@ where }; // Send the result to responder channel if one was provided. + // TODO: Wait until transactions are added to the verified set before sending an Ok to `rsp_tx`. if let Some(rsp_tx) = rsp_tx { let _ = rsp_tx.send( result diff --git a/zebrad/src/components/mempool/pending_outputs.rs b/zebrad/src/components/mempool/pending_outputs.rs new file mode 100644 index 00000000000..495613019cc --- /dev/null +++ b/zebrad/src/components/mempool/pending_outputs.rs @@ -0,0 +1,65 @@ +//! Pending [`transparent::Output`] tracker for [`AwaitOutput` requests](zebra_node_services::mempool::Request::AwaitOutput). + +use std::{collections::HashMap, future::Future}; + +use tokio::sync::broadcast; + +use tower::BoxError; +use zebra_chain::transparent; + +use zebra_node_services::mempool::Response; + +/// Pending [`transparent::Output`] tracker for handling the mempool's +/// [`AwaitOutput` requests](zebra_node_services::mempool::Request::AwaitOutput). +#[derive(Debug, Default)] +pub struct PendingOutputs(HashMap>); + +impl PendingOutputs { + /// Returns a future that will resolve to the `transparent::Output` pointed + /// to by the given `transparent::OutPoint` when it is available. + pub fn queue( + &mut self, + outpoint: transparent::OutPoint, + ) -> impl Future> { + let mut receiver = self + .0 + .entry(outpoint) + .or_insert_with(|| { + let (sender, _) = broadcast::channel(1); + sender + }) + .subscribe(); + + async move { + receiver + .recv() + .await + .map(Response::UnspentOutput) + .map_err(BoxError::from) + } + } + + /// Notify all requests waiting for the [`transparent::Output`] pointed to by + /// the given [`transparent::OutPoint`] that the [`transparent::Output`] has + /// arrived. + #[inline] + pub fn respond(&mut self, outpoint: &transparent::OutPoint, output: transparent::Output) { + if let Some(sender) = self.0.remove(outpoint) { + // Adding the outpoint as a field lets us cross-reference + // with the trace of the verification that made the request. + tracing::trace!(?outpoint, "found pending mempool output"); + let _ = sender.send(output); + } + } + + /// Scan the set of waiting Output requests for channels where all receivers + /// have been dropped and remove the corresponding sender. + pub fn prune(&mut self) { + self.0.retain(|_, chan| chan.receiver_count() > 0); + } + + /// Clears the inner [`HashMap`] of queued pending output requests. + pub fn clear(&mut self) { + self.0.clear(); + } +} diff --git a/zebrad/src/components/mempool/storage.rs b/zebrad/src/components/mempool/storage.rs index d380efb84aa..ce6f09cf1d6 100644 --- a/zebrad/src/components/mempool/storage.rs +++ b/zebrad/src/components/mempool/storage.rs @@ -16,12 +16,17 @@ use std::{ use thiserror::Error; -use zebra_chain::transaction::{ - self, Hash, Transaction, UnminedTx, UnminedTxId, VerifiedUnminedTx, +use zebra_chain::{ + transaction::{self, Hash, Transaction, UnminedTx, UnminedTxId, VerifiedUnminedTx}, + transparent, }; +use zebra_node_services::mempool::TransactionDependencies; use self::{eviction_list::EvictionList, verified_set::VerifiedSet}; -use super::{config, downloads::TransactionDownloadVerifyError, MempoolError}; +use super::{ + config, downloads::TransactionDownloadVerifyError, pending_outputs::PendingOutputs, + MempoolError, +}; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; @@ -67,6 +72,12 @@ pub enum SameEffectsTipRejectionError { its inputs" )] SpendConflict, + + #[error( + "transaction rejected because it spends missing outputs from \ + another transaction in the mempool" + )] + MissingOutput, } /// Transactions rejected based only on their effects (spends, outputs, transaction header). @@ -116,6 +127,9 @@ pub struct Storage { /// The set of verified transactions in the mempool. verified: VerifiedSet, + /// The set of outpoints with pending requests for their associated transparent::Output. + pub(super) pending_outputs: PendingOutputs, + /// The set of transactions rejected due to bad authorizations, or for other /// reasons, and their rejection reasons. These rejections only apply to the /// current tip. @@ -165,6 +179,7 @@ impl Storage { tx_cost_limit: config.tx_cost_limit, eviction_memory_time: config.eviction_memory_time, verified: Default::default(), + pending_outputs: Default::default(), tip_rejected_exact: Default::default(), tip_rejected_same_effects: Default::default(), chain_rejected_same_effects: Default::default(), @@ -173,6 +188,10 @@ impl Storage { /// Insert a [`VerifiedUnminedTx`] into the mempool, caching any rejections. /// + /// Accepts the [`VerifiedUnminedTx`] being inserted and `spent_mempool_outpoints`, + /// a list of transparent inputs of the provided [`VerifiedUnminedTx`] that were found + /// as newly created transparent outputs in the mempool during transaction verification. + /// /// Returns an error if the mempool's verified transactions or rejection caches /// prevent this transaction from being inserted. /// These errors should not be propagated to peers, because the transactions are valid. @@ -180,14 +199,19 @@ impl Storage { /// If inserting this transaction evicts other transactions, they will be tracked /// as [`SameEffectsChainRejectionError::RandomlyEvicted`]. #[allow(clippy::unwrap_in_result)] - pub fn insert(&mut self, tx: VerifiedUnminedTx) -> Result { + pub fn insert( + &mut self, + tx: VerifiedUnminedTx, + spent_mempool_outpoints: Vec, + ) -> Result { // # Security // // This method must call `reject`, rather than modifying the rejection lists directly. - let tx_id = tx.transaction.id; + let unmined_tx_id = tx.transaction.id; + let tx_id = unmined_tx_id.mined_id(); // First, check if we have a cached rejection for this transaction. - if let Some(error) = self.rejection_error(&tx_id) { + if let Some(error) = self.rejection_error(&unmined_tx_id) { tracing::trace!( ?tx_id, ?error, @@ -213,8 +237,11 @@ impl Storage { } // Then, we try to insert into the pool. If this fails the transaction is rejected. - let mut result = Ok(tx_id); - if let Err(rejection_error) = self.verified.insert(tx) { + let mut result = Ok(unmined_tx_id); + if let Err(rejection_error) = + self.verified + .insert(tx, spent_mempool_outpoints, &mut self.pending_outputs) + { tracing::debug!( ?tx_id, ?rejection_error, @@ -223,7 +250,7 @@ impl Storage { ); // We could return here, but we still want to check the mempool size - self.reject(tx_id, rejection_error.clone().into()); + self.reject(unmined_tx_id, rejection_error.clone().into()); result = Err(rejection_error.into()); } @@ -256,8 +283,7 @@ impl Storage { ); // If this transaction gets evicted, set its result to the same error - // (we could return here, but we still want to check the mempool size) - if victim_tx.transaction.id == tx_id { + if victim_tx.transaction.id == unmined_tx_id { result = Err(SameEffectsChainRejectionError::RandomlyEvicted.into()); } } @@ -285,6 +311,11 @@ impl Storage { .remove_all_that(|tx| exact_wtxids.contains(&tx.transaction.id)) } + /// Clears a list of mined transaction ids from the verified set's tracked transaction dependencies. + pub fn clear_mined_dependencies(&mut self, mined_ids: &HashSet) { + self.verified.clear_mined_dependencies(mined_ids); + } + /// Reject and remove transactions from the mempool via non-malleable [`transaction::Hash`]. /// - For v5 transactions, transactions are matched by TXID, /// using only the non-malleable transaction ID. @@ -293,6 +324,7 @@ impl Storage { /// - Returns the number of transactions which were removed. /// - Removes from the 'verified' set, if present. /// Maintains the order in which the other unmined transactions have been inserted into the mempool. + /// - Prunes `pending_outputs` of any closed channels. /// /// Reject and remove transactions from the mempool that contain any spent outpoints or revealed /// nullifiers from the passed in `transactions`. @@ -327,23 +359,21 @@ impl Storage { let duplicate_spend_ids: HashSet<_> = self .verified .transactions() - .filter_map(|tx| { - (tx.transaction - .spent_outpoints() + .values() + .map(|tx| (tx.transaction.id, &tx.transaction.transaction)) + .filter_map(|(tx_id, tx)| { + (tx.spent_outpoints() .any(|outpoint| spent_outpoints.contains(&outpoint)) || tx - .transaction .sprout_nullifiers() .any(|nullifier| sprout_nullifiers.contains(nullifier)) || tx - .transaction .sapling_nullifiers() .any(|nullifier| sapling_nullifiers.contains(nullifier)) || tx - .transaction .orchard_nullifiers() .any(|nullifier| orchard_nullifiers.contains(nullifier))) - .then_some(tx.id) + .then_some(tx_id) }) .collect(); @@ -367,6 +397,8 @@ impl Storage { ); } + self.pending_outputs.prune(); + num_removed_mined + num_removed_duplicate_spend } @@ -375,6 +407,7 @@ impl Storage { pub fn clear(&mut self) { self.verified.clear(); self.tip_rejected_exact.clear(); + self.pending_outputs.clear(); self.tip_rejected_same_effects.clear(); self.chain_rejected_same_effects.clear(); self.update_rejected_metrics(); @@ -407,24 +440,26 @@ impl Storage { /// Returns the set of [`UnminedTxId`]s in the mempool. pub fn tx_ids(&self) -> impl Iterator + '_ { - self.verified.transactions().map(|tx| tx.id) - } - - /// Returns an iterator over the [`UnminedTx`]s in the mempool. - // - // TODO: make the transactions() method return VerifiedUnminedTx, - // and remove the full_transactions() method - pub fn transactions(&self) -> impl Iterator { - self.verified.transactions() + self.transactions().values().map(|tx| tx.transaction.id) } - /// Returns an iterator over the [`VerifiedUnminedTx`] in the set. + /// Returns a reference to the [`HashMap`] of [`VerifiedUnminedTx`]s in the verified set. /// /// Each [`VerifiedUnminedTx`] contains an [`UnminedTx`], /// and adds extra fields from the transaction verifier result. - #[allow(dead_code)] - pub fn full_transactions(&self) -> impl Iterator + '_ { - self.verified.full_transactions() + pub fn transactions(&self) -> &HashMap { + self.verified.transactions() + } + + /// Returns a reference to the [`TransactionDependencies`] in the verified set. + pub fn transaction_dependencies(&self) -> &TransactionDependencies { + self.verified.transaction_dependencies() + } + + /// Returns a [`transparent::Output`] created by a mempool transaction for the provided + /// [`transparent::OutPoint`] if one exists, or None otherwise. + pub fn created_output(&self, outpoint: &transparent::OutPoint) -> Option { + self.verified.created_output(outpoint) } /// Returns the number of transactions in the mempool. @@ -455,9 +490,11 @@ impl Storage { &self, tx_ids: HashSet, ) -> impl Iterator { - self.verified - .transactions() - .filter(move |tx| tx_ids.contains(&tx.id)) + tx_ids.into_iter().filter_map(|tx_id| { + self.transactions() + .get(&tx_id.mined_id()) + .map(|tx| &tx.transaction) + }) } /// Returns the set of [`UnminedTx`]es with matching [`transaction::Hash`]es @@ -471,7 +508,9 @@ impl Storage { ) -> impl Iterator { self.verified .transactions() - .filter(move |tx| tx_ids.contains(&tx.id.mined_id())) + .iter() + .filter(move |(tx_id, _)| tx_ids.contains(tx_id)) + .map(|(_, tx)| &tx.transaction) } /// Returns `true` if a transaction exactly matching an [`UnminedTxId`] is in @@ -479,8 +518,8 @@ impl Storage { /// /// This matches the exact transaction, with identical blockchain effects, /// signatures, and proofs. - pub fn contains_transaction_exact(&self, txid: &UnminedTxId) -> bool { - self.verified.transactions().any(|tx| &tx.id == txid) + pub fn contains_transaction_exact(&self, tx_id: &transaction::Hash) -> bool { + self.verified.contains(tx_id) } /// Returns the number of rejected [`UnminedTxId`]s or [`transaction::Hash`]es. @@ -498,13 +537,13 @@ impl Storage { } /// Add a transaction to the rejected list for the given reason. - pub fn reject(&mut self, txid: UnminedTxId, reason: RejectionError) { + pub fn reject(&mut self, tx_id: UnminedTxId, reason: RejectionError) { match reason { RejectionError::ExactTip(e) => { - self.tip_rejected_exact.insert(txid, e); + self.tip_rejected_exact.insert(tx_id, e); } RejectionError::SameEffectsTip(e) => { - self.tip_rejected_same_effects.insert(txid.mined_id(), e); + self.tip_rejected_same_effects.insert(tx_id.mined_id(), e); } RejectionError::SameEffectsChain(e) => { let eviction_memory_time = self.eviction_memory_time; @@ -513,7 +552,7 @@ impl Storage { .or_insert_with(|| { EvictionList::new(MAX_EVICTION_MEMORY_ENTRIES, eviction_memory_time) }) - .insert(txid.mined_id()); + .insert(tx_id.mined_id()); } } self.limit_rejection_list_memory(); @@ -565,7 +604,7 @@ impl Storage { /// Add a transaction that failed download and verification to the rejected list /// if needed, depending on the reason for the failure. - pub fn reject_if_needed(&mut self, txid: UnminedTxId, e: TransactionDownloadVerifyError) { + pub fn reject_if_needed(&mut self, tx_id: UnminedTxId, e: TransactionDownloadVerifyError) { match e { // Rejecting a transaction already in state would speed up further // download attempts without checking the state. However it would @@ -588,7 +627,7 @@ impl Storage { // Consensus verification failed. Reject transaction to avoid // having to download and verify it again just for it to fail again. TransactionDownloadVerifyError::Invalid(e) => { - self.reject(txid, ExactTipRejectionError::FailedVerification(e).into()) + self.reject(tx_id, ExactTipRejectionError::FailedVerification(e).into()) } } } @@ -605,31 +644,32 @@ impl Storage { pub fn remove_expired_transactions( &mut self, tip_height: zebra_chain::block::Height, - ) -> HashSet { - let mut txid_set = HashSet::new(); - // we need a separate set, since reject() takes the original unmined ID, - // then extracts the mined ID out of it - let mut unmined_id_set = HashSet::new(); - - for t in self.transactions() { - if let Some(expiry_height) = t.transaction.expiry_height() { + ) -> HashSet { + let mut tx_ids = HashSet::new(); + + for (&tx_id, tx) in self.transactions() { + if let Some(expiry_height) = tx.transaction.transaction.expiry_height() { if tip_height >= expiry_height { - txid_set.insert(t.id.mined_id()); - unmined_id_set.insert(t.id); + tx_ids.insert(tx_id); } } } // expiry height is effecting data, so we match by non-malleable TXID self.verified - .remove_all_that(|tx| txid_set.contains(&tx.transaction.id.mined_id())); + .remove_all_that(|tx| tx_ids.contains(&tx.transaction.id.mined_id())); // also reject it - for id in unmined_id_set.iter() { - self.reject(*id, SameEffectsChainRejectionError::Expired.into()); + for &id in &tx_ids { + self.reject( + // It's okay to omit the auth digest here as we know that `reject()` will always + // use mined ids for `SameEffectsChainRejectionError`s. + UnminedTxId::Legacy(id), + SameEffectsChainRejectionError::Expired.into(), + ); } - unmined_id_set + tx_ids } /// Check if transaction should be downloaded and/or verified. @@ -638,7 +678,7 @@ impl Storage { /// then it shouldn't be downloaded/verified. pub fn should_download_or_verify(&mut self, txid: UnminedTxId) -> Result<(), MempoolError> { // Check if the transaction is already in the mempool. - if self.contains_transaction_exact(&txid) { + if self.contains_transaction_exact(&txid.mined_id()) { return Err(MempoolError::InMempool); } if let Some(error) = self.rejection_error(&txid) { diff --git a/zebrad/src/components/mempool/storage/tests.rs b/zebrad/src/components/mempool/storage/tests.rs index e47808a3860..197b706d2a4 100644 --- a/zebrad/src/components/mempool/storage/tests.rs +++ b/zebrad/src/components/mempool/storage/tests.rs @@ -1,45 +1,4 @@ -//! Tests and test utility functions for mempool storage. - -use std::ops::RangeBounds; - -use zebra_chain::{ - amount::Amount, - block::Block, - parameters::Network, - serialization::ZcashDeserializeInto, - transaction::{UnminedTx, VerifiedUnminedTx}, -}; +//! Tests for mempool storage. mod prop; mod vectors; - -pub fn unmined_transactions_in_blocks( - block_height_range: impl RangeBounds, - network: &Network, -) -> impl DoubleEndedIterator { - let blocks = network.block_iter(); - - // Deserialize the blocks that are selected based on the specified `block_height_range`. - let selected_blocks = blocks - .filter(move |(&height, _)| block_height_range.contains(&height)) - .map(|(_, block)| { - block - .zcash_deserialize_into::() - .expect("block test vector is structurally valid") - }); - - // Extract the transactions from the blocks and wrap each one as an unmined transaction. - // Use a fake zero miner fee and sigops, because we don't have the UTXOs to calculate - // the correct fee. - selected_blocks - .flat_map(|block| block.transactions) - .map(UnminedTx::from) - .map(|transaction| { - VerifiedUnminedTx::new( - transaction, - Amount::try_from(1_000_000).expect("invalid value"), - 0, - ) - .expect("verification should pass") - }) -} diff --git a/zebrad/src/components/mempool/storage/tests/prop.rs b/zebrad/src/components/mempool/storage/tests/prop.rs index eca65935acb..398ba0925f9 100644 --- a/zebrad/src/components/mempool/storage/tests/prop.rs +++ b/zebrad/src/components/mempool/storage/tests/prop.rs @@ -72,7 +72,7 @@ proptest! { for (transaction_to_accept, transaction_to_reject) in input_permutations { let id_to_accept = transaction_to_accept.transaction.id; - prop_assert_eq!(storage.insert(transaction_to_accept), Ok(id_to_accept)); + prop_assert_eq!(storage.insert(transaction_to_accept, Vec::new()), Ok(id_to_accept)); // Make unique IDs by converting the index to bytes, and writing it to each ID let unique_ids = (0..MAX_EVICTION_MEMORY_ENTRIES as u32).map(move |index| { @@ -96,7 +96,7 @@ proptest! { // - transaction_to_accept, or // - a rejection from rejections prop_assert_eq!( - storage.insert(transaction_to_reject), + storage.insert(transaction_to_reject, Vec::new()), Err(MempoolError::StorageEffectsTip(SameEffectsTipRejectionError::SpendConflict)) ); @@ -147,13 +147,13 @@ proptest! { if i < transactions.len() - 1 { // The initial transactions should be successful prop_assert_eq!( - storage.insert(transaction.clone()), + storage.insert(transaction.clone(), Vec::new()), Ok(tx_id) ); } else { // The final transaction will cause a random eviction, // which might return an error if this transaction is chosen - let result = storage.insert(transaction.clone()); + let result = storage.insert(transaction.clone(), Vec::new()); if result.is_ok() { prop_assert_eq!( @@ -281,10 +281,10 @@ proptest! { let id_to_accept = transaction_to_accept.transaction.id; let id_to_reject = transaction_to_reject.transaction.id; - prop_assert_eq!(storage.insert(transaction_to_accept), Ok(id_to_accept)); + prop_assert_eq!(storage.insert(transaction_to_accept, Vec::new()), Ok(id_to_accept)); prop_assert_eq!( - storage.insert(transaction_to_reject), + storage.insert(transaction_to_reject, Vec::new()), Err(MempoolError::StorageEffectsTip(SameEffectsTipRejectionError::SpendConflict)) ); @@ -332,19 +332,19 @@ proptest! { let id_to_reject = transaction_to_reject.transaction.id; prop_assert_eq!( - storage.insert(first_transaction_to_accept), + storage.insert(first_transaction_to_accept, Vec::new()), Ok(first_id_to_accept) ); prop_assert_eq!( - storage.insert(transaction_to_reject), + storage.insert(transaction_to_reject, Vec::new()), Err(MempoolError::StorageEffectsTip(SameEffectsTipRejectionError::SpendConflict)) ); prop_assert!(storage.contains_rejected(&id_to_reject)); prop_assert_eq!( - storage.insert(second_transaction_to_accept), + storage.insert(second_transaction_to_accept, Vec::new()), Ok(second_id_to_accept) ); @@ -371,13 +371,13 @@ proptest! { .filter_map(|transaction| { let id = transaction.transaction.id; - storage.insert(transaction.clone()).ok().map(|_| id) + storage.insert(transaction.clone(), Vec::new()).ok().map(|_| id) }) .collect(); // Check that the inserted transactions are still there. for transaction_id in &inserted_transactions { - prop_assert!(storage.contains_transaction_exact(transaction_id)); + prop_assert!(storage.contains_transaction_exact(&transaction_id.mined_id())); } // Remove some transactions. @@ -399,14 +399,14 @@ proptest! { let removed_transactions = input.removed_transaction_ids(); for removed_transaction_id in &removed_transactions { - prop_assert!(!storage.contains_transaction_exact(removed_transaction_id)); + prop_assert!(!storage.contains_transaction_exact(&removed_transaction_id.mined_id())); } // Check that the remaining transactions are still in the storage. let remaining_transactions = inserted_transactions.difference(&removed_transactions); for remaining_transaction_id in remaining_transactions { - prop_assert!(storage.contains_transaction_exact(remaining_transaction_id)); + prop_assert!(storage.contains_transaction_exact(&remaining_transaction_id.mined_id())); } } } diff --git a/zebrad/src/components/mempool/storage/tests/vectors.rs b/zebrad/src/components/mempool/storage/tests/vectors.rs index 5b60c133e95..30ce35bb832 100644 --- a/zebrad/src/components/mempool/storage/tests/vectors.rs +++ b/zebrad/src/components/mempool/storage/tests/vectors.rs @@ -4,15 +4,14 @@ use std::iter; use color_eyre::eyre::Result; +use transparent::OutPoint; use zebra_chain::{ amount::Amount, block::{Block, Height}, parameters::Network, }; -use crate::components::mempool::{ - storage::tests::unmined_transactions_in_blocks, storage::*, Mempool, -}; +use crate::components::mempool::{storage::*, Mempool}; /// Eviction memory time used for tests. Most tests won't care about this /// so we use a large enough value that will never be reached in the tests. @@ -35,22 +34,23 @@ fn mempool_storage_crud_exact_mainnet() { }); // Get one (1) unmined transaction - let unmined_tx = unmined_transactions_in_blocks(.., &network) + let unmined_tx = network + .unmined_transactions_in_blocks(..) .next() .expect("at least one unmined transaction"); // Insert unmined tx into the mempool. - let _ = storage.insert(unmined_tx.clone()); + let _ = storage.insert(unmined_tx.clone(), Vec::new()); // Check that it is in the mempool, and not rejected. - assert!(storage.contains_transaction_exact(&unmined_tx.transaction.id)); + assert!(storage.contains_transaction_exact(&unmined_tx.transaction.id.mined_id())); // Remove tx let removal_count = storage.remove_exact(&iter::once(unmined_tx.transaction.id).collect()); // Check that it is /not/ in the mempool. assert_eq!(removal_count, 1); - assert!(!storage.contains_transaction_exact(&unmined_tx.transaction.id)); + assert!(!storage.contains_transaction_exact(&unmined_tx.transaction.id.mined_id())); } #[test] @@ -69,7 +69,7 @@ fn mempool_storage_basic() -> Result<()> { fn mempool_storage_basic_for_network(network: Network) -> Result<()> { // Get transactions from the first 10 blocks of the Zcash blockchain - let unmined_transactions: Vec<_> = unmined_transactions_in_blocks(..=10, &network).collect(); + let unmined_transactions: Vec<_> = network.unmined_transactions_in_blocks(..=10).collect(); assert!( MEMPOOL_TX_COUNT < unmined_transactions.len(), @@ -94,7 +94,7 @@ fn mempool_storage_basic_for_network(network: Network) -> Result<()> { let mut maybe_inserted_transactions = Vec::new(); let mut some_rejected_transactions = Vec::new(); for unmined_transaction in unmined_transactions.clone() { - let result = storage.insert(unmined_transaction.clone()); + let result = storage.insert(unmined_transaction.clone(), Vec::new()); match result { Ok(_) => { // While the transaction was inserted here, it can be rejected later. @@ -124,7 +124,7 @@ fn mempool_storage_basic_for_network(network: Network) -> Result<()> { // Test if rejected transactions were actually rejected. for tx in some_rejected_transactions.iter() { - assert!(!storage.contains_transaction_exact(&tx.transaction.id)); + assert!(!storage.contains_transaction_exact(&tx.transaction.id.mined_id())); } // Query all the ids we have for rejected, get back `total - MEMPOOL_SIZE` @@ -162,15 +162,16 @@ fn mempool_storage_crud_same_effects_mainnet() { }); // Get one (1) unmined transaction - let unmined_tx_1 = unmined_transactions_in_blocks(.., &network) + let unmined_tx_1 = network + .unmined_transactions_in_blocks(..) .next() .expect("at least one unmined transaction"); // Insert unmined tx into the mempool. - let _ = storage.insert(unmined_tx_1.clone()); + let _ = storage.insert(unmined_tx_1.clone(), Vec::new()); // Check that it is in the mempool, and not rejected. - assert!(storage.contains_transaction_exact(&unmined_tx_1.transaction.id)); + assert!(storage.contains_transaction_exact(&unmined_tx_1.transaction.id.mined_id())); // Reject and remove mined tx let removal_count = storage.reject_and_remove_same_effects( @@ -180,7 +181,7 @@ fn mempool_storage_crud_same_effects_mainnet() { // Check that it is /not/ in the mempool as a verified transaction. assert_eq!(removal_count, 1); - assert!(!storage.contains_transaction_exact(&unmined_tx_1.transaction.id)); + assert!(!storage.contains_transaction_exact(&unmined_tx_1.transaction.id.mined_id())); // Check that it's rejection is cached in the chain_rejected_same_effects' `Mined` eviction list. assert_eq!( @@ -188,12 +189,13 @@ fn mempool_storage_crud_same_effects_mainnet() { Some(SameEffectsChainRejectionError::Mined.into()) ); assert_eq!( - storage.insert(unmined_tx_1), + storage.insert(unmined_tx_1, Vec::new()), Err(SameEffectsChainRejectionError::Mined.into()) ); // Get a different unmined transaction - let unmined_tx_2 = unmined_transactions_in_blocks(1.., &network) + let unmined_tx_2 = network + .unmined_transactions_in_blocks(1..) .find(|tx| { tx.transaction .transaction @@ -205,12 +207,12 @@ fn mempool_storage_crud_same_effects_mainnet() { // Insert unmined tx into the mempool. assert_eq!( - storage.insert(unmined_tx_2.clone()), + storage.insert(unmined_tx_2.clone(), Vec::new()), Ok(unmined_tx_2.transaction.id) ); // Check that it is in the mempool, and not rejected. - assert!(storage.contains_transaction_exact(&unmined_tx_2.transaction.id)); + assert!(storage.contains_transaction_exact(&unmined_tx_2.transaction.id.mined_id())); // Reject and remove duplicate spend tx let removal_count = storage.reject_and_remove_same_effects( @@ -220,7 +222,7 @@ fn mempool_storage_crud_same_effects_mainnet() { // Check that it is /not/ in the mempool as a verified transaction. assert_eq!(removal_count, 1); - assert!(!storage.contains_transaction_exact(&unmined_tx_2.transaction.id)); + assert!(!storage.contains_transaction_exact(&unmined_tx_2.transaction.id.mined_id())); // Check that it's rejection is cached in the chain_rejected_same_effects' `SpendConflict` eviction list. assert_eq!( @@ -228,7 +230,7 @@ fn mempool_storage_crud_same_effects_mainnet() { Some(SameEffectsChainRejectionError::DuplicateSpend.into()) ); assert_eq!( - storage.insert(unmined_tx_2), + storage.insert(unmined_tx_2, Vec::new()), Err(SameEffectsChainRejectionError::DuplicateSpend.into()) ); } @@ -269,6 +271,7 @@ fn mempool_expired_basic_for_network(network: Network) -> Result<()> { 0, ) .expect("verification should pass"), + Vec::new(), )?; assert_eq!(storage.transaction_count(), 1); @@ -280,7 +283,7 @@ fn mempool_expired_basic_for_network(network: Network) -> Result<()> { // remove_expired_transactions() will return what was removed let expired = storage.remove_expired_transactions(Height(1)); - assert!(expired.contains(&tx_id)); + assert!(expired.contains(&tx_id.mined_id())); let everything_in_mempool: HashSet = storage.tx_ids().collect(); assert_eq!(everything_in_mempool.len(), 0); @@ -290,3 +293,95 @@ fn mempool_expired_basic_for_network(network: Network) -> Result<()> { Ok(()) } + +/// Check that the transaction dependencies are updated when transactions with spent mempool outputs +/// are inserted into storage, and that the `Storage.remove()` method also removes any transactions +/// that directly or indirectly spend outputs of a removed transaction. +#[test] +fn mempool_removes_dependent_transactions() -> Result<()> { + let network = Network::Mainnet; + + // Create an empty storage + let mut storage: Storage = Storage::new(&config::Config { + tx_cost_limit: 160_000_000, + eviction_memory_time: EVICTION_MEMORY_TIME, + ..Default::default() + }); + + let unmined_txs_with_transparent_outputs = || { + network + .unmined_transactions_in_blocks(..) + .filter(|tx| !tx.transaction.transaction.outputs().is_empty()) + }; + + let mut fake_spent_outpoints: Vec = Vec::new(); + let mut expected_transaction_dependencies = HashMap::new(); + let mut expected_transaction_dependents = HashMap::new(); + for unmined_tx in unmined_txs_with_transparent_outputs() { + let tx_id = unmined_tx.transaction.id.mined_id(); + let num_outputs = unmined_tx.transaction.transaction.outputs().len(); + + if let Some(&fake_spent_outpoint) = fake_spent_outpoints.first() { + expected_transaction_dependencies + .insert(tx_id, [fake_spent_outpoint.hash].into_iter().collect()); + expected_transaction_dependents + .insert(fake_spent_outpoint.hash, [tx_id].into_iter().collect()); + } + + storage + .insert(unmined_tx.clone(), fake_spent_outpoints) + .expect("should insert transaction"); + + // Add up to 5 of this transaction's outputs as fake spent outpoints for the next transaction + fake_spent_outpoints = (0..num_outputs.min(5)) + .map(|i| OutPoint::from_usize(tx_id, i)) + .collect(); + } + + assert_eq!( + storage.transaction_dependencies().dependencies().len(), + unmined_txs_with_transparent_outputs() + .count() + .checked_sub(1) + .expect("at least one unmined transaction with transparent outputs"), + "should have an entry all inserted txns except the first one" + ); + + assert_eq!( + storage.transaction_dependencies().dependencies(), + &expected_transaction_dependencies, + "should have expected transaction dependencies" + ); + + assert_eq!( + storage.transaction_dependencies().dependents(), + &expected_transaction_dependents, + "should have expected transaction dependents" + ); + + // Remove the first transaction and check that everything in storage is emptied. + let first_tx = unmined_txs_with_transparent_outputs() + .next() + .expect("at least one unmined transaction with transparent outputs"); + + let expected_num_removed = storage.transaction_count(); + let num_removed = storage.remove_exact(&[first_tx.transaction.id].into_iter().collect()); + + assert_eq!( + num_removed, expected_num_removed, + "remove_exact should total storage transaction count" + ); + + assert!( + storage.transaction_dependencies().dependencies().is_empty(), + "tx deps should be empty" + ); + + assert_eq!( + storage.transaction_count(), + 0, + "verified set should be empty" + ); + + Ok(()) +} diff --git a/zebrad/src/components/mempool/storage/verified_set.rs b/zebrad/src/components/mempool/storage/verified_set.rs index a9c850b4ef8..7cd82fb0be4 100644 --- a/zebrad/src/components/mempool/storage/verified_set.rs +++ b/zebrad/src/components/mempool/storage/verified_set.rs @@ -2,15 +2,18 @@ use std::{ borrow::Cow, - collections::{HashSet, VecDeque}, + collections::{HashMap, HashSet}, hash::Hash, }; use zebra_chain::{ orchard, sapling, sprout, - transaction::{Transaction, UnminedTx, UnminedTxId, VerifiedUnminedTx}, + transaction::{self, UnminedTx, VerifiedUnminedTx}, transparent, }; +use zebra_node_services::mempool::TransactionDependencies; + +use crate::components::mempool::pending_outputs::PendingOutputs; use super::super::SameEffectsTipRejectionError; @@ -23,6 +26,8 @@ use zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD; /// This also caches the all the spent outputs from the transactions in the mempool. The spent /// outputs include: /// +/// - the dependencies of transactions that spent the outputs of other transactions in the mempool +/// - the outputs of transactions in the mempool /// - the transparent outpoints spent by transactions in the mempool /// - the Sprout nullifiers revealed by transactions in the mempool /// - the Sapling nullifiers revealed by transactions in the mempool @@ -30,7 +35,16 @@ use zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD; #[derive(Default)] pub struct VerifiedSet { /// The set of verified transactions in the mempool. - transactions: VecDeque, + transactions: HashMap, + + /// A map of dependencies between transactions in the mempool that + /// spend or create outputs of other transactions in the mempool. + transaction_dependencies: TransactionDependencies, + + /// The [`transparent::Output`]s created by verified transactions in the mempool. + /// + /// These outputs may be spent by other transactions in the mempool. + created_outputs: HashMap, /// The total size of the transactions in the mempool if they were /// serialized. @@ -60,20 +74,20 @@ impl Drop for VerifiedSet { } impl VerifiedSet { - /// Returns an iterator over the [`UnminedTx`] in the set. - // - // TODO: make the transactions() method return VerifiedUnminedTx, - // and remove the full_transactions() method - pub fn transactions(&self) -> impl Iterator + '_ { - self.transactions.iter().map(|tx| &tx.transaction) + /// Returns a reference to the [`HashMap`] of [`VerifiedUnminedTx`]s in the set. + pub fn transactions(&self) -> &HashMap { + &self.transactions } - /// Returns an iterator over the [`VerifiedUnminedTx`] in the set. - /// - /// Each [`VerifiedUnminedTx`] contains an [`UnminedTx`], - /// and adds extra fields from the transaction verifier result. - pub fn full_transactions(&self) -> impl Iterator + '_ { - self.transactions.iter() + /// Returns a reference to the [`TransactionDependencies`] in the set. + pub fn transaction_dependencies(&self) -> &TransactionDependencies { + &self.transaction_dependencies + } + + /// Returns a [`transparent::Output`] created by a mempool transaction for the provided + /// [`transparent::OutPoint`] if one exists, or None otherwise. + pub fn created_output(&self, outpoint: &transparent::OutPoint) -> Option { + self.created_outputs.get(outpoint).cloned() } /// Returns the number of verified transactions in the set. @@ -97,9 +111,9 @@ impl VerifiedSet { } /// Returns `true` if the set of verified transactions contains the transaction with the - /// specified [`UnminedTxId`]. - pub fn contains(&self, id: &UnminedTxId) -> bool { - self.transactions.iter().any(|tx| &tx.transaction.id == id) + /// specified [`transaction::Hash`]. + pub fn contains(&self, id: &transaction::Hash) -> bool { + self.transactions.contains_key(id) } /// Clear the set of verified transactions. @@ -107,10 +121,12 @@ impl VerifiedSet { /// Also clears all internal caches. pub fn clear(&mut self) { self.transactions.clear(); + self.transaction_dependencies.clear(); self.spent_outpoints.clear(); self.sprout_nullifiers.clear(); self.sapling_nullifiers.clear(); self.orchard_nullifiers.clear(); + self.created_outputs.clear(); self.transactions_serialized_size = 0; self.total_cost = 0; self.update_metrics(); @@ -126,22 +142,49 @@ impl VerifiedSet { pub fn insert( &mut self, transaction: VerifiedUnminedTx, + spent_mempool_outpoints: Vec, + pending_outputs: &mut PendingOutputs, ) -> Result<(), SameEffectsTipRejectionError> { if self.has_spend_conflicts(&transaction.transaction) { return Err(SameEffectsTipRejectionError::SpendConflict); } - self.cache_outputs_from(&transaction.transaction.transaction); + // This likely only needs to check that the transaction hash of the outpoint is still in the mempool, + // but it's likely rare that a transaction spends multiple transparent outputs of + // a single transaction in practice. + for outpoint in &spent_mempool_outpoints { + if !self.created_outputs.contains_key(outpoint) { + return Err(SameEffectsTipRejectionError::MissingOutput); + } + } + + let tx_id = transaction.transaction.id.mined_id(); + self.transaction_dependencies + .add(tx_id, spent_mempool_outpoints); + + // Inserts the transaction's outputs into the internal caches and responds to pending output requests. + let tx = &transaction.transaction.transaction; + for (index, output) in tx.outputs().iter().cloned().enumerate() { + let outpoint = transparent::OutPoint::from_usize(tx_id, index); + self.created_outputs.insert(outpoint, output.clone()); + pending_outputs.respond(&outpoint, output) + } + self.spent_outpoints.extend(tx.spent_outpoints()); + self.sprout_nullifiers.extend(tx.sprout_nullifiers()); + self.sapling_nullifiers.extend(tx.sapling_nullifiers()); + self.orchard_nullifiers.extend(tx.orchard_nullifiers()); + self.transactions_serialized_size += transaction.transaction.size; self.total_cost += transaction.cost(); - self.transactions.push_front(transaction); + self.transactions.insert(tx_id, transaction); self.update_metrics(); Ok(()) } - /// Evict one transaction from the set, returns the victim transaction. + /// Evict one transaction and any transactions that directly or indirectly depend on + /// its outputs from the set, returns the victim transaction and any dependent transactions. /// /// Removes a transaction with probability in direct proportion to the /// eviction weight, as per [ZIP-401]. @@ -159,72 +202,90 @@ impl VerifiedSet { /// to 20,000 (mempooltxcostlimit/min(cost)), so the actual cost shouldn't /// be too bad. /// + /// This function is equivalent to `EvictTransaction` in [ZIP-401]. + /// /// [ZIP-401]: https://zips.z.cash/zip-0401 #[allow(clippy::unwrap_in_result)] pub fn evict_one(&mut self) -> Option { - if self.transactions.is_empty() { - None - } else { - use rand::distributions::{Distribution, WeightedIndex}; - use rand::prelude::thread_rng; - - let weights: Vec = self - .transactions - .iter() - .map(|tx| tx.clone().eviction_weight()) - .collect(); - - let dist = WeightedIndex::new(weights) - .expect("there is at least one weight, all weights are non-negative, and the total is positive"); - - Some(self.remove(dist.sample(&mut thread_rng()))) - } + use rand::distributions::{Distribution, WeightedIndex}; + use rand::prelude::thread_rng; + + let (keys, weights): (Vec, Vec) = self + .transactions + .iter() + .map(|(&tx_id, tx)| (tx_id, tx.eviction_weight())) + .unzip(); + + let dist = WeightedIndex::new(weights).expect( + "there is at least one weight, all weights are non-negative, and the total is positive", + ); + + let key_to_remove = keys + .get(dist.sample(&mut thread_rng())) + .expect("should have a key at every index in the distribution"); + + // Removes the randomly selected transaction and all of its dependents from the set, + // then returns just the randomly selected transaction + self.remove(key_to_remove).pop() + } + + /// Clears a list of mined transaction ids from the lists of dependencies for + /// any other transactions in the mempool and removes their dependents. + pub fn clear_mined_dependencies(&mut self, mined_ids: &HashSet) { + self.transaction_dependencies + .clear_mined_dependencies(mined_ids); } /// Removes all transactions in the set that match the `predicate`. /// /// Returns the amount of transactions removed. pub fn remove_all_that(&mut self, predicate: impl Fn(&VerifiedUnminedTx) -> bool) -> usize { - // Clippy suggests to remove the `collect` and the `into_iter` further down. However, it is - // unable to detect that when that is done, there is a borrow conflict. What happens is the - // iterator borrows `self.transactions` immutably, but it also need to be borrowed mutably - // in order to remove the transactions while traversing the iterator. - #[allow(clippy::needless_collect)] - let indices_to_remove: Vec<_> = self + let keys_to_remove: Vec<_> = self .transactions .iter() - .enumerate() - .filter(|(_, tx)| predicate(tx)) - .map(|(index, _)| index) + .filter_map(|(&tx_id, tx)| predicate(tx).then_some(tx_id)) .collect(); - let removed_count = indices_to_remove.len(); + let mut removed_count = 0; - // Correctness: remove indexes in reverse order, - // so earlier indexes still correspond to the same transactions - for index_to_remove in indices_to_remove.into_iter().rev() { - self.remove(index_to_remove); + for key_to_remove in keys_to_remove { + removed_count += self.remove(&key_to_remove).len(); } removed_count } - /// Removes a transaction from the set. + /// Accepts a transaction id for a transaction to remove from the verified set. /// - /// Also removes its outputs from the internal caches. - fn remove(&mut self, transaction_index: usize) -> VerifiedUnminedTx { - let removed_tx = self - .transactions - .remove(transaction_index) - .expect("invalid transaction index"); - - self.transactions_serialized_size -= removed_tx.transaction.size; - self.total_cost -= removed_tx.cost(); - self.remove_outputs(&removed_tx.transaction); + /// Removes the transaction and any transactions that directly or indirectly + /// depend on it from the set. + /// + /// Returns a list of transactions that have been removed with the target transaction + /// as the last item. + /// + /// Also removes the outputs of any removed transactions from the internal caches. + fn remove(&mut self, key_to_remove: &transaction::Hash) -> Vec { + let removed_transactions: Vec<_> = self + .transaction_dependencies + .remove_all(key_to_remove) + .iter() + .chain(std::iter::once(key_to_remove)) + .map(|key_to_remove| { + let removed_tx = self + .transactions + .remove(key_to_remove) + .expect("invalid transaction key"); + + self.transactions_serialized_size -= removed_tx.transaction.size; + self.total_cost -= removed_tx.cost(); + self.remove_outputs(&removed_tx.transaction); + + removed_tx + }) + .collect(); self.update_metrics(); - - removed_tx + removed_transactions } /// Returns `true` if the given `transaction` has any spend conflicts with transactions in the @@ -241,18 +302,18 @@ impl VerifiedSet { || Self::has_conflicts(&self.orchard_nullifiers, tx.orchard_nullifiers().copied()) } - /// Inserts the transaction's outputs into the internal caches. - fn cache_outputs_from(&mut self, tx: &Transaction) { - self.spent_outpoints.extend(tx.spent_outpoints()); - self.sprout_nullifiers.extend(tx.sprout_nullifiers()); - self.sapling_nullifiers.extend(tx.sapling_nullifiers()); - self.orchard_nullifiers.extend(tx.orchard_nullifiers()); - } - /// Removes the tracked transaction outputs from the mempool. fn remove_outputs(&mut self, unmined_tx: &UnminedTx) { let tx = &unmined_tx.transaction; + for index in 0..tx.outputs().len() { + self.created_outputs + .remove(&transparent::OutPoint::from_usize( + unmined_tx.id.mined_id(), + index, + )); + } + let spent_outpoints = tx.spent_outpoints().map(Cow::Owned); let sprout_nullifiers = tx.sprout_nullifiers().map(Cow::Borrowed); let sapling_nullifiers = tx.sapling_nullifiers().map(Cow::Borrowed); @@ -308,7 +369,7 @@ impl VerifiedSet { let mut size_with_weight_gt2 = 0; let mut size_with_weight_gt3 = 0; - for entry in self.full_transactions() { + for entry in self.transactions().values() { paid_actions += entry.conventional_actions - entry.unpaid_actions; if entry.fee_weight_ratio > 3.0 { diff --git a/zebrad/src/components/mempool/tests/prop.rs b/zebrad/src/components/mempool/tests/prop.rs index 9f05b79d567..e12b205e34c 100644 --- a/zebrad/src/components/mempool/tests/prop.rs +++ b/zebrad/src/components/mempool/tests/prop.rs @@ -74,7 +74,7 @@ proptest! { // Insert a dummy transaction. mempool .storage() - .insert(transaction.0) + .insert(transaction.0, Vec::new()) .expect("Inserting a transaction should succeed"); // The first call to `poll_ready` shouldn't clear the storage yet. @@ -148,7 +148,7 @@ proptest! { // Insert the dummy transaction into the mempool. mempool .storage() - .insert(transaction.0.clone()) + .insert(transaction.0.clone(), Vec::new()) .expect("Inserting a transaction should succeed"); // Set the new chain tip. @@ -205,7 +205,7 @@ proptest! { // Insert a dummy transaction. mempool .storage() - .insert(transaction) + .insert(transaction, Vec::new()) .expect("Inserting a transaction should succeed"); // The first call to `poll_ready` shouldn't clear the storage yet. diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 86848c8bae7..1b87097aaf1 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -1,6 +1,6 @@ //! Fixed test vectors for the mempool. -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use color_eyre::Report; use tokio::time::{self, timeout}; @@ -8,7 +8,7 @@ use tower::{ServiceBuilder, ServiceExt}; use zebra_chain::{ amount::Amount, block::Block, fmt::humantime_seconds, parameters::Network, - serialization::ZcashDeserializeInto, transaction::VerifiedUnminedTx, + serialization::ZcashDeserializeInto, transaction::VerifiedUnminedTx, transparent::OutPoint, }; use zebra_consensus::transaction as tx; use zebra_state::{Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT}; @@ -42,7 +42,7 @@ async fn mempool_service_basic_single() -> Result<(), Report> { let network = Network::Mainnet; // get the genesis block transactions from the Zcash blockchain. - let mut unmined_transactions = unmined_transactions_in_blocks(1..=10, &network); + let mut unmined_transactions = network.unmined_transactions_in_blocks(1..=10); let genesis_transaction = unmined_transactions .next() .expect("Missing genesis transaction"); @@ -61,7 +61,9 @@ async fn mempool_service_basic_single() -> Result<(), Report> { // Insert the genesis block coinbase transaction into the mempool storage. let mut inserted_ids = HashSet::new(); - service.storage().insert(genesis_transaction.clone())?; + service + .storage() + .insert(genesis_transaction.clone(), Vec::new())?; inserted_ids.insert(genesis_transaction.transaction.id); // Test `Request::TransactionIds` @@ -131,7 +133,7 @@ async fn mempool_service_basic_single() -> Result<(), Report> { inserted_ids.insert(tx.transaction.id); // Error must be ignored because a insert can trigger an eviction and // an error is returned if the transaction being inserted in chosen. - let _ = service.storage().insert(tx.clone()); + let _ = service.storage().insert(tx.clone(), Vec::new()); } // Test `Request::RejectedTransactionIds` @@ -185,7 +187,7 @@ async fn mempool_queue_single() -> Result<(), Report> { let network = Network::Mainnet; // Get transactions to use in the test - let unmined_transactions = unmined_transactions_in_blocks(1..=10, &network); + let unmined_transactions = network.unmined_transactions_in_blocks(1..=10); let mut transactions = unmined_transactions.collect::>(); // Split unmined_transactions into: // [transactions..., new_tx] @@ -212,7 +214,7 @@ async fn mempool_queue_single() -> Result<(), Report> { for tx in transactions.iter() { // Error must be ignored because a insert can trigger an eviction and // an error is returned if the transaction being inserted in chosen. - let _ = service.storage().insert(tx.clone()); + let _ = service.storage().insert(tx.clone(), Vec::new()); } // Test `Request::Queue` for a new transaction @@ -278,7 +280,7 @@ async fn mempool_service_disabled() -> Result<(), Report> { setup(&network, u64::MAX, true).await; // get the genesis block transactions from the Zcash blockchain. - let mut unmined_transactions = unmined_transactions_in_blocks(1..=10, &network); + let mut unmined_transactions = network.unmined_transactions_in_blocks(1..=10); let genesis_transaction = unmined_transactions .next() .expect("Missing genesis transaction"); @@ -293,7 +295,9 @@ async fn mempool_service_disabled() -> Result<(), Report> { assert!(service.is_enabled()); // Insert the genesis block coinbase transaction into the mempool storage. - service.storage().insert(genesis_transaction.clone())?; + service + .storage() + .insert(genesis_transaction.clone(), Vec::new())?; // Test if the mempool answers correctly (i.e. is enabled) let response = service @@ -614,7 +618,7 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { ) = setup(&network, u64::MAX, true).await; // Get transactions to use in the test - let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, &network); + let mut unmined_transactions = network.unmined_transactions_in_blocks(1..=2); let rejected_tx = unmined_transactions.next().unwrap().clone(); // Enable the mempool @@ -689,7 +693,7 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { ) = setup(&network, u64::MAX, true).await; // Get transactions to use in the test - let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, &network); + let mut unmined_transactions = network.unmined_transactions_in_blocks(1..=2); let rejected_valid_tx = unmined_transactions.next().unwrap().clone(); // Enable the mempool @@ -918,6 +922,129 @@ async fn mempool_reverifies_after_tip_change() -> Result<(), Report> { Ok(()) } +/// Checks that the mempool service responds to AwaitOutput requests after verifying transactions +/// that create those outputs, or immediately if the outputs had been created by transaction that +/// are already in the mempool. +#[tokio::test(flavor = "multi_thread")] +async fn mempool_responds_to_await_output() -> Result<(), Report> { + let network = Network::Mainnet; + + let ( + mut mempool, + _peer_set, + _state_service, + _chain_tip_change, + mut tx_verifier, + mut recent_syncs, + ) = setup(&network, u64::MAX, true).await; + mempool.enable(&mut recent_syncs).await; + + let verified_unmined_tx = network + .unmined_transactions_in_blocks(1..=10) + .find(|tx| !tx.transaction.transaction.outputs().is_empty()) + .expect("should have at least 1 tx with transparent outputs"); + + let unmined_tx = verified_unmined_tx.transaction.clone(); + let output_index = 0; + let outpoint = OutPoint::from_usize(unmined_tx.id.mined_id(), output_index); + let expected_output = unmined_tx + .transaction + .outputs() + .get(output_index) + .expect("already checked that tx has outputs") + .clone(); + + // Call mempool with an AwaitOutput request + + let request = Request::AwaitOutput(outpoint); + let await_output_response_fut = mempool.ready().await.unwrap().call(request); + + // Queue the transaction with the pending output to be added to the mempool + + let request = Request::Queue(vec![Gossip::Tx(unmined_tx)]); + let queue_response_fut = mempool.ready().await.unwrap().call(request); + let mock_verify_tx_fut = tx_verifier.expect_request_that(|_| true).map(|responder| { + responder.respond(transaction::Response::Mempool { + transaction: verified_unmined_tx, + spent_mempool_outpoints: Vec::new(), + }); + }); + + let (response, _) = futures::join!(queue_response_fut, mock_verify_tx_fut); + let Response::Queued(mut results) = response.expect("response should be Ok") else { + panic!("wrong response from mempool to Queued request"); + }; + + let result_rx = results.remove(0).expect("should pass initial checks"); + assert!(results.is_empty(), "should have 1 result for 1 queued tx"); + + tokio::time::timeout(Duration::from_secs(10), result_rx) + .await + .expect("should not time out") + .expect("mempool tx verification result channel should not be closed") + .expect("mocked verification should be successful"); + + // Wait for next steps in mempool's Downloads to finish + // TODO: Move this and the `ready().await` below above waiting for the mempool verification result above after + // waiting to respond with a transaction's verification result until after it's been inserted into the mempool. + tokio::time::sleep(Duration::from_secs(1)).await; + + mempool + .ready() + .await + .expect("polling mempool should succeed"); + + assert_eq!( + mempool.storage().transaction_count(), + 1, + "should have 1 transaction in mempool's verified set" + ); + + assert_eq!( + mempool.storage().created_output(&outpoint), + Some(expected_output.clone()), + "created output should match expected output" + ); + + // Check that the AwaitOutput request has been responded to after the relevant tx was added to the verified set + + let response_fut = tokio::time::timeout(Duration::from_secs(30), await_output_response_fut); + let response = response_fut + .await + .expect("should not time out") + .expect("should not return RecvError"); + + let Response::UnspentOutput(response) = response else { + panic!("wrong response from mempool to AwaitOutput request"); + }; + + assert_eq!( + response, expected_output, + "AwaitOutput response should match expected output" + ); + + // Check that the mempool responds to AwaitOutput requests correctly when the outpoint is already in its `created_outputs` collection too. + + let request = Request::AwaitOutput(outpoint); + let await_output_response_fut = mempool.ready().await.unwrap().call(request); + let response_fut = tokio::time::timeout(Duration::from_secs(30), await_output_response_fut); + let response = response_fut + .await + .expect("should not time out") + .expect("should not return RecvError"); + + let Response::UnspentOutput(response) = response else { + panic!("wrong response from mempool to AwaitOutput request"); + }; + + assert_eq!( + response, expected_output, + "AwaitOutput response should match expected output" + ); + + Ok(()) +} + /// Create a new [`Mempool`] instance using mocked services. async fn setup( network: &Network, @@ -943,7 +1070,7 @@ async fn setup( let (sync_status, recent_syncs) = SyncStatus::new(); - let (mempool, _mempool_transaction_receiver) = Mempool::new( + let (mempool, mut mempool_transaction_receiver) = Mempool::new( &mempool::Config { tx_cost_limit, ..Default::default() @@ -956,6 +1083,8 @@ async fn setup( chain_tip_change.clone(), ); + tokio::spawn(async move { while mempool_transaction_receiver.recv().await.is_ok() {} }); + if should_commit_genesis_block { let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES .zcash_deserialize_into() diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index b0949cd336d..1a8cefbe0b2 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -2914,7 +2914,8 @@ async fn validate_regtest_genesis_block() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &network, state).await; + ) = zebra_consensus::router::init_test(zebra_consensus::Config::default(), &network, state) + .await; let genesis_hash = block_verifier_router .oneshot(zebra_consensus::Request::Commit(regtest_genesis_block())) @@ -3314,8 +3315,12 @@ async fn nu6_funding_streams_and_coinbase_balance() -> Result<()> { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &network, state.clone()) - .await; + ) = zebra_consensus::router::init_test( + zebra_consensus::Config::default(), + &network, + state.clone(), + ) + .await; tracing::info!("started state service and block verifier, committing Regtest genesis block"); @@ -3348,6 +3353,7 @@ async fn nu6_funding_streams_and_coinbase_balance() -> Result<()> { .await .respond(mempool::Response::FullTransactions { transactions: vec![], + transaction_dependencies: Default::default(), // tip hash needs to match chain info for long poll requests last_seen_tip_hash: genesis_hash, });