diff --git a/Cargo.lock b/Cargo.lock index 5f041cac303..1b9ebc42aad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6320,6 +6320,7 @@ dependencies = [ "rand 0.8.5", "rand_chacha 0.3.1", "rand_core 0.6.3", + "rayon", "redjubjub", "ripemd", "secp256k1", @@ -6497,6 +6498,7 @@ dependencies = [ "once_cell", "proptest", "proptest-derive", + "rayon", "regex", "rlimit", "rocksdb", diff --git a/tower-batch/src/worker.rs b/tower-batch/src/worker.rs index 2613f2ea30c..f2266e67100 100644 --- a/tower-batch/src/worker.rs +++ b/tower-batch/src/worker.rs @@ -99,7 +99,7 @@ where { /// Creates a new batch worker. /// - /// See [`Service::new()`](crate::Service::new) for details. + /// See [`Batch::new()`](crate::Batch::new) for details. pub(crate) fn new( service: T, rx: mpsc::UnboundedReceiver>, @@ -190,7 +190,7 @@ where /// Run loop for batch requests, which implements the batch policies. /// - /// See [`Service::new()`](crate::Service::new) for details. + /// See [`Batch::new()`](crate::Batch::new) for details. pub async fn run(mut self) { loop { // Wait on either a new message or the batch timer. diff --git a/zebra-chain/Cargo.toml b/zebra-chain/Cargo.toml index 548ca83ed7a..6a31534c0a8 100644 --- a/zebra-chain/Cargo.toml +++ b/zebra-chain/Cargo.toml @@ -66,6 +66,7 @@ serde-big-array = "0.4.1" # Processing futures = "0.3.21" itertools = "0.10.3" +rayon = "1.5.3" # ZF deps ed25519-zebra = "3.0.0" diff --git a/zebra-chain/src/block/arbitrary.rs b/zebra-chain/src/block/arbitrary.rs index d34fdd4963d..be078394936 100644 --- a/zebra-chain/src/block/arbitrary.rs +++ b/zebra-chain/src/block/arbitrary.rs @@ -449,6 +449,9 @@ impl Block { // would be awkward since the genesis block is handled separatedly there. // This forces us to skip the genesis block here too in order to able to use // this to test the finalized state. + // + // TODO: run note commitment tree updates in parallel rayon threads, + // using `NoteCommitmentTrees::update_trees_parallel()` if generate_valid_commitments && *height != Height(0) { for sapling_note_commitment in transaction.sapling_note_commitments() { sapling_tree.append(*sapling_note_commitment).unwrap(); diff --git a/zebra-chain/src/lib.rs b/zebra-chain/src/lib.rs index 755cd48c9cf..a0bd3da12e5 100644 --- a/zebra-chain/src/lib.rs +++ b/zebra-chain/src/lib.rs @@ -21,6 +21,7 @@ pub mod chain_tip; pub mod fmt; pub mod history_tree; pub mod orchard; +pub mod parallel; pub mod parameters; pub mod primitives; pub mod sapling; diff --git a/zebra-chain/src/orchard/tree.rs b/zebra-chain/src/orchard/tree.rs index cb7f9e28035..1ca12267ea4 100644 --- a/zebra-chain/src/orchard/tree.rs +++ b/zebra-chain/src/orchard/tree.rs @@ -11,7 +11,6 @@ //! A root of a note commitment tree is associated with each treestate. #![allow(clippy::derive_hash_xor_eq)] -#![allow(dead_code)] use std::{ fmt, @@ -34,6 +33,11 @@ use crate::serialization::{ serde_helpers, ReadZcashExt, SerializationError, ZcashDeserialize, ZcashSerialize, }; +/// The type that is used to update the note commitment tree. +/// +/// Unfortunately, this is not the same as `orchard::NoteCommitment`. +pub type NoteCommitmentUpdate = pallas::Base; + pub(super) const MERKLE_DEPTH: usize = 32; /// MerkleCRH^Orchard Hash Function @@ -248,8 +252,8 @@ impl<'de> serde::Deserialize<'de> for Node { } } -#[allow(dead_code, missing_docs)] -#[derive(Error, Debug, Clone, PartialEq, Eq)] +#[derive(Error, Copy, Clone, Debug, Eq, PartialEq, Hash)] +#[allow(missing_docs)] pub enum NoteCommitmentTreeError { #[error("The note commitment tree is full")] FullTree, @@ -302,7 +306,7 @@ impl NoteCommitmentTree { /// /// Returns an error if the tree is full. #[allow(clippy::unwrap_in_result)] - pub fn append(&mut self, cm_x: pallas::Base) -> Result<(), NoteCommitmentTreeError> { + pub fn append(&mut self, cm_x: NoteCommitmentUpdate) -> Result<(), NoteCommitmentTreeError> { if self.inner.append(&cm_x.into()) { // Invalidate cached root let cached_root = self diff --git a/zebra-chain/src/parallel.rs b/zebra-chain/src/parallel.rs new file mode 100644 index 00000000000..0a2dcffd720 --- /dev/null +++ b/zebra-chain/src/parallel.rs @@ -0,0 +1,3 @@ +//! Parallel chain update methods. + +pub mod tree; diff --git a/zebra-chain/src/parallel/tree.rs b/zebra-chain/src/parallel/tree.rs new file mode 100644 index 00000000000..265c3db9b11 --- /dev/null +++ b/zebra-chain/src/parallel/tree.rs @@ -0,0 +1,195 @@ +//! Parallel note commitment tree update methods. + +use std::{collections::BTreeMap, sync::Arc}; + +use thiserror::Error; + +use crate::{ + block::{Block, Height}, + orchard, sapling, sprout, +}; + +/// An argument wrapper struct for note commitment trees. +#[derive(Clone, Debug)] +pub struct NoteCommitmentTrees { + /// The sprout note commitment tree. + pub sprout: Arc, + + /// The sapling note commitment tree. + pub sapling: Arc, + + /// The orchard note commitment tree. + pub orchard: Arc, +} + +/// Note commitment tree errors. +#[derive(Error, Copy, Clone, Debug, Eq, PartialEq, Hash)] +pub enum NoteCommitmentTreeError { + /// A sprout tree error + #[error("sprout error: {0}")] + Sprout(#[from] sprout::tree::NoteCommitmentTreeError), + + /// A sapling tree error + #[error("sapling error: {0}")] + Sapling(#[from] sapling::tree::NoteCommitmentTreeError), + + /// A orchard tree error + #[error("orchard error: {0}")] + Orchard(#[from] orchard::tree::NoteCommitmentTreeError), +} + +impl NoteCommitmentTrees { + /// Updates the note commitment trees using the transactions in `block`, + /// then re-calculates the cached tree roots, using parallel `rayon` threads. + /// + /// If any of the tree updates cause an error, + /// it will be returned at the end of the parallel batches. + #[allow(clippy::unwrap_in_result)] + pub fn update_trees_parallel( + &mut self, + block: &Arc, + ) -> Result<(), NoteCommitmentTreeError> { + self.update_trees_parallel_list( + [( + block + .coinbase_height() + .expect("height was already validated"), + block.clone(), + )] + .into_iter() + .collect(), + ) + } + + /// Updates the note commitment trees using the transactions in `block`, + /// then re-calculates the cached tree roots, using parallel `rayon` threads. + /// + /// If any of the tree updates cause an error, + /// it will be returned at the end of the parallel batches. + pub fn update_trees_parallel_list( + &mut self, + block_list: BTreeMap>, + ) -> Result<(), NoteCommitmentTreeError> { + // Prepare arguments for parallel threads + let NoteCommitmentTrees { + sprout, + sapling, + orchard, + } = self.clone(); + + let sprout_note_commitments: Vec<_> = block_list + .values() + .flat_map(|block| block.transactions.iter()) + .flat_map(|tx| tx.sprout_note_commitments()) + .cloned() + .collect(); + let sapling_note_commitments: Vec<_> = block_list + .values() + .flat_map(|block| block.transactions.iter()) + .flat_map(|tx| tx.sapling_note_commitments()) + .cloned() + .collect(); + let orchard_note_commitments: Vec<_> = block_list + .values() + .flat_map(|block| block.transactions.iter()) + .flat_map(|tx| tx.orchard_note_commitments()) + .cloned() + .collect(); + + let mut sprout_result = None; + let mut sapling_result = None; + let mut orchard_result = None; + + rayon::in_place_scope_fifo(|scope| { + if !sprout_note_commitments.is_empty() { + scope.spawn_fifo(|_scope| { + sprout_result = Some(Self::update_sprout_note_commitment_tree( + sprout, + sprout_note_commitments, + )); + }); + } + + if !sapling_note_commitments.is_empty() { + scope.spawn_fifo(|_scope| { + sapling_result = Some(Self::update_sapling_note_commitment_tree( + sapling, + sapling_note_commitments, + )); + }); + } + + if !orchard_note_commitments.is_empty() { + scope.spawn_fifo(|_scope| { + orchard_result = Some(Self::update_orchard_note_commitment_tree( + orchard, + orchard_note_commitments, + )); + }); + } + }); + + if let Some(sprout_result) = sprout_result { + self.sprout = sprout_result?; + } + if let Some(sapling_result) = sapling_result { + self.sapling = sapling_result?; + } + if let Some(orchard_result) = orchard_result { + self.orchard = orchard_result?; + } + + Ok(()) + } + + /// Update the sprout note commitment tree. + fn update_sprout_note_commitment_tree( + mut sprout: Arc, + sprout_note_commitments: Vec, + ) -> Result, NoteCommitmentTreeError> { + let sprout_nct = Arc::make_mut(&mut sprout); + + for sprout_note_commitment in sprout_note_commitments { + sprout_nct.append(sprout_note_commitment)?; + } + + // Re-calculate and cache the tree root. + let _ = sprout_nct.root(); + + Ok(sprout) + } + + /// Update the sapling note commitment tree. + fn update_sapling_note_commitment_tree( + mut sapling: Arc, + sapling_note_commitments: Vec, + ) -> Result, NoteCommitmentTreeError> { + let sapling_nct = Arc::make_mut(&mut sapling); + + for sapling_note_commitment in sapling_note_commitments { + sapling_nct.append(sapling_note_commitment)?; + } + + // Re-calculate and cache the tree root. + let _ = sapling_nct.root(); + + Ok(sapling) + } + + /// Update the orchard note commitment tree. + fn update_orchard_note_commitment_tree( + mut orchard: Arc, + orchard_note_commitments: Vec, + ) -> Result, NoteCommitmentTreeError> { + let orchard_nct = Arc::make_mut(&mut orchard); + + for orchard_note_commitment in orchard_note_commitments { + orchard_nct.append(orchard_note_commitment)?; + } + + // Re-calculate and cache the tree root. + let _ = orchard_nct.root(); + + Ok(orchard) + } +} diff --git a/zebra-chain/src/sapling/tree.rs b/zebra-chain/src/sapling/tree.rs index 90d8af1b5ba..d366fc7eae9 100644 --- a/zebra-chain/src/sapling/tree.rs +++ b/zebra-chain/src/sapling/tree.rs @@ -10,8 +10,6 @@ //! //! A root of a note commitment tree is associated with each treestate. -#![allow(dead_code)] - use std::{ fmt, hash::{Hash, Hasher}, @@ -38,6 +36,11 @@ use crate::serialization::{ serde_helpers, ReadZcashExt, SerializationError, ZcashDeserialize, ZcashSerialize, }; +/// The type that is used to update the note commitment tree. +/// +/// Unfortunately, this is not the same as `sapling::NoteCommitment`. +pub type NoteCommitmentUpdate = jubjub::Fq; + pub(super) const MERKLE_DEPTH: usize = 32; /// MerkleCRH^Sapling Hash Function @@ -252,8 +255,8 @@ impl<'de> serde::Deserialize<'de> for Node { } } -#[allow(dead_code, missing_docs)] -#[derive(Error, Debug, Clone, PartialEq, Eq)] +#[derive(Error, Copy, Clone, Debug, Eq, PartialEq, Hash)] +#[allow(missing_docs)] pub enum NoteCommitmentTreeError { #[error("The note commitment tree is full")] FullTree, @@ -307,7 +310,7 @@ impl NoteCommitmentTree { /// /// Returns an error if the tree is full. #[allow(clippy::unwrap_in_result)] - pub fn append(&mut self, cm_u: jubjub::Fq) -> Result<(), NoteCommitmentTreeError> { + pub fn append(&mut self, cm_u: NoteCommitmentUpdate) -> Result<(), NoteCommitmentTreeError> { if self.inner.append(&cm_u.into()) { // Invalidate cached root let cached_root = self diff --git a/zebra-chain/src/sprout.rs b/zebra-chain/src/sprout.rs index 478c5d1f5cd..a99caa48ebe 100644 --- a/zebra-chain/src/sprout.rs +++ b/zebra-chain/src/sprout.rs @@ -14,6 +14,7 @@ pub mod keys; pub mod note; pub mod tree; +pub use commitment::NoteCommitment; pub use joinsplit::JoinSplit; pub use joinsplit::RandomSeed; pub use note::{EncryptedNote, Note, Nullifier}; diff --git a/zebra-chain/src/sprout/tree.rs b/zebra-chain/src/sprout/tree.rs index cb99cc0dbb8..ed76b00f974 100644 --- a/zebra-chain/src/sprout/tree.rs +++ b/zebra-chain/src/sprout/tree.rs @@ -180,8 +180,8 @@ impl<'de> serde::Deserialize<'de> for Node { } } -#[allow(dead_code, missing_docs)] -#[derive(Error, Debug, Clone, PartialEq, Eq)] +#[derive(Error, Copy, Clone, Debug, Eq, PartialEq, Hash)] +#[allow(missing_docs)] pub enum NoteCommitmentTreeError { #[error("the note commitment tree is full")] FullTree, diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index 4937fed1be2..32b4f978e57 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -19,8 +19,6 @@ itertools = "0.10.3" lazy_static = "1.4.0" metrics = "0.18.1" mset = "0.1.0" -proptest = { version = "0.10.1", optional = true } -proptest-derive = { version = "0.3.0", optional = true } regex = "1.5.6" rlimit = "0.8.3" rocksdb = { version = "0.18.0", default_features = false, features = ["lz4"] } @@ -28,6 +26,7 @@ serde = { version = "1.0.137", features = ["serde_derive"] } tempfile = "3.3.0" thiserror = "1.0.31" +rayon = "1.5.3" tokio = { version = "1.20.0", features = ["sync", "tracing"] } tower = { version = "0.4.13", features = ["buffer", "util"] } tracing = "0.1.31" @@ -35,6 +34,9 @@ tracing = "0.1.31" zebra-chain = { path = "../zebra-chain" } zebra-test = { path = "../zebra-test/", optional = true } +proptest = { version = "0.10.1", optional = true } +proptest-derive = { version = "0.3.0", optional = true } + [dev-dependencies] color-eyre = "0.6.1" once_cell = "1.12.0" diff --git a/zebra-state/src/error.rs b/zebra-state/src/error.rs index 4b5169ff26c..4a08edb1786 100644 --- a/zebra-state/src/error.rs +++ b/zebra-state/src/error.rs @@ -1,3 +1,5 @@ +//! Error types for Zebra's state. + use std::sync::Arc; use chrono::{DateTime, Utc}; @@ -214,14 +216,8 @@ pub enum ValidateContextError { height: Option, }, - #[error("error in Sprout note commitment tree")] - SproutNoteCommitmentTreeError(#[from] zebra_chain::sprout::tree::NoteCommitmentTreeError), - - #[error("error in Sapling note commitment tree")] - SaplingNoteCommitmentTreeError(#[from] zebra_chain::sapling::tree::NoteCommitmentTreeError), - - #[error("error in Orchard note commitment tree")] - OrchardNoteCommitmentTreeError(#[from] zebra_chain::orchard::tree::NoteCommitmentTreeError), + #[error("error updating a note commitment tree")] + NoteCommitmentTreeError(#[from] zebra_chain::parallel::tree::NoteCommitmentTreeError), #[error("error building the history tree")] HistoryTreeError(#[from] HistoryTreeError), @@ -229,17 +225,41 @@ pub enum ValidateContextError { #[error("block contains an invalid commitment")] InvalidBlockCommitment(#[from] block::CommitmentError), - #[error("unknown Sprout anchor: {anchor:?}")] + #[error( + "unknown Sprout anchor: {anchor:?},\n\ + {height:?}, index in block: {tx_index_in_block:?}, {transaction_hash:?}" + )] #[non_exhaustive] - UnknownSproutAnchor { anchor: sprout::tree::Root }, + UnknownSproutAnchor { + anchor: sprout::tree::Root, + height: block::Height, + tx_index_in_block: usize, + transaction_hash: transaction::Hash, + }, - #[error("unknown Sapling anchor: {anchor:?}")] + #[error( + "unknown Sapling anchor: {anchor:?},\n\ + {height:?}, index in block: {tx_index_in_block:?}, {transaction_hash:?}" + )] #[non_exhaustive] - UnknownSaplingAnchor { anchor: sapling::tree::Root }, + UnknownSaplingAnchor { + anchor: sapling::tree::Root, + height: block::Height, + tx_index_in_block: usize, + transaction_hash: transaction::Hash, + }, - #[error("unknown Orchard anchor: {anchor:?}")] + #[error( + "unknown Orchard anchor: {anchor:?},\n\ + {height:?}, index in block: {tx_index_in_block:?}, {transaction_hash:?}" + )] #[non_exhaustive] - UnknownOrchardAnchor { anchor: orchard::tree::Root }, + UnknownOrchardAnchor { + anchor: orchard::tree::Root, + height: block::Height, + tx_index_in_block: usize, + transaction_hash: transaction::Hash, + }, } /// Trait for creating the corresponding duplicate nullifier error from a nullifier. diff --git a/zebra-state/src/service/arbitrary.rs b/zebra-state/src/service/arbitrary.rs index acb08070e87..ed723321674 100644 --- a/zebra-state/src/service/arbitrary.rs +++ b/zebra-state/src/service/arbitrary.rs @@ -32,7 +32,7 @@ pub struct PreparedChainTree { chain: Arc>>, count: BinarySearch, network: Network, - history_tree: HistoryTree, + history_tree: Arc, } impl ValueTree for PreparedChainTree { @@ -40,7 +40,7 @@ impl ValueTree for PreparedChainTree { Arc>>, ::Value, Network, - HistoryTree, + Arc, ); fn current(&self) -> Self::Value { @@ -64,7 +64,13 @@ impl ValueTree for PreparedChainTree { #[derive(Debug, Default)] pub struct PreparedChain { // the proptests are threaded (not async), so we want to use a threaded mutex here - chain: std::sync::Mutex>>, HistoryTree)>>, + chain: std::sync::Mutex< + Option<( + Network, + Arc>>, + Arc, + )>, + >, // the strategy for generating LedgerStates. If None, it calls [`LedgerState::genesis_strategy`]. ledger_strategy: Option>, generate_valid_commitments: bool, @@ -155,7 +161,11 @@ impl Strategy for PreparedChain { &Default::default(), ) .expect("history tree should be created"); - *chain = Some((network, Arc::new(SummaryDebug(blocks)), history_tree)); + *chain = Some(( + network, + Arc::new(SummaryDebug(blocks)), + Arc::new(history_tree), + )); } let chain = chain.clone().expect("should be generated"); diff --git a/zebra-state/src/service/check.rs b/zebra-state/src/service/check.rs index b5c9e1bc670..b3a43057a20 100644 --- a/zebra-state/src/service/check.rs +++ b/zebra-state/src/service/check.rs @@ -12,7 +12,7 @@ use zebra_chain::{ work::difficulty::CompactDifficulty, }; -use crate::{constants, BoxError, FinalizedBlock, PreparedBlock, ValidateContextError}; +use crate::{constants, BoxError, PreparedBlock, ValidateContextError}; // use self as check use super::check; @@ -102,29 +102,10 @@ where Ok(()) } -/// Check that the `prepared` block is contextually valid for `network`, using +/// Check that `block` is contextually valid for `network`, using /// the `history_tree` up to and including the previous block. -#[tracing::instrument(skip(prepared, history_tree))] -pub(crate) fn prepared_block_commitment_is_valid_for_chain_history( - prepared: &PreparedBlock, - network: Network, - history_tree: &HistoryTree, -) -> Result<(), ValidateContextError> { - block_commitment_is_valid_for_chain_history(prepared.block.clone(), network, history_tree) -} - -/// Check that the `finalized` block is contextually valid for `network`, using -/// the `history_tree` up to and including the previous block. -#[tracing::instrument(skip(finalized, history_tree))] -pub(crate) fn finalized_block_commitment_is_valid_for_chain_history( - finalized: &FinalizedBlock, - network: Network, - history_tree: &HistoryTree, -) -> Result<(), ValidateContextError> { - block_commitment_is_valid_for_chain_history(finalized.block.clone(), network, history_tree) -} - -fn block_commitment_is_valid_for_chain_history( +#[tracing::instrument(skip(block, history_tree))] +pub(crate) fn block_commitment_is_valid_for_chain_history( block: Arc, network: Network, history_tree: &HistoryTree, diff --git a/zebra-state/src/service/check/anchors.rs b/zebra-state/src/service/check/anchors.rs index ab62f03ae95..9687fca2751 100644 --- a/zebra-state/src/service/check/anchors.rs +++ b/zebra-state/src/service/check/anchors.rs @@ -3,110 +3,28 @@ use std::{collections::HashMap, sync::Arc}; -use zebra_chain::sprout; +use zebra_chain::{ + block::{Block, Height}, + sprout, transaction, +}; use crate::{ service::{finalized_state::ZebraDb, non_finalized_state::Chain}, PreparedBlock, ValidateContextError, }; -/// Checks that the Sprout, Sapling, and Orchard anchors specified by -/// transactions in this block have been computed previously within the context -/// of its parent chain. We do not check any anchors in checkpointed blocks, -/// which avoids JoinSplits +/// Checks the final Sapling and Orchard anchors specified by transactions in this +/// `prepared` block. /// -/// Sprout anchors may refer to some earlier block's final treestate (like -/// Sapling and Orchard do exclusively) _or_ to the interstitial output -/// treestate of any prior `JoinSplit` _within the same transaction_. +/// This method checks for anchors computed from the final treestate of each block in +/// the `parent_chain` or `finalized_state`. #[tracing::instrument(skip(finalized_state, parent_chain, prepared))] -pub(crate) fn anchors_refer_to_earlier_treestates( +pub(crate) fn sapling_orchard_anchors_refer_to_final_treestates( finalized_state: &ZebraDb, parent_chain: &Chain, prepared: &PreparedBlock, ) -> Result<(), ValidateContextError> { - for transaction in prepared.block.transactions.iter() { - // Sprout JoinSplits, with interstitial treestates to check as well. - if transaction.has_sprout_joinsplit_data() { - let mut interstitial_trees: HashMap< - sprout::tree::Root, - Arc, - > = HashMap::new(); - - for joinsplit in transaction.sprout_groth16_joinsplits() { - // Check all anchor sets, including the one for interstitial - // anchors. - // - // The anchor is checked and the matching tree is obtained, - // which is used to create the interstitial tree state for this - // JoinSplit: - // - // > For each JoinSplit description in a transaction, an - // > interstitial output treestate is constructed which adds the - // > note commitments and nullifiers specified in that JoinSplit - // > description to the input treestate referred to by its - // > anchor. This interstitial output treestate is available for - // > use as the anchor of subsequent JoinSplit descriptions in - // > the same transaction. - // - // - // - // # Consensus - // - // > The anchor of each JoinSplit description in a transaction - // > MUST refer to either some earlier block’s final Sprout - // > treestate, or to the interstitial output treestate of any - // > prior JoinSplit description in the same transaction. - // - // > For the first JoinSplit description of a transaction, the - // > anchor MUST be the output Sprout treestate of a previous - // > block. - // - // - // - // Note that in order to satisfy the latter consensus rule above, - // [`interstitial_trees`] is always empty in the first iteration - // of the loop. - let input_tree = interstitial_trees - .get(&joinsplit.anchor) - .cloned() - .or_else(|| { - parent_chain - .sprout_trees_by_anchor - .get(&joinsplit.anchor) - .cloned() - .or_else(|| { - finalized_state - .sprout_note_commitment_tree_by_anchor(&joinsplit.anchor) - }) - }); - - let mut input_tree = match input_tree { - Some(tree) => tree, - None => { - tracing::warn!(?joinsplit.anchor, ?prepared.height, ?prepared.hash, "failed to find sprout anchor"); - return Err(ValidateContextError::UnknownSproutAnchor { - anchor: joinsplit.anchor, - }); - } - }; - - let input_tree_inner = Arc::make_mut(&mut input_tree); - - tracing::debug!(?joinsplit.anchor, "validated sprout anchor"); - - // Add new anchors to the interstitial note commitment tree. - for cm in joinsplit.commitments { - input_tree_inner - .append(cm) - .expect("note commitment should be appendable to the tree"); - } - - interstitial_trees.insert(input_tree.root(), input_tree); - - tracing::debug!(?joinsplit.anchor, "observed sprout anchor"); - } - } - + for (tx_index_in_block, transaction) in prepared.block.transactions.iter().enumerate() { // Sapling Spends // // MUST refer to some earlier block’s final Sapling treestate. @@ -124,18 +42,33 @@ pub(crate) fn anchors_refer_to_earlier_treestates( // [`zebra_chain::sapling::shielded_data`]. // // The "earlier treestate" check is implemented here. - if transaction.has_sapling_shielded_data() { - for anchor in transaction.sapling_anchors() { - tracing::debug!(?anchor, "observed sapling anchor"); - - if !parent_chain.sapling_anchors.contains(&anchor) - && !finalized_state.contains_sapling_anchor(&anchor) - { - return Err(ValidateContextError::UnknownSaplingAnchor { anchor }); - } + for (anchor_index_in_tx, anchor) in transaction.sapling_anchors().enumerate() { + tracing::debug!( + ?anchor, + ?anchor_index_in_tx, + ?tx_index_in_block, + height = ?prepared.height, + "observed sapling anchor", + ); - tracing::debug!(?anchor, "validated sapling anchor"); + if !parent_chain.sapling_anchors.contains(&anchor) + && !finalized_state.contains_sapling_anchor(&anchor) + { + return Err(ValidateContextError::UnknownSaplingAnchor { + anchor, + height: prepared.height, + tx_index_in_block, + transaction_hash: prepared.transaction_hashes[tx_index_in_block], + }); } + + tracing::debug!( + ?anchor, + ?anchor_index_in_tx, + ?tx_index_in_block, + height = ?prepared.height, + "validated sapling anchor", + ); } // Orchard Actions @@ -150,7 +83,12 @@ pub(crate) fn anchors_refer_to_earlier_treestates( // // if let Some(orchard_shielded_data) = transaction.orchard_shielded_data() { - tracing::debug!(?orchard_shielded_data.shared_anchor, "observed orchard anchor"); + tracing::debug!( + ?orchard_shielded_data.shared_anchor, + ?tx_index_in_block, + height = ?prepared.height, + "observed orchard anchor", + ); if !parent_chain .orchard_anchors @@ -159,10 +97,234 @@ pub(crate) fn anchors_refer_to_earlier_treestates( { return Err(ValidateContextError::UnknownOrchardAnchor { anchor: orchard_shielded_data.shared_anchor, + height: prepared.height, + tx_index_in_block, + transaction_hash: prepared.transaction_hashes[tx_index_in_block], }); } - tracing::debug!(?orchard_shielded_data.shared_anchor, "validated orchard anchor"); + tracing::debug!( + ?orchard_shielded_data.shared_anchor, + ?tx_index_in_block, + height = ?prepared.height, + "validated orchard anchor", + ); + } + } + + Ok(()) +} + +/// This function fetches and returns the Sprout final treestates from the state, +/// so [`sprout_anchors_refer_to_treestates()`] can check Sprout final and interstitial treestates, +/// without accessing the disk. +/// +/// Sprout anchors may also refer to the interstitial output treestate of any prior +/// `JoinSplit` _within the same transaction_; these are created on the fly +/// in [`sprout_anchors_refer_to_treestates()`]. +#[tracing::instrument(skip(finalized_state, parent_chain, prepared))] +pub(crate) fn fetch_sprout_final_treestates( + finalized_state: &ZebraDb, + parent_chain: &Chain, + prepared: &PreparedBlock, +) -> HashMap> { + let mut sprout_final_treestates = HashMap::new(); + + for (tx_index_in_block, transaction) in prepared.block.transactions.iter().enumerate() { + // Fetch and return Sprout JoinSplit final treestates + for (joinsplit_index_in_tx, joinsplit) in + transaction.sprout_groth16_joinsplits().enumerate() + { + // Avoid duplicate fetches + if sprout_final_treestates.contains_key(&joinsplit.anchor) { + continue; + } + + let input_tree = parent_chain + .sprout_trees_by_anchor + .get(&joinsplit.anchor) + .cloned() + .or_else(|| { + finalized_state.sprout_note_commitment_tree_by_anchor(&joinsplit.anchor) + }); + + if let Some(input_tree) = input_tree { + /* TODO: + - fix tests that generate incorrect root data + - assert that roots match the fetched tree during tests + - move this CPU-intensive check to sprout_anchors_refer_to_treestates() + + assert_eq!( + input_tree.root(), + joinsplit.anchor, + "anchor and fetched input tree root did not match:\n\ + anchor: {anchor:?},\n\ + input tree root: {input_tree_root:?},\n\ + input_tree: {input_tree:?}", + anchor = joinsplit.anchor + ); + */ + + sprout_final_treestates.insert(joinsplit.anchor, input_tree); + + tracing::debug!( + sprout_final_treestate_count = ?sprout_final_treestates.len(), + ?joinsplit.anchor, + ?joinsplit_index_in_tx, + ?tx_index_in_block, + height = ?prepared.height, + "observed sprout final treestate anchor", + ); + } + } + } + + tracing::trace!( + sprout_final_treestate_count = ?sprout_final_treestates.len(), + ?sprout_final_treestates, + height = ?prepared.height, + "returning sprout final treestate anchors", + ); + + sprout_final_treestates +} + +/// Checks the Sprout anchors specified by transactions in `block`. +/// +/// Sprout anchors may refer to some earlier block's final treestate (like +/// Sapling and Orchard do exclusively) _or_ to the interstitial output +/// treestate of any prior `JoinSplit` _within the same transaction_. +/// +/// This method searches for anchors in the supplied `sprout_final_treestates` +/// (which must be populated with all treestates pointed to in the `prepared` block; +/// see [`fetch_sprout_final_treestates()`]); or in the interstitial +/// treestates which are computed on the fly in this function. +#[tracing::instrument(skip(sprout_final_treestates, block, transaction_hashes))] +pub(crate) fn sprout_anchors_refer_to_treestates( + sprout_final_treestates: HashMap>, + block: Arc, + // Only used for debugging + height: Height, + transaction_hashes: Arc<[transaction::Hash]>, +) -> Result<(), ValidateContextError> { + tracing::trace!( + sprout_final_treestate_count = ?sprout_final_treestates.len(), + ?sprout_final_treestates, + ?height, + "received sprout final treestate anchors", + ); + + for (tx_index_in_block, transaction) in block.transactions.iter().enumerate() { + // Sprout JoinSplits, with interstitial treestates to check as well. + let mut interstitial_trees: HashMap< + sprout::tree::Root, + Arc, + > = HashMap::new(); + + let joinsplit_count = transaction.sprout_groth16_joinsplits().count(); + + for (joinsplit_index_in_tx, joinsplit) in + transaction.sprout_groth16_joinsplits().enumerate() + { + // Check all anchor sets, including the one for interstitial + // anchors. + // + // The anchor is checked and the matching tree is obtained, + // which is used to create the interstitial tree state for this + // JoinSplit: + // + // > For each JoinSplit description in a transaction, an + // > interstitial output treestate is constructed which adds the + // > note commitments and nullifiers specified in that JoinSplit + // > description to the input treestate referred to by its + // > anchor. This interstitial output treestate is available for + // > use as the anchor of subsequent JoinSplit descriptions in + // > the same transaction. + // + // + // + // # Consensus + // + // > The anchor of each JoinSplit description in a transaction + // > MUST refer to either some earlier block’s final Sprout + // > treestate, or to the interstitial output treestate of any + // > prior JoinSplit description in the same transaction. + // + // > For the first JoinSplit description of a transaction, the + // > anchor MUST be the output Sprout treestate of a previous + // > block. + // + // + // + // Note that in order to satisfy the latter consensus rule above, + // [`interstitial_trees`] is always empty in the first iteration + // of the loop. + let input_tree = interstitial_trees + .get(&joinsplit.anchor) + .cloned() + .or_else(|| sprout_final_treestates.get(&joinsplit.anchor).cloned()); + + tracing::trace!( + ?input_tree, + final_lookup = ?sprout_final_treestates.get(&joinsplit.anchor), + interstitial_lookup = ?interstitial_trees.get(&joinsplit.anchor), + interstitial_tree_count = ?interstitial_trees.len(), + ?interstitial_trees, + ?height, + "looked up sprout treestate anchor", + ); + + let mut input_tree = match input_tree { + Some(tree) => tree, + None => { + tracing::debug!( + ?joinsplit.anchor, + ?joinsplit_index_in_tx, + ?tx_index_in_block, + ?height, + "failed to find sprout anchor", + ); + return Err(ValidateContextError::UnknownSproutAnchor { + anchor: joinsplit.anchor, + height, + tx_index_in_block, + transaction_hash: transaction_hashes[tx_index_in_block], + }); + } + }; + + tracing::debug!( + ?joinsplit.anchor, + ?joinsplit_index_in_tx, + ?tx_index_in_block, + ?height, + "validated sprout anchor", + ); + + // The last interstitial treestate in a transaction can never be used, + // so we avoid generating it. + if joinsplit_index_in_tx == joinsplit_count - 1 { + continue; + } + + let input_tree_inner = Arc::make_mut(&mut input_tree); + + // Add new anchors to the interstitial note commitment tree. + for cm in joinsplit.commitments { + input_tree_inner + .append(cm) + .expect("note commitment should be appendable to the tree"); + } + + interstitial_trees.insert(input_tree.root(), input_tree); + + tracing::debug!( + ?joinsplit.anchor, + ?joinsplit_index_in_tx, + ?tx_index_in_block, + ?height, + "observed sprout interstitial anchor", + ); } } diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 958910a260c..cc2207b9e95 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -279,9 +279,11 @@ impl FinalizedState { // the history tree root. While it _is_ checked during contextual validation, // that is not called by the checkpoint verifier, and keeping a history tree there // would be harder to implement. + // + // TODO: run this CPU-intensive cryptography in a parallel rayon thread, if it shows up in profiles let history_tree = self.db.history_tree(); - check::finalized_block_commitment_is_valid_for_chain_history( - &finalized, + check::block_commitment_is_valid_for_chain_history( + finalized.block.clone(), self.network, &history_tree, )?; diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs index 50f0024200a..8674945b12f 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs @@ -21,6 +21,7 @@ use zebra_chain::{ block::{self, Block, Height}, history_tree::HistoryTree, orchard, + parallel::tree::NoteCommitmentTrees, parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH}, sapling, serialization::TrustedPreallocate, @@ -36,7 +37,7 @@ use crate::{ block::TransactionLocation, transparent::{AddressBalanceLocation, OutputLocation}, }, - zebra_db::{metrics::block_precommit_metrics, shielded::NoteCommitmentTrees, ZebraDb}, + zebra_db::{metrics::block_precommit_metrics, ZebraDb}, FinalizedBlock, }, BoxError, HashOrHeight, @@ -229,7 +230,7 @@ impl ZebraDb { pub(in super::super) fn write_block( &mut self, finalized: FinalizedBlock, - history_tree: HistoryTree, + history_tree: Arc, network: Network, source: &str, ) -> Result { @@ -371,7 +372,7 @@ impl DiskWriteBatch { spent_utxos_by_out_loc: BTreeMap, address_balances: HashMap, mut note_commitment_trees: NoteCommitmentTrees, - history_tree: HistoryTree, + history_tree: Arc, value_pool: ValueBalance, ) -> Result<(), BoxError> { let FinalizedBlock { diff --git a/zebra-state/src/service/finalized_state/zebra_db/block/tests/snapshot.rs b/zebra-state/src/service/finalized_state/zebra_db/block/tests/snapshot.rs index e5be8ec5a44..3eb2bd1a237 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block/tests/snapshot.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block/tests/snapshot.rs @@ -297,7 +297,7 @@ fn snapshot_block_and_transaction_data(state: &FinalizedState) { assert_eq!(orchard_tree_at_tip, orchard_tree_by_height); // Skip these checks for empty history trees. - if let Some(history_tree_at_tip) = history_tree_at_tip.as_ref() { + if let Some(history_tree_at_tip) = history_tree_at_tip.as_ref().as_ref() { assert_eq!(history_tree_at_tip.current_height(), max_height); assert_eq!(history_tree_at_tip.network(), state.network()); } diff --git a/zebra-state/src/service/finalized_state/zebra_db/chain.rs b/zebra-state/src/service/finalized_state/zebra_db/chain.rs index 6684fff13b0..25c316b72a2 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/chain.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/chain.rs @@ -11,7 +11,7 @@ //! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must //! be incremented each time the database format (column, serialization, etc) changes. -use std::{borrow::Borrow, collections::HashMap}; +use std::{borrow::Borrow, collections::HashMap, sync::Arc}; use zebra_chain::{ amount::NonNegative, @@ -32,20 +32,19 @@ use crate::{ impl ZebraDb { /// Returns the ZIP-221 history tree of the finalized tip or `None` /// if it does not exist yet in the state (pre-Heartwood). - pub fn history_tree(&self) -> HistoryTree { - match self.finalized_tip_height() { - Some(height) => { - let history_tree_cf = self.db.cf_handle("history_tree").unwrap(); - let history_tree: Option = - self.db.zs_get(&history_tree_cf, &height); - if let Some(non_empty_tree) = history_tree { - HistoryTree::from(non_empty_tree) - } else { - Default::default() - } + pub fn history_tree(&self) -> Arc { + if let Some(height) = self.finalized_tip_height() { + let history_tree_cf = self.db.cf_handle("history_tree").unwrap(); + + let history_tree: Option = + self.db.zs_get(&history_tree_cf, &height); + + if let Some(non_empty_tree) = history_tree { + return Arc::new(HistoryTree::from(non_empty_tree)); } - None => Default::default(), } + + Default::default() } /// Returns the stored `ValueBalance` for the best chain at the finalized tip height. @@ -74,13 +73,15 @@ impl DiskWriteBatch { finalized: &FinalizedBlock, sapling_root: sapling::tree::Root, orchard_root: orchard::tree::Root, - mut history_tree: HistoryTree, + mut history_tree: Arc, ) -> Result<(), BoxError> { let history_tree_cf = db.cf_handle("history_tree").unwrap(); let FinalizedBlock { block, height, .. } = finalized; - history_tree.push(self.network(), block.clone(), sapling_root, orchard_root)?; + // TODO: run this CPU-intensive cryptography in a parallel rayon thread, if it shows up in profiles + let history_tree_mut = Arc::make_mut(&mut history_tree); + history_tree_mut.push(self.network(), block.clone(), sapling_root, orchard_root)?; // Update the tree in state let current_tip_height = *height - 1; @@ -93,7 +94,7 @@ impl DiskWriteBatch { // Otherwise, the ReadStateService could access a height // that was just deleted by a concurrent StateService write. // This requires a database version update. - if let Some(history_tree) = history_tree.as_ref() { + if let Some(history_tree) = history_tree.as_ref().as_ref() { self.zs_insert(&history_tree_cf, height, history_tree); } diff --git a/zebra-state/src/service/finalized_state/zebra_db/shielded.rs b/zebra-state/src/service/finalized_state/zebra_db/shielded.rs index 95b0b63f2cc..1963d2d2f57 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/shielded.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/shielded.rs @@ -15,7 +15,8 @@ use std::sync::Arc; use zebra_chain::{ - block::Height, history_tree::HistoryTree, orchard, sapling, sprout, transaction::Transaction, + block::Height, history_tree::HistoryTree, orchard, parallel::tree::NoteCommitmentTrees, + sapling, sprout, transaction::Transaction, }; use crate::{ @@ -27,14 +28,6 @@ use crate::{ BoxError, }; -/// An argument wrapper struct for note commitment trees. -#[derive(Clone, Debug)] -pub struct NoteCommitmentTrees { - sprout: Arc, - sapling: Arc, - orchard: Arc, -} - impl ZebraDb { // Read shielded methods @@ -194,10 +187,10 @@ impl DiskWriteBatch { // Index each transaction's shielded data for transaction in &block.transactions { self.prepare_nullifier_batch(db, transaction)?; - - DiskWriteBatch::update_note_commitment_trees(transaction, note_commitment_trees)?; } + note_commitment_trees.update_trees_parallel(block)?; + Ok(()) } @@ -231,36 +224,6 @@ impl DiskWriteBatch { Ok(()) } - /// Updates the supplied note commitment trees. - /// - /// If this method returns an error, it will be propagated, - /// and the batch should not be written to the database. - /// - /// # Errors - /// - /// - Propagates any errors from updating note commitment trees - pub fn update_note_commitment_trees( - transaction: &Transaction, - note_commitment_trees: &mut NoteCommitmentTrees, - ) -> Result<(), BoxError> { - let sprout_nct = Arc::make_mut(&mut note_commitment_trees.sprout); - for sprout_note_commitment in transaction.sprout_note_commitments() { - sprout_nct.append(*sprout_note_commitment)?; - } - - let sapling_nct = Arc::make_mut(&mut note_commitment_trees.sapling); - for sapling_note_commitment in transaction.sapling_note_commitments() { - sapling_nct.append(*sapling_note_commitment)?; - } - - let orchard_nct = Arc::make_mut(&mut note_commitment_trees.orchard); - for orchard_note_commitment in transaction.orchard_note_commitments() { - orchard_nct.append(*orchard_note_commitment)?; - } - - Ok(()) - } - /// Prepare a database batch containing the note commitment and history tree updates /// from `finalized.block`, and return it (without actually writing anything). /// @@ -276,7 +239,7 @@ impl DiskWriteBatch { db: &DiskDb, finalized: &FinalizedBlock, note_commitment_trees: NoteCommitmentTrees, - history_tree: HistoryTree, + history_tree: Arc, ) -> Result<(), BoxError> { let sprout_anchors = db.cf_handle("sprout_anchors").unwrap(); let sapling_anchors = db.cf_handle("sapling_anchors").unwrap(); @@ -288,11 +251,12 @@ impl DiskWriteBatch { let FinalizedBlock { height, .. } = finalized; + // Use the cached values that were previously calculated in parallel. let sprout_root = note_commitment_trees.sprout.root(); let sapling_root = note_commitment_trees.sapling.root(); let orchard_root = note_commitment_trees.orchard.root(); - // Compute the new anchors and index them + // Index the new anchors. // Note: if the root hasn't changed, we write the same value again. self.zs_insert(&sprout_anchors, sprout_root, ¬e_commitment_trees.sprout); self.zs_insert(&sapling_anchors, sapling_root, ()); diff --git a/zebra-state/src/service/non_finalized_state.rs b/zebra-state/src/service/non_finalized_state.rs index 251582a829c..5a23bd07889 100644 --- a/zebra-state/src/service/non_finalized_state.rs +++ b/zebra-state/src/service/non_finalized_state.rs @@ -2,7 +2,11 @@ //! //! [RFC0005]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html -use std::{collections::BTreeSet, mem, sync::Arc}; +use std::{ + collections::{BTreeSet, HashMap}, + mem, + sync::Arc, +}; use zebra_chain::{ block::{self, Block}, @@ -205,6 +209,9 @@ impl NonFinalizedState { prepared: PreparedBlock, finalized_state: &ZebraDb, ) -> Result, ValidateContextError> { + // Reads from disk + // + // TODO: if these disk reads show up in profiles, run them in parallel, using std::thread::spawn() let spent_utxos = check::utxo::transparent_spend( &prepared, &new_chain.unspent_utxos(), @@ -212,18 +219,18 @@ impl NonFinalizedState { finalized_state, )?; - check::prepared_block_commitment_is_valid_for_chain_history( - &prepared, - self.network, - &new_chain.history_tree, - )?; - - check::anchors::anchors_refer_to_earlier_treestates( + // Reads from disk + check::anchors::sapling_orchard_anchors_refer_to_final_treestates( finalized_state, &new_chain, &prepared, )?; + // Reads from disk + let sprout_final_treestates = + check::anchors::fetch_sprout_final_treestates(finalized_state, &new_chain, &prepared); + + // Quick check that doesn't read from disk let contextual = ContextuallyValidBlock::with_block_and_spent_utxos( prepared.clone(), spent_utxos.clone(), @@ -238,12 +245,65 @@ impl NonFinalizedState { } })?; - // We're pretty sure the new block is valid, - // so clone the inner chain if needed, then add the new block. - Arc::try_unwrap(new_chain) - .unwrap_or_else(|shared_chain| (*shared_chain).clone()) - .push(contextual) - .map(Arc::new) + Self::validate_and_update_parallel(new_chain, contextual, sprout_final_treestates) + } + + /// Validate `contextual` and update `new_chain`, doing CPU-intensive work in parallel batches. + #[allow(clippy::unwrap_in_result)] + #[tracing::instrument(skip(new_chain, sprout_final_treestates))] + fn validate_and_update_parallel( + new_chain: Arc, + contextual: ContextuallyValidBlock, + sprout_final_treestates: HashMap>, + ) -> Result, ValidateContextError> { + let mut block_commitment_result = None; + let mut sprout_anchor_result = None; + let mut chain_push_result = None; + + // Clone function arguments for different threads + let block = contextual.block.clone(); + let network = new_chain.network(); + let history_tree = new_chain.history_tree.clone(); + + let block2 = contextual.block.clone(); + let height = contextual.height; + let transaction_hashes = contextual.transaction_hashes.clone(); + + rayon::in_place_scope_fifo(|scope| { + scope.spawn_fifo(|_scope| { + block_commitment_result = Some(check::block_commitment_is_valid_for_chain_history( + block, + network, + &history_tree, + )); + }); + + scope.spawn_fifo(|_scope| { + sprout_anchor_result = Some(check::anchors::sprout_anchors_refer_to_treestates( + sprout_final_treestates, + block2, + height, + transaction_hashes, + )); + }); + + // We're pretty sure the new block is valid, + // so clone the inner chain if needed, then add the new block. + // + // Pushing a block onto a Chain can launch additional parallel batches. + // TODO: should we pass _scope into Chain::push()? + scope.spawn_fifo(|_scope| { + let new_chain = Arc::try_unwrap(new_chain) + .unwrap_or_else(|shared_chain| (*shared_chain).clone()); + chain_push_result = Some(new_chain.push(contextual).map(Arc::new)); + }); + }); + + // Don't return the updated Chain unless all the parallel results were Ok + block_commitment_result.expect("scope has finished")?; + sprout_anchor_result.expect("scope has finished")?; + + chain_push_result.expect("scope has finished") } /// Returns the length of the non-finalized portion of the current best chain. @@ -388,7 +448,7 @@ impl NonFinalizedState { sprout_note_commitment_tree: Arc, sapling_note_commitment_tree: Arc, orchard_note_commitment_tree: Arc, - history_tree: HistoryTree, + history_tree: Arc, ) -> Result, ValidateContextError> { match self.find_chain(|chain| chain.non_finalized_tip_hash() == parent_hash) { // Clone the existing Arc in the non-finalized state diff --git a/zebra-state/src/service/non_finalized_state/chain.rs b/zebra-state/src/service/non_finalized_state/chain.rs index 0c9f51a392f..a6619735c71 100644 --- a/zebra-state/src/service/non_finalized_state/chain.rs +++ b/zebra-state/src/service/non_finalized_state/chain.rs @@ -18,6 +18,7 @@ use zebra_chain::{ fmt::humantime_milliseconds, history_tree::HistoryTree, orchard, + parallel::tree::NoteCommitmentTrees, parameters::Network, primitives::Groth16Proof, sapling, sprout, @@ -84,7 +85,7 @@ pub struct Chain { BTreeMap>, /// The ZIP-221 history tree of the tip of this [`Chain`], /// including all finalized blocks, and the non-finalized `blocks` in this chain. - pub(crate) history_tree: HistoryTree, + pub(crate) history_tree: Arc, /// The Sprout anchors created by `blocks`. pub(crate) sprout_anchors: MultiSet, @@ -132,7 +133,7 @@ impl Chain { sprout_note_commitment_tree: Arc, sapling_note_commitment_tree: Arc, orchard_note_commitment_tree: Arc, - history_tree: HistoryTree, + history_tree: Arc, finalized_tip_chain_value_pools: ValueBalance, ) -> Self { Self { @@ -176,8 +177,6 @@ impl Chain { /// even if the blocks in the two chains are equal. #[cfg(test)] pub(crate) fn eq_internal_state(&self, other: &Chain) -> bool { - use zebra_chain::history_tree::NonEmptyHistoryTree; - // blocks, heights, hashes self.blocks == other.blocks && self.height_by_hash == other.height_by_hash && @@ -196,7 +195,7 @@ impl Chain { self.orchard_trees_by_height== other.orchard_trees_by_height && // history tree - self.history_tree.as_ref().map(NonEmptyHistoryTree::hash) == other.history_tree.as_ref().map(NonEmptyHistoryTree::hash) && + self.history_tree.as_ref().as_ref().map(|tree| tree.hash()) == other.history_tree.as_ref().as_ref().map(|other_tree| other_tree.hash()) && // anchors self.sprout_anchors == other.sprout_anchors && @@ -278,7 +277,7 @@ impl Chain { sprout_note_commitment_tree: Arc, sapling_note_commitment_tree: Arc, orchard_note_commitment_tree: Arc, - history_tree: HistoryTree, + history_tree: Arc, ) -> Result, ValidateContextError> { if !self.height_by_hash.contains_key(&fork_tip) { return Ok(None); @@ -291,20 +290,31 @@ impl Chain { history_tree, ); + // Revert blocks above the fork while forked.non_finalized_tip_hash() != fork_tip { forked.pop_tip(); } - // Rebuild the note commitment and history trees, starting from the finalized tip tree. - // - // Note commitments and history trees are not removed from the Chain during a fork, - // because we don't support that operation yet. Instead, we recreate the tree - // from the finalized tip. - // - // TODO: remove trees and anchors above the fork, to save CPU time (#4794) + // Rebuild trees from the finalized tip, because we haven't implemented reverts yet. + forked.rebuild_trees_parallel()?; + + Ok(Some(forked)) + } + + /// Rebuild the note commitment and history trees after a chain fork, + /// starting from the finalized tip trees, using parallel `rayon` threads. + /// + /// Note commitments and history trees are not removed from the Chain during a fork, + /// because we don't support that operation yet. Instead, we recreate the tree + /// from the finalized tip. + /// + /// TODO: remove trees and anchors above the fork, to save CPU time (#4794) + #[allow(clippy::unwrap_in_result)] + pub fn rebuild_trees_parallel(&mut self) -> Result<(), ValidateContextError> { let start_time = Instant::now(); - let rebuilt_block_count = forked.blocks.len(); - let fork_height = forked.non_finalized_tip_height(); + let rebuilt_block_count = self.blocks.len(); + let fork_height = self.non_finalized_tip_height(); + let fork_tip = self.non_finalized_tip_hash(); info!( ?rebuilt_block_count, @@ -313,44 +323,80 @@ impl Chain { "starting to rebuild note commitment trees after a non-finalized chain fork", ); - let sprout_nct = Arc::make_mut(&mut forked.sprout_note_commitment_tree); - let sapling_nct = Arc::make_mut(&mut forked.sapling_note_commitment_tree); - let orchard_nct = Arc::make_mut(&mut forked.orchard_note_commitment_tree); + // Prepare data for parallel execution + let block_list = self + .blocks + .iter() + .map(|(height, block)| (*height, block.block.clone())) + .collect(); + + // TODO: use NoteCommitmentTrees to store the trees as well? + let mut nct = NoteCommitmentTrees { + sprout: self.sprout_note_commitment_tree.clone(), + sapling: self.sapling_note_commitment_tree.clone(), + orchard: self.orchard_note_commitment_tree.clone(), + }; - for block in forked.blocks.values() { - for transaction in block.block.transactions.iter() { - for sprout_note_commitment in transaction.sprout_note_commitments() { - sprout_nct - .append(*sprout_note_commitment) - .expect("must work since it was already appended before the fork"); - } + let mut note_result = None; + let mut history_result = None; - for sapling_note_commitment in transaction.sapling_note_commitments() { - sapling_nct - .append(*sapling_note_commitment) - .expect("must work since it was already appended before the fork"); - } + // Run 4 tasks in parallel: + // - sprout, sapling, and orchard tree updates and (redundant) root calculations + // - history tree updates + rayon::in_place_scope_fifo(|scope| { + // Spawns a separate rayon task for each note commitment tree. + // + // TODO: skip the unused root calculations? (redundant after #4794) + note_result = Some(nct.update_trees_parallel_list(block_list)); - for orchard_note_commitment in transaction.orchard_note_commitments() { - orchard_nct - .append(*orchard_note_commitment) - .expect("must work since it was already appended before the fork"); - } - } + scope.spawn_fifo(|_scope| { + history_result = Some(self.rebuild_history_tree()); + }); + }); + note_result.expect("scope has already finished")?; + history_result.expect("scope has already finished")?; + + // Update the note commitment trees in the chain. + self.sprout_note_commitment_tree = nct.sprout; + self.sapling_note_commitment_tree = nct.sapling; + self.orchard_note_commitment_tree = nct.orchard; + + let rebuild_time = start_time.elapsed(); + let rebuild_time_per_block = + rebuild_time / rebuilt_block_count.try_into().expect("fits in u32"); + info!( + rebuild_time = ?humantime_milliseconds(rebuild_time), + rebuild_time_per_block = ?humantime_milliseconds(rebuild_time_per_block), + ?rebuilt_block_count, + ?fork_height, + ?fork_tip, + "finished rebuilding note commitment trees after a non-finalized chain fork", + ); + + Ok(()) + } + + /// Rebuild the history tree after a chain fork. + /// + /// TODO: remove trees and anchors above the fork, to save CPU time (#4794) + #[allow(clippy::unwrap_in_result)] + pub fn rebuild_history_tree(&mut self) -> Result<(), ValidateContextError> { + for block in self.blocks.values() { // Note that anchors don't need to be recreated since they are already // handled in revert_chain_state_with. - let sapling_root = forked + let sapling_root = self .sapling_anchors_by_height .get(&block.height) .expect("Sapling anchors must exist for pre-fork blocks"); - let orchard_root = forked + let orchard_root = self .orchard_anchors_by_height .get(&block.height) .expect("Orchard anchors must exist for pre-fork blocks"); - forked.history_tree.push( + let history_tree_mut = Arc::make_mut(&mut self.history_tree); + history_tree_mut.push( self.network, block.block.clone(), *sapling_root, @@ -358,19 +404,12 @@ impl Chain { )?; } - let rebuild_time = start_time.elapsed(); - let rebuild_time_per_block = - rebuild_time / rebuilt_block_count.try_into().expect("fits in u32"); - info!( - rebuild_time = ?humantime_milliseconds(rebuild_time), - rebuild_time_per_block = ?humantime_milliseconds(rebuild_time_per_block), - ?rebuilt_block_count, - ?fork_height, - ?fork_tip, - "finished rebuilding note commitment trees after a non-finalized chain fork", - ); + Ok(()) + } - Ok(Some(forked)) + /// Returns the [`Network`] for this chain. + pub fn network(&self) -> Network { + self.network } /// Returns the [`ContextuallyValidBlock`] with [`block::Hash`] or @@ -674,7 +713,7 @@ impl Chain { sprout_note_commitment_tree: Arc, sapling_note_commitment_tree: Arc, orchard_note_commitment_tree: Arc, - history_tree: HistoryTree, + history_tree: Arc, ) -> Self { Chain { network: self.network, @@ -704,43 +743,97 @@ impl Chain { chain_value_pools: self.chain_value_pools, } } -} -/// The revert position being performed on a chain. -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -enum RevertPosition { - /// The chain root is being reverted via [`Chain::pop_root`], when a block - /// is finalized. - Root, + /// Update the chain tip with the `contextually_valid` block, + /// running note commitment tree updates in parallel with other updates. + /// + /// Used to implement `update_chain_tip_with::`. + #[instrument(skip(self, contextually_valid), fields(block = %contextually_valid.block))] + #[allow(clippy::unwrap_in_result)] + fn update_chain_tip_with_block_parallel( + &mut self, + contextually_valid: &ContextuallyValidBlock, + ) -> Result<(), ValidateContextError> { + let height = contextually_valid.height; - /// The chain tip is being reverted via [`Chain::pop_tip`], - /// when a chain is forked. - Tip, -} + // Prepare data for parallel execution + // + // TODO: use NoteCommitmentTrees to store the trees as well? + let mut nct = NoteCommitmentTrees { + sprout: self.sprout_note_commitment_tree.clone(), + sapling: self.sapling_note_commitment_tree.clone(), + orchard: self.orchard_note_commitment_tree.clone(), + }; -/// Helper trait to organize inverse operations done on the [`Chain`] type. -/// -/// Used to overload update and revert methods, based on the type of the argument, -/// and the position of the removed block in the chain. -/// -/// This trait was motivated by the length of the `push`, [`Chain::pop_root`], -/// and [`Chain::pop_tip`] functions, and fear that it would be easy to -/// introduce bugs when updating them, unless the code was reorganized to keep -/// related operations adjacent to each other. -trait UpdateWith { - /// When `T` is added to the chain tip, - /// update [`Chain`] cumulative data members to add data that are derived from `T`. - fn update_chain_tip_with(&mut self, _: &T) -> Result<(), ValidateContextError>; + let mut tree_result = None; + let mut partial_result = None; - /// When `T` is removed from `position` in the chain, - /// revert [`Chain`] cumulative data members to remove data that are derived from `T`. - fn revert_chain_with(&mut self, _: &T, position: RevertPosition); -} + // Run 4 tasks in parallel: + // - sprout, sapling, and orchard tree updates and root calculations + // - the rest of the Chain updates + rayon::in_place_scope_fifo(|scope| { + // Spawns a separate rayon task for each note commitment tree + tree_result = Some(nct.update_trees_parallel(&contextually_valid.block.clone())); -impl UpdateWith for Chain { + scope.spawn_fifo(|_scope| { + partial_result = + Some(self.update_chain_tip_with_block_except_trees(contextually_valid)); + }); + }); + + tree_result.expect("scope has already finished")?; + partial_result.expect("scope has already finished")?; + + // Update the note commitment trees in the chain. + self.sprout_note_commitment_tree = nct.sprout; + self.sapling_note_commitment_tree = nct.sapling; + self.orchard_note_commitment_tree = nct.orchard; + + // Do the Chain updates with data dependencies on note commitment tree updates + + // Update the note commitment trees indexed by height. + self.sapling_trees_by_height + .insert(height, self.sapling_note_commitment_tree.clone()); + self.orchard_trees_by_height + .insert(height, self.orchard_note_commitment_tree.clone()); + + // Having updated all the note commitment trees and nullifier sets in + // this block, the roots of the note commitment trees as of the last + // transaction are the treestates of this block. + // + // Use the previously cached roots, which were calculated in parallel. + let sprout_root = self.sprout_note_commitment_tree.root(); + self.sprout_anchors.insert(sprout_root); + self.sprout_anchors_by_height.insert(height, sprout_root); + self.sprout_trees_by_anchor + .insert(sprout_root, self.sprout_note_commitment_tree.clone()); + let sapling_root = self.sapling_note_commitment_tree.root(); + self.sapling_anchors.insert(sapling_root); + self.sapling_anchors_by_height.insert(height, sapling_root); + + let orchard_root = self.orchard_note_commitment_tree.root(); + self.orchard_anchors.insert(orchard_root); + self.orchard_anchors_by_height.insert(height, orchard_root); + + // TODO: update the history trees in a rayon thread, if they show up in CPU profiles + let history_tree_mut = Arc::make_mut(&mut self.history_tree); + history_tree_mut.push( + self.network, + contextually_valid.block.clone(), + sapling_root, + orchard_root, + )?; + + Ok(()) + } + + /// Update the chain tip with the `contextually_valid` block, + /// except for the note commitment and history tree updates. + /// + /// Used to implement `update_chain_tip_with::`. #[instrument(skip(self, contextually_valid), fields(block = %contextually_valid.block))] #[allow(clippy::unwrap_in_result)] - fn update_chain_tip_with( + fn update_chain_tip_with_block_except_trees( &mut self, contextually_valid: &ContextuallyValidBlock, ) -> Result<(), ValidateContextError> { @@ -840,39 +933,52 @@ impl UpdateWith for Chain { self.update_chain_tip_with(orchard_shielded_data)?; } - // Update the note commitment trees indexed by height. - self.sapling_trees_by_height - .insert(height, self.sapling_note_commitment_tree.clone()); - self.orchard_trees_by_height - .insert(height, self.orchard_note_commitment_tree.clone()); + // update the chain value pool balances + self.update_chain_tip_with(chain_value_pool_change)?; - // Having updated all the note commitment trees and nullifier sets in - // this block, the roots of the note commitment trees as of the last - // transaction are the treestates of this block. - let sprout_root = self.sprout_note_commitment_tree.root(); - self.sprout_anchors.insert(sprout_root); - self.sprout_anchors_by_height.insert(height, sprout_root); - self.sprout_trees_by_anchor - .insert(sprout_root, self.sprout_note_commitment_tree.clone()); - let sapling_root = self.sapling_note_commitment_tree.root(); - self.sapling_anchors.insert(sapling_root); - self.sapling_anchors_by_height.insert(height, sapling_root); + Ok(()) + } +} - let orchard_root = self.orchard_note_commitment_tree.root(); - self.orchard_anchors.insert(orchard_root); - self.orchard_anchors_by_height.insert(height, orchard_root); +/// The revert position being performed on a chain. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +enum RevertPosition { + /// The chain root is being reverted via [`Chain::pop_root`], when a block + /// is finalized. + Root, - self.history_tree.push( - self.network, - contextually_valid.block.clone(), - sapling_root, - orchard_root, - )?; + /// The chain tip is being reverted via [`Chain::pop_tip`], + /// when a chain is forked. + Tip, +} - // update the chain value pool balances - self.update_chain_tip_with(chain_value_pool_change)?; +/// Helper trait to organize inverse operations done on the [`Chain`] type. +/// +/// Used to overload update and revert methods, based on the type of the argument, +/// and the position of the removed block in the chain. +/// +/// This trait was motivated by the length of the `push`, [`Chain::pop_root`], +/// and [`Chain::pop_tip`] functions, and fear that it would be easy to +/// introduce bugs when updating them, unless the code was reorganized to keep +/// related operations adjacent to each other. +trait UpdateWith { + /// When `T` is added to the chain tip, + /// update [`Chain`] cumulative data members to add data that are derived from `T`. + fn update_chain_tip_with(&mut self, _: &T) -> Result<(), ValidateContextError>; - Ok(()) + /// When `T` is removed from `position` in the chain, + /// revert [`Chain`] cumulative data members to remove data that are derived from `T`. + fn revert_chain_with(&mut self, _: &T, position: RevertPosition); +} + +impl UpdateWith for Chain { + #[instrument(skip(self, contextually_valid), fields(block = %contextually_valid.block))] + #[allow(clippy::unwrap_in_result)] + fn update_chain_tip_with( + &mut self, + contextually_valid: &ContextuallyValidBlock, + ) -> Result<(), ValidateContextError> { + self.update_chain_tip_with_block_parallel(contextually_valid) } #[instrument(skip(self, contextually_valid), fields(block = %contextually_valid.block))] @@ -1235,11 +1341,7 @@ impl UpdateWith>> for Chain { joinsplit_data: &Option>, ) -> Result<(), ValidateContextError> { if let Some(joinsplit_data) = joinsplit_data { - let sprout_ncm = Arc::make_mut(&mut self.sprout_note_commitment_tree); - - for cm in joinsplit_data.note_commitments() { - sprout_ncm.append(*cm)?; - } + // We do note commitment tree updates in parallel rayon threads. check::nullifier::add_to_non_finalized_chain_unique( &mut self.sprout_nullifiers, @@ -1285,14 +1387,7 @@ where sapling_shielded_data: &Option>, ) -> Result<(), ValidateContextError> { if let Some(sapling_shielded_data) = sapling_shielded_data { - let sapling_nct = Arc::make_mut(&mut self.sapling_note_commitment_tree); - - // The `_u` here indicates that the Sapling note commitment is - // specified only by the `u`-coordinate of the Jubjub curve - // point `(u, v)`. - for cm_u in sapling_shielded_data.note_commitments() { - sapling_nct.append(*cm_u)?; - } + // We do note commitment tree updates in parallel rayon threads. check::nullifier::add_to_non_finalized_chain_unique( &mut self.sapling_nullifiers, @@ -1335,11 +1430,7 @@ impl UpdateWith> for Chain { orchard_shielded_data: &Option, ) -> Result<(), ValidateContextError> { if let Some(orchard_shielded_data) = orchard_shielded_data { - let orchard_nct = Arc::make_mut(&mut self.orchard_note_commitment_tree); - - for cm_x in orchard_shielded_data.note_commitments() { - orchard_nct.append(*cm_x)?; - } + // We do note commitment tree updates in parallel rayon threads. check::nullifier::add_to_non_finalized_chain_unique( &mut self.orchard_nullifiers, diff --git a/zebra-state/src/service/non_finalized_state/tests/prop.rs b/zebra-state/src/service/non_finalized_state/tests/prop.rs index 410a7004fea..d2958ae82d6 100644 --- a/zebra-state/src/service/non_finalized_state/tests/prop.rs +++ b/zebra-state/src/service/non_finalized_state/tests/prop.rs @@ -551,18 +551,25 @@ fn different_blocks_different_chains() -> Result<()> { )| { let prev_block1 = vec1[0].clone(); let prev_block2 = vec2[0].clone(); + let height1 = prev_block1.coinbase_height().unwrap(); let height2 = prev_block1.coinbase_height().unwrap(); - let finalized_tree1: HistoryTree = if height1 >= Heartwood.activation_height(Network::Mainnet).unwrap() { - NonEmptyHistoryTree::from_block(Network::Mainnet, prev_block1, &Default::default(), &Default::default()).unwrap().into() + + let finalized_tree1: Arc = if height1 >= Heartwood.activation_height(Network::Mainnet).unwrap() { + Arc::new( + NonEmptyHistoryTree::from_block(Network::Mainnet, prev_block1, &Default::default(), &Default::default()).unwrap().into() + ) } else { Default::default() }; - let finalized_tree2 = if height2 >= NetworkUpgrade::Heartwood.activation_height(Network::Mainnet).unwrap() { - NonEmptyHistoryTree::from_block(Network::Mainnet, prev_block2, &Default::default(), &Default::default()).unwrap().into() + let finalized_tree2: Arc = if height2 >= NetworkUpgrade::Heartwood.activation_height(Network::Mainnet).unwrap() { + Arc::new( + NonEmptyHistoryTree::from_block(Network::Mainnet, prev_block2, &Default::default(), &Default::default()).unwrap().into() + ) } else { Default::default() }; + let chain1 = Chain::new(Network::Mainnet, Default::default(), Default::default(), Default::default(), finalized_tree1, ValueBalance::fake_populated_pool()); let chain2 = Chain::new(Network::Mainnet, Default::default(), Default::default(), Default::default(), finalized_tree2, ValueBalance::fake_populated_pool()); diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs index a77da12862a..d611a35dd99 100644 --- a/zebra-state/src/service/read.rs +++ b/zebra-state/src/service/read.rs @@ -51,8 +51,7 @@ const FINALIZED_ADDRESS_INDEX_RETRIES: usize = 3; pub const ADDRESS_HEIGHTS_FULL_RANGE: RangeInclusive = Height(1)..=Height::MAX; /// Returns the [`Block`] with [`block::Hash`](zebra_chain::block::Hash) or -/// [`Height`](zebra_chain::block::Height), if it exists in the non-finalized -/// `chain` or finalized `db`. +/// [`Height`], if it exists in the non-finalized `chain` or finalized `db`. pub(crate) fn block( chain: Option, db: &ZebraDb, @@ -77,9 +76,8 @@ where .or_else(|| db.block(hash_or_height)) } -/// Returns the [`block:Header`] with [`block::Hash`](zebra_chain::block::Hash) or -/// [`Height`](zebra_chain::block::Height), if it exists in the non-finalized -/// `chain` or finalized `db`. +/// Returns the [`block::Header`] with [`block::Hash`](zebra_chain::block::Hash) or +/// [`Height`], if it exists in the non-finalized `chain` or finalized `db`. pub(crate) fn block_header( chain: Option, db: &ZebraDb,