diff --git a/Cargo.lock b/Cargo.lock index c7d752ed1fca8..8f99c87f9ba82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3875,6 +3875,7 @@ version = "0.1.0" dependencies = [ "anyhow", "aptos-crypto", + "aptos-drop-helper", "aptos-experimental-runtimes", "aptos-logger", "aptos-metrics-core", diff --git a/consensus/consensus-types/src/pipeline_execution_result.rs b/consensus/consensus-types/src/pipeline_execution_result.rs index 03b59ee25c283..813a105c09753 100644 --- a/consensus/consensus-types/src/pipeline_execution_result.rs +++ b/consensus/consensus-types/src/pipeline_execution_result.rs @@ -1,7 +1,7 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use aptos_executor_types::{ExecutorResult, StateComputeResult}; +use aptos_executor_types::{state_compute_result::StateComputeResult, ExecutorResult}; use aptos_types::transaction::SignedTransaction; use derivative::Derivative; use futures::future::BoxFuture; diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs index 72713f530d778..eb137936696f6 100644 --- a/consensus/consensus-types/src/pipelined_block.rs +++ b/consensus/consensus-types/src/pipelined_block.rs @@ -11,7 +11,7 @@ use crate::{ vote_proposal::VoteProposal, }; use aptos_crypto::hash::{HashValue, ACCUMULATOR_PLACEHOLDER_HASH}; -use aptos_executor_types::{ExecutorResult, StateComputeResult}; +use aptos_executor_types::{state_compute_result::StateComputeResult, ExecutorResult}; use aptos_infallible::Mutex; use aptos_logger::{error, warn}; use aptos_types::{ diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index cbc4c85a139b5..3822a3dfe9ba3 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -29,7 +29,7 @@ use aptos_consensus_types::{ wrapped_ledger_info::WrappedLedgerInfo, }; use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue}; -use aptos_executor_types::StateComputeResult; +use aptos_executor_types::state_compute_result::StateComputeResult; use aptos_infallible::{Mutex, RwLock}; use aptos_logger::prelude::*; use aptos_types::{ diff --git a/consensus/src/dag/adapter.rs b/consensus/src/dag/adapter.rs index 36941b875cbdb..35650eab6ee72 100644 --- a/consensus/src/dag/adapter.rs +++ b/consensus/src/dag/adapter.rs @@ -25,7 +25,7 @@ use aptos_consensus_types::{ quorum_cert::QuorumCert, }; use aptos_crypto::HashValue; -use aptos_executor_types::StateComputeResult; +use aptos_executor_types::state_compute_result::StateComputeResult; use aptos_infallible::RwLock; use aptos_logger::{error, info}; use aptos_storage_interface::DbReader; diff --git a/consensus/src/execution_pipeline.rs b/consensus/src/execution_pipeline.rs index c95d6c8e1f0f3..ee11b60e12a5e 100644 --- a/consensus/src/execution_pipeline.rs +++ b/consensus/src/execution_pipeline.rs @@ -13,8 +13,8 @@ use crate::{ use aptos_consensus_types::{block::Block, pipeline_execution_result::PipelineExecutionResult}; use aptos_crypto::HashValue; use aptos_executor_types::{ - state_checkpoint_output::StateCheckpointOutput, BlockExecutorTrait, ExecutorError, - ExecutorResult, StateComputeResult, + state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult, + BlockExecutorTrait, ExecutorError, ExecutorResult, }; use aptos_experimental_runtimes::thread_manager::optimal_min_len; use aptos_logger::{debug, warn}; @@ -293,7 +293,7 @@ impl ExecutionPipeline { let executor = executor.clone(); Box::pin(async move { tokio::task::spawn_blocking(move || { - executor.pre_commit_block(block_id, parent_block_id) + executor.pre_commit_block(block_id) }) .await .expect("failed to spawn_blocking")?; @@ -307,7 +307,6 @@ impl ExecutionPipeline { pre_commit_tx .send(PreCommitCommand { block_id, - parent_block_id, pre_commit_hook_fut, result_tx: pre_commit_result_tx, lifetime_guard, @@ -335,7 +334,6 @@ impl ExecutionPipeline { ) { while let Some(PreCommitCommand { block_id, - parent_block_id, pre_commit_hook_fut, result_tx, lifetime_guard, @@ -346,9 +344,7 @@ impl ExecutionPipeline { let executor = executor.clone(); monitor!( "pre_commit", - tokio::task::spawn_blocking(move || { - executor.pre_commit_block(block_id, parent_block_id) - }) + tokio::task::spawn_blocking(move || { executor.pre_commit_block(block_id) }) ) .await .expect("Failed to spawn_blocking().")?; @@ -402,7 +398,6 @@ struct LedgerApplyCommand { struct PreCommitCommand { block_id: HashValue, - parent_block_id: HashValue, pre_commit_hook_fut: BoxFuture<'static, ()>, result_tx: oneshot::Sender>, lifetime_guard: CountedRequest<()>, diff --git a/consensus/src/pipeline/buffer_item.rs b/consensus/src/pipeline/buffer_item.rs index c16d8430c42b8..3326eb2f23f96 100644 --- a/consensus/src/pipeline/buffer_item.rs +++ b/consensus/src/pipeline/buffer_item.rs @@ -464,7 +464,7 @@ mod test { use super::*; use aptos_consensus_types::{block::Block, block_data::BlockData}; use aptos_crypto::HashValue; - use aptos_executor_types::StateComputeResult; + use aptos_executor_types::state_compute_result::StateComputeResult; use aptos_types::{ aggregate_signature::AggregateSignature, ledger_info::LedgerInfo, diff --git a/consensus/src/pipeline/tests/execution_phase_tests.rs b/consensus/src/pipeline/tests/execution_phase_tests.rs index bd582b44b2673..a0d7eb6a76c43 100644 --- a/consensus/src/pipeline/tests/execution_phase_tests.rs +++ b/consensus/src/pipeline/tests/execution_phase_tests.rs @@ -20,7 +20,7 @@ use aptos_consensus_types::{ quorum_cert::QuorumCert, }; use aptos_crypto::HashValue; -use aptos_executor_types::{ExecutorError, StateComputeResult}; +use aptos_executor_types::{state_compute_result::StateComputeResult, ExecutorError}; use aptos_types::{ledger_info::LedgerInfo, validator_verifier::random_validator_verifier}; use async_trait::async_trait; use std::sync::{ diff --git a/consensus/src/pipeline/tests/test_utils.rs b/consensus/src/pipeline/tests/test_utils.rs index e099abaceb183..fdeb3fafa261f 100644 --- a/consensus/src/pipeline/tests/test_utils.rs +++ b/consensus/src/pipeline/tests/test_utils.rs @@ -11,7 +11,7 @@ use aptos_consensus_types::{ vote_proposal::VoteProposal, }; use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue}; -use aptos_executor_types::StateComputeResult; +use aptos_executor_types::state_compute_result::StateComputeResult; use aptos_infallible::Mutex; use aptos_safety_rules::{ test_utils::{make_proposal_with_parent, make_proposal_with_qc}, diff --git a/consensus/src/rand/rand_gen/test_utils.rs b/consensus/src/rand/rand_gen/test_utils.rs index b31dcb30e87e5..f24c310c697a9 100644 --- a/consensus/src/rand/rand_gen/test_utils.rs +++ b/consensus/src/rand/rand_gen/test_utils.rs @@ -13,7 +13,7 @@ use aptos_consensus_types::{ quorum_cert::QuorumCert, }; use aptos_crypto::HashValue; -use aptos_executor_types::StateComputeResult; +use aptos_executor_types::state_compute_result::StateComputeResult; use aptos_types::{ aggregate_signature::AggregateSignature, ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index 1fe6a69d92fbd..fb8230e013c00 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -24,7 +24,9 @@ use aptos_consensus_types::{ pipelined_block::PipelinedBlock, }; use aptos_crypto::HashValue; -use aptos_executor_types::{BlockExecutorTrait, ExecutorResult, StateComputeResult}; +use aptos_executor_types::{ + state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorResult, +}; use aptos_infallible::RwLock; use aptos_logger::prelude::*; use aptos_metrics_core::IntGauge; @@ -462,9 +464,7 @@ async fn test_commit_sync_race() { }; use aptos_config::config::transaction_filter_type::Filter; use aptos_consensus_notifications::Error; - use aptos_executor_types::{ - state_checkpoint_output::StateCheckpointOutput, StateComputeResult, - }; + use aptos_executor_types::state_checkpoint_output::StateCheckpointOutput; use aptos_infallible::Mutex; use aptos_types::{ aggregate_signature::AggregateSignature, @@ -516,11 +516,7 @@ async fn test_commit_sync_race() { todo!() } - fn pre_commit_block( - &self, - _block_id: HashValue, - _parent_block_id: HashValue, - ) -> ExecutorResult<()> { + fn pre_commit_block(&self, _block_id: HashValue) -> ExecutorResult<()> { todo!() } diff --git a/consensus/src/state_computer_tests.rs b/consensus/src/state_computer_tests.rs index bbcb5167f1b27..f9979f9941024 100644 --- a/consensus/src/state_computer_tests.rs +++ b/consensus/src/state_computer_tests.rs @@ -12,8 +12,8 @@ use aptos_consensus_notifications::{ConsensusNotificationSender, Error}; use aptos_consensus_types::{block::Block, block_data::BlockData}; use aptos_crypto::HashValue; use aptos_executor_types::{ - state_checkpoint_output::StateCheckpointOutput, BlockExecutorTrait, ExecutorResult, - StateComputeResult, + state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult, + BlockExecutorTrait, ExecutorResult, }; use aptos_infallible::Mutex; use aptos_types::{ @@ -144,11 +144,7 @@ impl BlockExecutorTrait for DummyBlockExecutor { Ok(StateComputeResult::new_dummy_with_input_txns(txns)) } - fn pre_commit_block( - &self, - _block_id: HashValue, - _parent_block_id: HashValue, - ) -> ExecutorResult<()> { + fn pre_commit_block(&self, _block_id: HashValue) -> ExecutorResult<()> { Ok(()) } diff --git a/consensus/src/test_utils/mock_state_computer.rs b/consensus/src/test_utils/mock_state_computer.rs index 58cba2eeace05..ff620acb8a0bb 100644 --- a/consensus/src/test_utils/mock_state_computer.rs +++ b/consensus/src/test_utils/mock_state_computer.rs @@ -17,7 +17,9 @@ use aptos_consensus_types::{ pipelined_block::PipelinedBlock, }; use aptos_crypto::HashValue; -use aptos_executor_types::{ExecutorError, ExecutorResult, StateComputeResult}; +use aptos_executor_types::{ + state_compute_result::StateComputeResult, ExecutorError, ExecutorResult, +}; use aptos_logger::debug; use aptos_types::{ block_executor::config::BlockExecutorConfigFromOnchain, epoch_state::EpochState, diff --git a/execution/executor-benchmark/src/ledger_update_stage.rs b/execution/executor-benchmark/src/ledger_update_stage.rs index ceb2f54fa793b..43aeb328a69ce 100644 --- a/execution/executor-benchmark/src/ledger_update_stage.rs +++ b/execution/executor-benchmark/src/ledger_update_stage.rs @@ -75,10 +75,7 @@ where output.root_hash(), self.version, ); - let parent_block_id = self.executor.committed_block_id(); - self.executor - .pre_commit_block(block_id, parent_block_id) - .unwrap(); + self.executor.pre_commit_block(block_id).unwrap(); self.executor.commit_ledger(ledger_info_with_sigs).unwrap(); }, CommitProcessing::Skip => {}, diff --git a/execution/executor-benchmark/src/transaction_committer.rs b/execution/executor-benchmark/src/transaction_committer.rs index 64f5f689433fa..e3aa70b7c534b 100644 --- a/execution/executor-benchmark/src/transaction_committer.rs +++ b/execution/executor-benchmark/src/transaction_committer.rs @@ -85,12 +85,9 @@ where .inc_by(num_txns as u64); self.version += num_txns as u64; - let commit_start = std::time::Instant::now(); + let commit_start = Instant::now(); let ledger_info_with_sigs = gen_li_with_sigs(block_id, root_hash, self.version); - let parent_block_id = self.executor.committed_block_id(); - self.executor - .pre_commit_block(block_id, parent_block_id) - .unwrap(); + self.executor.pre_commit_block(block_id).unwrap(); self.executor.commit_ledger(ledger_info_with_sigs).unwrap(); report_block( diff --git a/execution/executor-test-helpers/src/lib.rs b/execution/executor-test-helpers/src/lib.rs index 8bf2adcf0ea5e..b42280ec8d073 100644 --- a/execution/executor-test-helpers/src/lib.rs +++ b/execution/executor-test-helpers/src/lib.rs @@ -10,7 +10,7 @@ use aptos_crypto::{ HashValue, }; use aptos_executor::db_bootstrapper::{generate_waypoint, maybe_bootstrap}; -use aptos_executor_types::StateComputeResult; +use aptos_executor_types::state_compute_result::StateComputeResult; use aptos_storage_interface::DbReaderWriter; use aptos_types::{ account_address::AccountAddress, diff --git a/execution/executor-types/src/ledger_update_output.rs b/execution/executor-types/src/ledger_update_output.rs index f0928c8f4bf8f..d5f0f101c998c 100644 --- a/execution/executor-types/src/ledger_update_output.rs +++ b/execution/executor-types/src/ledger_update_output.rs @@ -3,14 +3,12 @@ #![forbid(unsafe_code)] -use crate::StateComputeResult; use anyhow::{ensure, Result}; use aptos_crypto::HashValue; use aptos_drop_helper::DropHelper; use aptos_storage_interface::cached_state_view::ShardedStateCache; use aptos_types::{ contract_event::ContractEvent, - epoch_state::EpochState, proof::accumulator::InMemoryTransactionAccumulator, state_store::ShardedStateUpdates, transaction::{ @@ -38,11 +36,6 @@ impl LedgerUpdateOutput { Self::new_impl(Inner::new_dummy_with_input_txns(txns)) } - #[cfg(any(test, feature = "fuzzing"))] - pub fn new_dummy_with_txns_to_commit(txns: Vec) -> Self { - Self::new_impl(Inner::new_dummy_with_txns_to_commit(txns)) - } - pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { Self::new_impl(Inner::new_dummy_with_root_hash(root_hash)) } @@ -80,13 +73,6 @@ impl LedgerUpdateOutput { inner: Arc::new(DropHelper::new(inner)), } } - - pub fn as_state_compute_result( - &self, - next_epoch_state: Option, - ) -> StateComputeResult { - StateComputeResult::new(self.clone(), next_epoch_state) - } } #[derive(Default, Debug)] @@ -135,14 +121,6 @@ impl Inner { } } - #[cfg(any(test, feature = "fuzzing"))] - pub fn new_dummy_with_txns_to_commit(txns: Vec) -> Self { - Self { - to_commit: txns, - ..Default::default() - } - } - pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { let transaction_accumulator = Arc::new( InMemoryTransactionAccumulator::new_empty_with_root_hash(root_hash), diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs index 3af3e304f6a90..b0b1dc81987bf 100644 --- a/execution/executor-types/src/lib.rs +++ b/execution/executor-types/src/lib.rs @@ -5,37 +5,29 @@ use crate::state_checkpoint_output::StateCheckpointOutput; use anyhow::Result; -use aptos_crypto::{ - hash::{TransactionAccumulatorHasher, ACCUMULATOR_PLACEHOLDER_HASH}, - HashValue, -}; +use aptos_crypto::HashValue; use aptos_scratchpad::{ProofRead, SparseMerkleTree}; use aptos_types::{ account_config::NEW_EPOCH_EVENT_MOVE_TYPE_TAG, block_executor::{config::BlockExecutorConfigFromOnchain, partitioner::ExecutableBlock}, contract_event::ContractEvent, dkg::DKG_START_EVENT_MOVE_TYPE_TAG, - epoch_state::EpochState, jwks::OBSERVED_JWK_UPDATED_MOVE_TYPE_TAG, ledger_info::LedgerInfoWithSignatures, - proof::{ - accumulator::InMemoryTransactionAccumulator, AccumulatorExtensionProof, - SparseMerkleProofExt, - }, + proof::SparseMerkleProofExt, state_store::{state_key::StateKey, state_value::StateValue}, transaction::{ Transaction, TransactionInfo, TransactionListWithProof, TransactionOutputListWithProof, - TransactionStatus, Version, + Version, }, write_set::WriteSet, }; pub use error::{ExecutorError, ExecutorResult}; pub use ledger_update_output::LedgerUpdateOutput; pub use parsed_transaction_output::ParsedTransactionOutput; +use state_compute_result::StateComputeResult; use std::{ - cmp::max, collections::{BTreeSet, HashMap}, - fmt::Debug, ops::Deref, sync::{ atomic::{AtomicBool, Ordering}, @@ -47,6 +39,7 @@ mod error; mod ledger_update_output; pub mod parsed_transaction_output; pub mod state_checkpoint_output; +pub mod state_compute_result; pub trait ChunkExecutorTrait: Send + Sync { /// Verifies the transactions based on the provided proofs and ledger info. If the transactions @@ -167,19 +160,13 @@ pub trait BlockExecutorTrait: Send + Sync { block_ids: Vec, ledger_info_with_sigs: LedgerInfoWithSignatures, ) -> ExecutorResult<()> { - let mut parent_block_id = self.committed_block_id(); for block_id in block_ids { - self.pre_commit_block(block_id, parent_block_id)?; - parent_block_id = block_id; + self.pre_commit_block(block_id)?; } self.commit_ledger(ledger_info_with_sigs) } - fn pre_commit_block( - &self, - block_id: HashValue, - parent_block_id: HashValue, - ) -> ExecutorResult<()>; + fn pre_commit_block(&self, block_id: HashValue) -> ExecutorResult<()>; fn commit_ledger(&self, ledger_info_with_sigs: LedgerInfoWithSignatures) -> ExecutorResult<()>; @@ -280,129 +267,6 @@ pub struct ChunkCommitNotification { pub reconfiguration_occurred: bool, } -/// A structure that summarizes the result of the execution needed for consensus to agree on. -/// The execution is responsible for generating the ID of the new state, which is returned in the -/// result. -/// -/// Not every transaction in the payload succeeds: the returned vector keeps the boolean status -/// of success / failure of the transactions. -/// Note that the specific details of compute_status are opaque to StateMachineReplication, -/// which is going to simply pass the results between StateComputer and PayloadClient. -#[derive(Debug, Default, Clone)] -pub struct StateComputeResult { - ledger_update_output: LedgerUpdateOutput, - /// If set, this is the new epoch info that should be changed to if this is committed. - next_epoch_state: Option, -} - -impl StateComputeResult { - pub fn new( - ledger_update_output: LedgerUpdateOutput, - next_epoch_state: Option, - ) -> Self { - Self { - ledger_update_output, - next_epoch_state, - } - } - - pub fn new_empty(transaction_accumulator: Arc) -> Self { - Self { - ledger_update_output: LedgerUpdateOutput::new_empty(transaction_accumulator), - next_epoch_state: None, - } - } - - /// generate a new dummy state compute result with a given root hash. - /// this function is used in RandomComputeResultStateComputer to assert that the compute - /// function is really called. - pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { - Self { - ledger_update_output: LedgerUpdateOutput::new_dummy_with_root_hash(root_hash), - next_epoch_state: None, - } - } - - /// generate a new dummy state compute result with ACCUMULATOR_PLACEHOLDER_HASH as the root hash. - /// this function is used in ordering_state_computer as a dummy state compute result, - /// where the real compute result is generated after ordering_state_computer.commit pushes - /// the blocks and the finality proof to the execution phase. - pub fn new_dummy() -> Self { - StateComputeResult::new_dummy_with_root_hash(*ACCUMULATOR_PLACEHOLDER_HASH) - } - - #[cfg(any(test, feature = "fuzzing"))] - pub fn new_dummy_with_input_txns(txns: Vec) -> Self { - Self { - ledger_update_output: LedgerUpdateOutput::new_dummy_with_input_txns(txns), - next_epoch_state: None, - } - } - - pub fn version(&self) -> Version { - max(self.ledger_update_output.next_version(), 1) - .checked_sub(1) - .expect("Integer overflow occurred") - } - - pub fn root_hash(&self) -> HashValue { - self.ledger_update_output.transaction_accumulator.root_hash - } - - pub fn compute_status_for_input_txns(&self) -> &Vec { - &self.ledger_update_output.statuses_for_input_txns - } - - pub fn transactions_to_commit_len(&self) -> usize { - self.ledger_update_output.to_commit.len() - } - - /// On top of input transactions (which contain BlockMetadata and Validator txns), - /// filter out those that should be committed, and add StateCheckpoint/BlockEpilogue if needed. - pub fn transactions_to_commit(&self) -> Vec { - self.ledger_update_output - .to_commit - .iter() - .map(|t| t.transaction.clone()) - .collect() - } - - pub fn epoch_state(&self) -> &Option { - &self.next_epoch_state - } - - pub fn extension_proof(&self) -> AccumulatorExtensionProof { - AccumulatorExtensionProof::new( - self.ledger_update_output - .transaction_accumulator - .frozen_subtree_roots - .clone(), - self.ledger_update_output.transaction_accumulator.num_leaves, - self.transaction_info_hashes().to_vec(), - ) - } - - pub fn transaction_info_hashes(&self) -> &Vec { - &self.ledger_update_output.transaction_info_hashes - } - - pub fn num_leaves(&self) -> u64 { - self.ledger_update_output.next_version() - } - - pub fn has_reconfiguration(&self) -> bool { - self.next_epoch_state.is_some() - } - - pub fn subscribable_events(&self) -> &[ContractEvent] { - &self.ledger_update_output.subscribable_events - } - - pub fn is_reconfiguration_suffix(&self) -> bool { - self.has_reconfiguration() && self.compute_status_for_input_txns().is_empty() - } -} - pub struct ProofReader { proofs: HashMap, } diff --git a/execution/executor-types/src/state_compute_result.rs b/execution/executor-types/src/state_compute_result.rs new file mode 100644 index 0000000000000..8d67930555726 --- /dev/null +++ b/execution/executor-types/src/state_compute_result.rs @@ -0,0 +1,176 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ChunkCommitNotification, LedgerUpdateOutput}; +use aptos_crypto::{ + hash::{TransactionAccumulatorHasher, ACCUMULATOR_PLACEHOLDER_HASH}, + HashValue, +}; +use aptos_storage_interface::{chunk_to_commit::ChunkToCommit, state_delta::StateDelta}; +use aptos_types::{ + contract_event::ContractEvent, + epoch_state::EpochState, + proof::{accumulator::InMemoryTransactionAccumulator, AccumulatorExtensionProof}, + transaction::{Transaction, TransactionStatus, Version}, +}; +use std::{cmp::max, sync::Arc}; + +/// A structure that summarizes the result of the execution needed for consensus to agree on. +/// The execution is responsible for generating the ID of the new state, which is returned in the +/// result. +/// +/// Not every transaction in the payload succeeds: the returned vector keeps the boolean status +/// of success / failure of the transactions. +/// Note that the specific details of compute_status are opaque to StateMachineReplication, +/// which is going to simply pass the results between StateComputer and PayloadClient. +#[derive(Debug, Default, Clone)] +pub struct StateComputeResult { + pub parent_state: Arc, + pub result_state: Arc, + pub ledger_update_output: LedgerUpdateOutput, + /// If set, this is the new epoch info that should be changed to if this is committed. + pub next_epoch_state: Option, +} + +impl StateComputeResult { + pub fn new( + parent_state: Arc, + result_state: Arc, + ledger_update_output: LedgerUpdateOutput, + next_epoch_state: Option, + ) -> Self { + Self { + parent_state, + result_state, + ledger_update_output, + next_epoch_state, + } + } + + pub fn new_empty(transaction_accumulator: Arc) -> Self { + let result_state = Arc::new(StateDelta::new_empty()); + Self { + parent_state: result_state.clone(), + result_state, + ledger_update_output: LedgerUpdateOutput::new_empty(transaction_accumulator), + next_epoch_state: None, + } + } + + /// generate a new dummy state compute result with a given root hash. + /// this function is used in RandomComputeResultStateComputer to assert that the compute + /// function is really called. + pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { + let result_state = Arc::new(StateDelta::new_empty()); + Self { + parent_state: result_state.clone(), + result_state, + ledger_update_output: LedgerUpdateOutput::new_dummy_with_root_hash(root_hash), + next_epoch_state: None, + } + } + + /// generate a new dummy state compute result with ACCUMULATOR_PLACEHOLDER_HASH as the root hash. + /// this function is used in ordering_state_computer as a dummy state compute result, + /// where the real compute result is generated after ordering_state_computer.commit pushes + /// the blocks and the finality proof to the execution phase. + pub fn new_dummy() -> Self { + StateComputeResult::new_dummy_with_root_hash(*ACCUMULATOR_PLACEHOLDER_HASH) + } + + #[cfg(any(test, feature = "fuzzing"))] + pub fn new_dummy_with_input_txns(txns: Vec) -> Self { + let result_state = Arc::new(StateDelta::new_empty()); + Self { + parent_state: result_state.clone(), + result_state, + ledger_update_output: LedgerUpdateOutput::new_dummy_with_input_txns(txns), + next_epoch_state: None, + } + } + + pub fn version(&self) -> Version { + max(self.ledger_update_output.next_version(), 1) + .checked_sub(1) + .expect("Integer overflow occurred") + } + + pub fn root_hash(&self) -> HashValue { + self.ledger_update_output.transaction_accumulator.root_hash + } + + pub fn compute_status_for_input_txns(&self) -> &Vec { + &self.ledger_update_output.statuses_for_input_txns + } + + pub fn transactions_to_commit_len(&self) -> usize { + self.ledger_update_output.to_commit.len() + } + + /// On top of input transactions (which contain BlockMetadata and Validator txns), + /// filter out those that should be committed, and add StateCheckpoint/BlockEpilogue if needed. + pub fn transactions_to_commit(&self) -> Vec { + self.ledger_update_output + .to_commit + .iter() + .map(|t| t.transaction.clone()) + .collect() + } + + pub fn epoch_state(&self) -> &Option { + &self.next_epoch_state + } + + pub fn extension_proof(&self) -> AccumulatorExtensionProof { + AccumulatorExtensionProof::new( + self.ledger_update_output + .transaction_accumulator + .frozen_subtree_roots + .clone(), + self.ledger_update_output.transaction_accumulator.num_leaves, + self.transaction_info_hashes().to_vec(), + ) + } + + pub fn transaction_info_hashes(&self) -> &Vec { + &self.ledger_update_output.transaction_info_hashes + } + + pub fn num_leaves(&self) -> u64 { + self.ledger_update_output.next_version() + } + + pub fn has_reconfiguration(&self) -> bool { + self.next_epoch_state.is_some() + } + + pub fn subscribable_events(&self) -> &[ContractEvent] { + &self.ledger_update_output.subscribable_events + } + + pub fn is_reconfiguration_suffix(&self) -> bool { + self.has_reconfiguration() && self.compute_status_for_input_txns().is_empty() + } + + pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification { + ChunkCommitNotification { + subscribable_events: self.ledger_update_output.subscribable_events.clone(), + committed_transactions: self.transactions_to_commit(), + reconfiguration_occurred: self.has_reconfiguration(), + } + } + + pub fn as_chunk_to_commit(&self) -> ChunkToCommit { + ChunkToCommit { + first_version: self.ledger_update_output.first_version(), + base_state_version: self.parent_state.base_version, + txns_to_commit: &self.ledger_update_output.to_commit, + latest_in_memory_state: &self.result_state, + state_updates_until_last_checkpoint: self + .ledger_update_output + .state_updates_until_last_checkpoint + .as_ref(), + sharded_state_cache: Some(&self.ledger_update_output.sharded_state_cache), + } + } +} diff --git a/execution/executor/src/block_executor.rs b/execution/executor/src/block_executor.rs index 7cbc3811f718f..72426a15dd951 100644 --- a/execution/executor/src/block_executor.rs +++ b/execution/executor/src/block_executor.rs @@ -6,9 +6,8 @@ use crate::{ components::{ - apply_chunk_output::ApplyChunkOutput, - block_tree::{block_output::BlockOutput, BlockTree}, - chunk_output::ChunkOutput, + apply_chunk_output::ApplyChunkOutput, block_tree::BlockTree, chunk_output::ChunkOutput, + partial_state_compute_result::PartialStateComputeResult, }, logging::{LogEntry, LogSchema}, metrics::{ @@ -19,8 +18,8 @@ use crate::{ use anyhow::Result; use aptos_crypto::HashValue; use aptos_executor_types::{ - state_checkpoint_output::StateCheckpointOutput, BlockExecutorTrait, ExecutorError, - ExecutorResult, StateComputeResult, + state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult, + BlockExecutorTrait, ExecutorError, ExecutorResult, }; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_infallible::RwLock; @@ -146,18 +145,14 @@ where .ledger_update(block_id, parent_block_id, state_checkpoint_output) } - fn pre_commit_block( - &self, - block_id: HashValue, - parent_block_id: HashValue, - ) -> ExecutorResult<()> { + fn pre_commit_block(&self, block_id: HashValue) -> ExecutorResult<()> { let _guard = CONCURRENCY_GAUGE.concurrency_with(&["block", "pre_commit_block"]); self.inner .read() .as_ref() .expect("BlockExecutor is not reset") - .pre_commit_block(block_id, parent_block_id) + .pre_commit_block(block_id) } fn commit_ledger(&self, ledger_info_with_sigs: LedgerInfoWithSignatures) -> ExecutorResult<()> { @@ -279,7 +274,7 @@ where let _ = self.block_tree.add_block( parent_block_id, block_id, - BlockOutput::new(state, epoch_state), + PartialStateComputeResult::new(parent_output.result_state.clone(), state, epoch_state), )?; Ok(state_checkpoint_output) } @@ -306,15 +301,12 @@ where // At this point of time two things must happen // 1. The block tree must also have the current block id with or without the ledger update output. // 2. We must have the ledger update output of the parent block. - let parent_output = parent_block.output.get_ledger_update(); + let parent_output = parent_block.output.expect_ledger_update_output(); let parent_accumulator = parent_output.txn_accumulator(); - let current_output = block_vec.pop().expect("Must exist").unwrap(); + let block = block_vec.pop().expect("Must exist").unwrap(); parent_block.ensure_has_child(block_id)?; - if current_output.output.has_ledger_update() { - return Ok(current_output - .output - .get_ledger_update() - .as_state_compute_result(current_output.output.epoch_state().clone())); + if let Some(complete_result) = block.output.get_complete_result() { + return Ok(complete_result); } let output = @@ -334,50 +326,35 @@ where output }; - if !current_output.output.has_reconfiguration() { + if !block.output.has_reconfiguration() { output.ensure_ends_with_state_checkpoint()?; } - let state_compute_result = - output.as_state_compute_result(current_output.output.epoch_state().clone()); - current_output.output.set_ledger_update(output); - Ok(state_compute_result) + block.output.set_ledger_update_output(output); + Ok(block.output.expect_complete_result()) } - fn pre_commit_block( - &self, - block_id: HashValue, - parent_block_id: HashValue, - ) -> ExecutorResult<()> { + fn pre_commit_block(&self, block_id: HashValue) -> ExecutorResult<()> { let _timer = COMMIT_BLOCKS.start_timer(); info!( LogSchema::new(LogEntry::BlockExecutor).block_id(block_id), "pre_commit_block", ); - let mut blocks = self.block_tree.get_blocks(&[parent_block_id, block_id])?; - let block = blocks.pop().expect("guaranteed"); - let parent_block = blocks.pop().expect("guaranteed"); - - let result_in_memory_state = block.output.state().clone(); + let block = self.block_tree.get_block(block_id)?; fail_point!("executor::pre_commit_block", |_| { Err(anyhow::anyhow!("Injected error in pre_commit_block.").into()) }); - let ledger_update = block.output.get_ledger_update(); - if !ledger_update.transactions_to_commit().is_empty() { + let output = block.output.expect_complete_result(); + let num_txns = output.transactions_to_commit_len(); + if num_txns != 0 { let _timer = SAVE_TRANSACTIONS.start_timer(); - self.db.writer.pre_commit_ledger( - ledger_update.transactions_to_commit(), - ledger_update.first_version(), - parent_block.output.state().base_version, - false, - result_in_memory_state, - ledger_update.state_updates_until_last_checkpoint.as_ref(), - Some(&ledger_update.sharded_state_cache), - )?; - TRANSACTIONS_SAVED.observe(ledger_update.num_txns() as f64); + self.db + .writer + .pre_commit_ledger(output.as_chunk_to_commit(), false)?; + TRANSACTIONS_SAVED.observe(num_txns as f64); } Ok(()) diff --git a/execution/executor/src/chunk_executor.rs b/execution/executor/src/chunk_executor.rs index c826ceb7fcc56..4feb6a4bcb076 100644 --- a/execution/executor/src/chunk_executor.rs +++ b/execution/executor/src/chunk_executor.rs @@ -11,13 +11,13 @@ use crate::{ chunk_output::ChunkOutput, chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier}, executed_chunk::ExecutedChunk, + partial_state_compute_result::PartialStateComputeResult, transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk}, }, logging::{LogEntry, LogSchema}, metrics::{APPLY_CHUNK, CHUNK_OTHER_TIMERS, COMMIT_CHUNK, CONCURRENCY_GAUGE, EXECUTE_CHUNK}, }; use anyhow::{anyhow, ensure, Result}; -use aptos_drop_helper::DEFAULT_DROPPER; use aptos_executor_types::{ ChunkCommitNotification, ChunkExecutorTrait, ParsedTransactionOutput, TransactionReplayer, VerifyExecutionMode, @@ -246,38 +246,29 @@ impl ChunkExecutorInner { fn commit_chunk_impl(&self) -> Result { let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__total"]); - let (persisted_state, chunk) = { + let chunk = { let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__next_chunk_to_commit"]); self.commit_queue.lock().next_chunk_to_commit()? }; - if chunk.ledger_info.is_some() || !chunk.transactions_to_commit().is_empty() { + let output = chunk.output.expect_complete_result(); + let num_txns = output.transactions_to_commit_len(); + if chunk.ledger_info_opt.is_some() || num_txns != 0 { let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__save_txns"]); fail_point!("executor::commit_chunk", |_| { Err(anyhow::anyhow!("Injected error in commit_chunk")) }); + let output = chunk.output.expect_complete_result(); self.db.writer.save_transactions( - chunk.transactions_to_commit(), - persisted_state.next_version(), - persisted_state.base_version, - chunk.ledger_info.as_ref(), + output.as_chunk_to_commit(), + chunk.ledger_info_opt.as_ref(), false, // sync_commit - chunk.result_state.clone(), - chunk - .ledger_update_output - .state_updates_until_last_checkpoint - .as_ref(), - Some(&chunk.ledger_update_output.sharded_state_cache), )?; } - DEFAULT_DROPPER.schedule_drop(persisted_state); - let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__dequeue_and_return"]); - self.commit_queue - .lock() - .dequeue_committed(chunk.result_state.clone())?; + self.commit_queue.lock().dequeue_committed()?; Ok(chunk) } @@ -322,9 +313,12 @@ impl ChunkExecutorInner { self.commit_queue .lock() .enqueue_for_ledger_update(ChunkToUpdateLedger { - result_state, + output: PartialStateComputeResult::new( + parent_state.clone(), + result_state, + next_epoch_state, + ), state_checkpoint_output, - next_epoch_state, chunk_verifier, })?; @@ -347,9 +341,8 @@ impl ChunkExecutorInner { self.commit_queue.lock().next_chunk_to_update_ledger()? }; let ChunkToUpdateLedger { - result_state, + output, state_checkpoint_output, - next_epoch_state, chunk_verifier, } = chunk; @@ -368,16 +361,18 @@ impl ChunkExecutorInner { let ledger_info_opt = chunk_verifier.maybe_select_chunk_ending_ledger_info( &ledger_update_output, - next_epoch_state.as_ref(), + output.next_epoch_state.as_ref(), )?; + output.set_ledger_update_output(ledger_update_output); let executed_chunk = ExecutedChunk { - result_state, - ledger_info: ledger_info_opt, - next_epoch_state, - ledger_update_output, + output, + ledger_info_opt, }; - let num_txns = executed_chunk.transactions_to_commit().len(); + let num_txns = executed_chunk + .output + .expect_complete_result() + .transactions_to_commit_len(); let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__save"]); self.commit_queue @@ -400,7 +395,10 @@ impl ChunkExecutorInner { let commit_notification = { let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk__into_chunk_commit_notification"]); - executed_chunk.into_chunk_commit_notification() + executed_chunk + .output + .expect_complete_result() + .make_chunk_commit_notification() }; Ok(commit_notification) @@ -505,6 +503,7 @@ impl ChunkExecutorInner { ); Ok(chunk + .output .result_state .current_version .expect("Version must exist after commit.")) diff --git a/execution/executor/src/components/apply_chunk_output.rs b/execution/executor/src/components/apply_chunk_output.rs index ba358ec2b9caa..321579363ba02 100644 --- a/execution/executor/src/components/apply_chunk_output.rs +++ b/execution/executor/src/components/apply_chunk_output.rs @@ -9,6 +9,7 @@ use crate::{ chunk_output::{update_counters_for_processed_chunk, ChunkOutput}, executed_chunk::ExecutedChunk, in_memory_state_calculator_v2::InMemoryStateCalculatorV2, + partial_state_compute_result::PartialStateComputeResult, }, metrics::{EXECUTOR_ERRORS, OTHER_TIMERS}, }; @@ -48,7 +49,7 @@ impl ApplyChunkOutput { append_state_checkpoint_to_block: Option, known_state_checkpoints: Option>>, is_block: bool, - ) -> Result<(StateDelta, Option, StateCheckpointOutput)> { + ) -> Result<(Arc, Option, StateCheckpointOutput)> { let ChunkOutput { state_cache, transactions, @@ -107,7 +108,11 @@ impl ApplyChunkOutput { .check_and_update_state_checkpoint_hashes(state_checkpoint_hashes)?; } - Ok((result_state, next_epoch_state, state_checkpoint_output)) + Ok(( + Arc::new(result_state), + next_epoch_state, + state_checkpoint_output, + )) } pub fn calculate_ledger_update( @@ -182,13 +187,17 @@ impl ApplyChunkOutput { state_checkpoint_output, base_view.txn_accumulator().clone(), )?; + let output = PartialStateComputeResult::new( + base_view.state().clone(), + result_state, + next_epoch_state, + ); + output.set_ledger_update_output(ledger_update_output); Ok(( ExecutedChunk { - result_state, - ledger_info: None, - next_epoch_state, - ledger_update_output, + output, + ledger_info_opt: None, }, to_discard, to_retry, diff --git a/execution/executor/src/components/block_tree/block_output.rs b/execution/executor/src/components/block_tree/block_output.rs deleted file mode 100644 index 3604604eaa948..0000000000000 --- a/execution/executor/src/components/block_tree/block_output.rs +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -#![forbid(unsafe_code)] - -use aptos_executor_types::LedgerUpdateOutput; -use aptos_storage_interface::state_delta::StateDelta; -use aptos_types::epoch_state::EpochState; -use once_cell::sync::OnceCell; - -pub struct BlockOutput { - state: StateDelta, - /// If set, this is the new epoch info that should be changed to if this is committed. - next_epoch_state: Option, - ledger_update_output: OnceCell, -} - -impl BlockOutput { - pub fn new(state: StateDelta, next_epoch_state: Option) -> Self { - Self { - state, - next_epoch_state, - ledger_update_output: OnceCell::new(), - } - } - - pub fn new_with_ledger_update( - state: StateDelta, - next_epoch_state: Option, - ledger_update_output: LedgerUpdateOutput, - ) -> Self { - let ledger_update = OnceCell::new(); - ledger_update.set(ledger_update_output).unwrap(); - Self { - state, - next_epoch_state, - ledger_update_output: ledger_update, - } - } - - pub fn epoch_state(&self) -> &Option { - &self.next_epoch_state - } - - pub fn has_reconfiguration(&self) -> bool { - self.next_epoch_state.is_some() - } - - pub fn has_ledger_update(&self) -> bool { - self.ledger_update_output.get().is_some() - } - - pub fn get_ledger_update(&self) -> &LedgerUpdateOutput { - self.ledger_update_output.get().unwrap() - } - - pub fn set_ledger_update(&self, ledger_update_output: LedgerUpdateOutput) { - self.ledger_update_output - .set(ledger_update_output) - .expect("LedgerUpdateOutput already set"); - } - - pub fn next_version(&self) -> u64 { - self.state().current_version.unwrap() + 1 - } - - pub fn is_same_state(&self, rhs: &Self) -> bool { - self.state().has_same_current_state(rhs.state()) - } - - pub fn state(&self) -> &StateDelta { - &self.state - } -} diff --git a/execution/executor/src/components/block_tree/mod.rs b/execution/executor/src/components/block_tree/mod.rs index 57a417014158f..c8cca189bf267 100644 --- a/execution/executor/src/components/block_tree/mod.rs +++ b/execution/executor/src/components/block_tree/mod.rs @@ -4,21 +4,22 @@ #![forbid(unsafe_code)] -pub mod block_output; #[cfg(test)] mod test; -use crate::logging::{LogEntry, LogSchema}; +use crate::{ + components::partial_state_compute_result::PartialStateComputeResult, + logging::{LogEntry, LogSchema}, +}; use anyhow::{anyhow, ensure, Result}; use aptos_consensus_types::block::Block as ConsensusBlock; use aptos_crypto::HashValue; use aptos_drop_helper::DEFAULT_DROPPER; -use aptos_executor_types::{ExecutorError, LedgerUpdateOutput}; +use aptos_executor_types::ExecutorError; use aptos_infallible::Mutex; use aptos_logger::{debug, info}; use aptos_storage_interface::DbReader; use aptos_types::{ledger_info::LedgerInfo, proof::definition::LeafCount}; -use block_output::BlockOutput; use std::{ collections::{hash_map::Entry, HashMap}, sync::{mpsc::Receiver, Arc, Weak}, @@ -26,7 +27,7 @@ use std::{ pub struct Block { pub id: HashValue, - pub output: BlockOutput, + pub output: PartialStateComputeResult, children: Mutex>>, block_lookup: Arc, } @@ -48,7 +49,7 @@ impl Block { pub fn num_persisted_transactions(&self) -> LeafCount { self.output - .get_ledger_update() + .expect_ledger_update_output() .txn_accumulator() .num_leaves() } @@ -94,7 +95,7 @@ impl BlockLookupInner { fn fetch_or_add_block( &mut self, id: HashValue, - output: BlockOutput, + output: PartialStateComputeResult, parent_id: Option, block_lookup: &Arc, ) -> Result<(Arc, bool, Option>)> { @@ -150,7 +151,7 @@ impl BlockLookup { fn fetch_or_add_block( self: &Arc, id: HashValue, - output: BlockOutput, + output: PartialStateComputeResult, parent_id: Option, ) -> Result> { let (block, existing, parent_block) = self @@ -226,10 +227,10 @@ impl BlockTree { ledger_info.consensus_block_id() }; - let output = BlockOutput::new_with_ledger_update( + let output = PartialStateComputeResult::new_empty_completed( ledger_view.state().clone(), + ledger_view.txn_accumulator().clone(), None, - LedgerUpdateOutput::new_empty(ledger_view.txn_accumulator().clone()), ); block_lookup.fetch_or_add_block(id, output, None) @@ -252,16 +253,14 @@ impl BlockTree { .original_reconfiguration_block_id(committed_block_id), "Updated with a new root block as a virtual block of reconfiguration block" ); - let output = BlockOutput::new_with_ledger_update( - last_committed_block.output.state().clone(), + let commited_output = last_committed_block.output.expect_complete_result(); + let output = PartialStateComputeResult::new_empty_completed( + commited_output.result_state.clone(), + commited_output + .ledger_update_output + .transaction_accumulator + .clone(), None, - LedgerUpdateOutput::new_empty( - last_committed_block - .output - .get_ledger_update() - .txn_accumulator() - .clone(), - ), ); self.block_lookup .fetch_or_add_block(epoch_genesis_id, output, None)? @@ -291,7 +290,7 @@ impl BlockTree { &self, parent_block_id: HashValue, id: HashValue, - output: BlockOutput, + output: PartialStateComputeResult, ) -> Result> { self.block_lookup .fetch_or_add_block(id, output, Some(parent_block_id)) diff --git a/execution/executor/src/components/block_tree/test.rs b/execution/executor/src/components/block_tree/test.rs index 5318fcb78dd1c..b18acd7dbda82 100644 --- a/execution/executor/src/components/block_tree/test.rs +++ b/execution/executor/src/components/block_tree/test.rs @@ -2,11 +2,11 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::components::block_tree::{ - block_output::BlockOutput, epoch_genesis_block_id, BlockLookup, BlockTree, +use crate::components::{ + block_tree::{epoch_genesis_block_id, BlockLookup, BlockTree}, + partial_state_compute_result::PartialStateComputeResult, }; use aptos_crypto::{hash::PRE_GENESIS_BLOCK_ID, HashValue}; -use aptos_executor_types::LedgerUpdateOutput; use aptos_infallible::Mutex; use aptos_storage_interface::ExecutedTrees; use aptos_types::{block_info::BlockInfo, epoch_state::EpochState, ledger_info::LedgerInfo}; @@ -38,12 +38,12 @@ fn id(index: u64) -> HashValue { HashValue::new(buf) } -fn empty_block() -> BlockOutput { +fn empty_block() -> PartialStateComputeResult { let result_view = ExecutedTrees::new_empty(); - BlockOutput::new_with_ledger_update( + PartialStateComputeResult::new_empty_completed( result_view.state().clone(), + result_view.transaction_accumulator.clone(), None, - LedgerUpdateOutput::new_empty(ExecutedTrees::new_empty().txn_accumulator().clone()), ) } diff --git a/execution/executor/src/components/chunk_commit_queue.rs b/execution/executor/src/components/chunk_commit_queue.rs index d796079bf812e..ddfd14caca790 100644 --- a/execution/executor/src/components/chunk_commit_queue.rs +++ b/execution/executor/src/components/chunk_commit_queue.rs @@ -6,22 +6,18 @@ use crate::components::{ chunk_result_verifier::ChunkResultVerifier, executed_chunk::ExecutedChunk, + partial_state_compute_result::PartialStateComputeResult, }; use anyhow::{anyhow, ensure, Result}; use aptos_executor_types::state_checkpoint_output::StateCheckpointOutput; use aptos_storage_interface::{state_delta::StateDelta, DbReader, ExecutedTrees}; -use aptos_types::{ - epoch_state::EpochState, proof::accumulator::InMemoryTransactionAccumulator, - transaction::Version, -}; +use aptos_types::{proof::accumulator::InMemoryTransactionAccumulator, transaction::Version}; use std::{collections::VecDeque, sync::Arc}; pub(crate) struct ChunkToUpdateLedger { - pub result_state: StateDelta, + pub output: PartialStateComputeResult, /// transactions sorted by status, state roots, state updates pub state_checkpoint_output: StateCheckpointOutput, - /// If set, this is the new epoch info that should be changed to if this is committed. - pub next_epoch_state: Option, /// from the input -- can be checked / used only after the transaction accumulator /// is updated. @@ -32,15 +28,13 @@ pub(crate) struct ChunkToUpdateLedger { /// (front) (front) /// / / /// ... | to_commit | to_update_ledger | ---> (txn version increases) -/// \ \ \ -/// \ \ latest_state -/// \ latest_txn_accumulator -/// persisted_state +/// \ \ +/// \ latest_state +/// latest_txn_accumulator /// pub struct ChunkCommitQueue { - persisted_state: StateDelta, /// Notice that latest_state and latest_txn_accumulator are at different versions. - latest_state: StateDelta, + latest_state: Arc, latest_txn_accumulator: Arc, to_commit: VecDeque>, to_update_ledger: VecDeque>, @@ -53,7 +47,6 @@ impl ChunkCommitQueue { transaction_accumulator, } = db.get_latest_executed_trees()?; Ok(Self { - persisted_state: state.clone(), latest_state: state, latest_txn_accumulator: transaction_accumulator, to_commit: VecDeque::new(), @@ -61,7 +54,7 @@ impl ChunkCommitQueue { }) } - pub(crate) fn latest_state(&self) -> StateDelta { + pub(crate) fn latest_state(&self) -> Arc { self.latest_state.clone() } @@ -73,7 +66,7 @@ impl ChunkCommitQueue { &mut self, chunk_to_update_ledger: ChunkToUpdateLedger, ) -> Result<()> { - self.latest_state = chunk_to_update_ledger.result_state.clone(); + self.latest_state = chunk_to_update_ledger.output.result_state.clone(); self.to_update_ledger .push_back(Some(chunk_to_update_ledger)); Ok(()) @@ -101,14 +94,18 @@ impl ChunkCommitQueue { self.to_update_ledger.front().unwrap().is_none(), "Head of to_update_ledger has not been processed." ); - self.latest_txn_accumulator = chunk.ledger_update_output.transaction_accumulator.clone(); + self.latest_txn_accumulator = chunk + .output + .expect_ledger_update_output() + .transaction_accumulator + .clone(); self.to_update_ledger.pop_front(); self.to_commit.push_back(Some(chunk)); Ok(()) } - pub(crate) fn next_chunk_to_commit(&mut self) -> Result<(StateDelta, ExecutedChunk)> { + pub(crate) fn next_chunk_to_commit(&mut self) -> Result { let chunk_opt = self .to_commit .front_mut() @@ -116,20 +113,16 @@ impl ChunkCommitQueue { let chunk = chunk_opt .take() .ok_or_else(|| anyhow!("Next chunk to commit has already been processed."))?; - Ok((self.persisted_state.clone(), chunk)) + Ok(chunk) } - pub(crate) fn dequeue_committed(&mut self, latest_state: StateDelta) -> Result<()> { + pub(crate) fn dequeue_committed(&mut self) -> Result<()> { ensure!(!self.to_commit.is_empty(), "to_commit is empty."); ensure!( self.to_commit.front().unwrap().is_none(), "Head of to_commit has not been processed." ); self.to_commit.pop_front(); - self.persisted_state = latest_state; - self.persisted_state - .current - .log_generation("commit_queue_base"); Ok(()) } diff --git a/execution/executor/src/components/chunk_output.rs b/execution/executor/src/components/chunk_output.rs index 9c5019dd8db64..2e321b3bc2110 100644 --- a/execution/executor/src/components/chunk_output.rs +++ b/execution/executor/src/components/chunk_output.rs @@ -154,7 +154,7 @@ impl ChunkOutput { self, parent_state: &StateDelta, block_id: HashValue, - ) -> Result<(StateDelta, Option, StateCheckpointOutput)> { + ) -> Result<(Arc, Option, StateCheckpointOutput)> { fail_point!("executor::into_state_checkpoint_output", |_| { Err(anyhow::anyhow!( "Injected error in into_state_checkpoint_output." diff --git a/execution/executor/src/components/executed_chunk.rs b/execution/executor/src/components/executed_chunk.rs index 57009d3ec33c9..f1d8654729d6e 100644 --- a/execution/executor/src/components/executed_chunk.rs +++ b/execution/executor/src/components/executed_chunk.rs @@ -4,107 +4,17 @@ #![forbid(unsafe_code)] -use aptos_executor_types::{ - should_forward_to_subscription_service, ChunkCommitNotification, LedgerUpdateOutput, -}; -use aptos_storage_interface::{state_delta::StateDelta, ExecutedTrees}; -use aptos_types::{ - epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, - transaction::TransactionToCommit, -}; +use crate::components::partial_state_compute_result::PartialStateComputeResult; +use aptos_types::{ledger_info::LedgerInfoWithSignatures, transaction::TransactionToCommit}; #[derive(Debug)] pub struct ExecutedChunk { - pub result_state: StateDelta, - pub ledger_info: Option, - /// If set, this is the new epoch info that should be changed to if this is committed. - pub next_epoch_state: Option, - pub ledger_update_output: LedgerUpdateOutput, + pub output: PartialStateComputeResult, + pub ledger_info_opt: Option, } impl ExecutedChunk { - pub fn transactions_to_commit(&self) -> &Vec { - &self.ledger_update_output.to_commit + pub fn transactions_to_commit(&self) -> &[TransactionToCommit] { + &self.output.expect_ledger_update_output().to_commit } - - pub fn has_reconfiguration(&self) -> bool { - self.next_epoch_state.is_some() - } - - pub fn result_view(&self) -> ExecutedTrees { - ExecutedTrees::new( - self.result_state.clone(), - self.ledger_update_output.transaction_accumulator.clone(), - ) - } - - pub fn into_chunk_commit_notification(self) -> ChunkCommitNotification { - let reconfiguration_occurred = self.has_reconfiguration(); - - let mut committed_transactions = - Vec::with_capacity(self.ledger_update_output.to_commit.len()); - let mut subscribable_events = - Vec::with_capacity(self.ledger_update_output.to_commit.len() * 2); - for txn_to_commit in &self.ledger_update_output.to_commit { - let TransactionToCommit { - transaction, - events, - .. - } = txn_to_commit; - committed_transactions.push(transaction.clone()); - subscribable_events.extend( - events - .iter() - .filter(|evt| should_forward_to_subscription_service(evt)) - .cloned(), - ); - } - - ChunkCommitNotification { - committed_transactions, - subscribable_events, - reconfiguration_occurred, - } - } - - #[cfg(any(test, feature = "fuzzing"))] - pub fn dummy() -> Self { - Self { - result_state: Default::default(), - ledger_info: None, - next_epoch_state: None, - ledger_update_output: Default::default(), - } - } -} - -#[test] -fn into_chunk_commit_notification_should_apply_event_filters() { - use aptos_types::{account_config::NewEpochEvent, contract_event::ContractEvent}; - let event_1 = ContractEvent::new_v2_with_type_tag_str( - "0x2345::random_module::RandomEvent", - b"random_data_x".to_vec(), - ); - let event_2 = - ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_data_2".to_vec()); - let event_3 = ContractEvent::new_v2_with_type_tag_str( - "0x6789::random_module::RandomEvent", - b"random_data_y".to_vec(), - ); - let event_4 = ContractEvent::from((1, NewEpochEvent::dummy())); - - let ledger_update_output = LedgerUpdateOutput::new_dummy_with_txns_to_commit(vec![ - TransactionToCommit::dummy_with_events(vec![event_1.clone()]), - TransactionToCommit::dummy_with_events(vec![event_2.clone(), event_3.clone()]), - TransactionToCommit::dummy_with_events(vec![event_4.clone()]), - ]); - - let chunk = ExecutedChunk { - ledger_update_output, - ..ExecutedChunk::dummy() - }; - - let notification = chunk.into_chunk_commit_notification(); - - assert_eq!(vec![event_2, event_4], notification.subscribable_events); } diff --git a/execution/executor/src/components/in_memory_state_calculator_v2.rs b/execution/executor/src/components/in_memory_state_calculator_v2.rs index 055477fcc85ac..fa39929928bf0 100644 --- a/execution/executor/src/components/in_memory_state_calculator_v2.rs +++ b/execution/executor/src/components/in_memory_state_calculator_v2.rs @@ -4,7 +4,6 @@ use crate::metrics::OTHER_TIMERS; use anyhow::{anyhow, ensure, Result}; use aptos_crypto::{hash::CryptoHash, HashValue}; -use aptos_drop_helper::DEFAULT_DROPPER; use aptos_executor_types::{parsed_transaction_output::TransactionsWithParsedOutput, ProofReader}; use aptos_logger::info; use aptos_metrics_core::TimerHelper; @@ -28,7 +27,7 @@ use arr_macro::arr; use dashmap::DashMap; use itertools::zip_eq; use rayon::prelude::*; -use std::collections::HashMap; +use std::{collections::HashMap, ops::Deref}; struct StateCacheView<'a> { base: &'a ShardedStateCache, @@ -235,12 +234,11 @@ impl InMemoryStateCalculatorV2 { .smt }; - DEFAULT_DROPPER.schedule_drop(frozen_base); - let updates_since_latest_checkpoint = if last_checkpoint_index.is_some() { updates_after_last_checkpoint } else { - let mut updates_since_latest_checkpoint = base.updates_since_base.clone(); + let mut updates_since_latest_checkpoint = + base.updates_since_base.deref().deref().clone(); zip_eq( updates_since_latest_checkpoint.iter_mut(), updates_after_last_checkpoint, diff --git a/execution/executor/src/components/mod.rs b/execution/executor/src/components/mod.rs index e4a96049b339c..27c9b6ca74ec1 100644 --- a/execution/executor/src/components/mod.rs +++ b/execution/executor/src/components/mod.rs @@ -12,4 +12,5 @@ pub mod in_memory_state_calculator_v2; pub mod chunk_result_verifier; pub mod executed_chunk; +pub mod partial_state_compute_result; pub mod transaction_chunk; diff --git a/execution/executor/src/components/partial_state_compute_result.rs b/execution/executor/src/components/partial_state_compute_result.rs new file mode 100644 index 0000000000000..f2e87565daab9 --- /dev/null +++ b/execution/executor/src/components/partial_state_compute_result.rs @@ -0,0 +1,125 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +#![forbid(unsafe_code)] + +use anyhow::Result; +use aptos_executor_types::{state_compute_result::StateComputeResult, LedgerUpdateOutput}; +use aptos_storage_interface::{ + async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView, + state_delta::StateDelta, DbReader, +}; +use aptos_types::{ + epoch_state::EpochState, proof::accumulator::InMemoryTransactionAccumulator, + state_store::StateViewId, +}; +use once_cell::sync::OnceCell; +use std::sync::Arc; + +#[derive(Debug)] +pub struct PartialStateComputeResult { + pub parent_state: Arc, + pub result_state: Arc, + /// If set, this is the new epoch info that should be changed to if this is committed. + pub next_epoch_state: Option, + pub ledger_update_output: OnceCell, +} + +impl PartialStateComputeResult { + pub fn new( + parent_state: Arc, + result_state: Arc, + next_epoch_state: Option, + ) -> Self { + Self { + parent_state, + result_state, + next_epoch_state, + ledger_update_output: OnceCell::new(), + } + } + + pub fn new_empty_completed( + state: Arc, + txn_accumulator: Arc, + next_epoch_state: Option, + ) -> Self { + let ledger_update_output = OnceCell::new(); + ledger_update_output + .set(LedgerUpdateOutput::new_empty(txn_accumulator)) + .expect("First set."); + + Self { + parent_state: state.clone(), + result_state: state, + next_epoch_state, + ledger_update_output, + } + } + + pub fn epoch_state(&self) -> &Option { + &self.next_epoch_state + } + + pub fn has_reconfiguration(&self) -> bool { + self.next_epoch_state.is_some() + } + + pub fn get_ledger_update_output(&self) -> Option<&LedgerUpdateOutput> { + self.ledger_update_output.get() + } + + pub fn expect_ledger_update_output(&self) -> &LedgerUpdateOutput { + self.ledger_update_output + .get() + .expect("LedgerUpdateOutput not set") + } + + pub fn set_ledger_update_output(&self, ledger_update_output: LedgerUpdateOutput) { + self.ledger_update_output + .set(ledger_update_output) + .expect("LedgerUpdateOutput already set"); + } + + pub fn next_version(&self) -> u64 { + self.state().current_version.unwrap() + 1 + } + + pub fn is_same_state(&self, rhs: &Self) -> bool { + self.state().has_same_current_state(rhs.state()) + } + + pub fn state(&self) -> &Arc { + &self.result_state + } + + pub fn get_complete_result(&self) -> Option { + self.ledger_update_output.get().map(|ledger_update_output| { + StateComputeResult::new( + self.parent_state.clone(), + self.result_state.clone(), + ledger_update_output.clone(), + self.next_epoch_state.clone(), + ) + }) + } + + pub fn expect_complete_result(&self) -> StateComputeResult { + self.get_complete_result().expect("Result is not complete.") + } + + pub fn verified_state_view( + &self, + id: StateViewId, + reader: Arc, + proof_fetcher: Arc, + ) -> Result { + Ok(CachedStateView::new( + id, + reader, + self.next_version(), + self.result_state.current.clone(), + proof_fetcher, + )?) + } +} diff --git a/execution/executor/src/db_bootstrapper.rs b/execution/executor/src/db_bootstrapper.rs index c27d91bef7375..02f74c12118c6 100644 --- a/execution/executor/src/db_bootstrapper.rs +++ b/execution/executor/src/db_bootstrapper.rs @@ -21,7 +21,7 @@ use aptos_types::{ on_chain_config::ConfigurationResource, state_store::{state_key::StateKey, StateViewId, TStateView}, timestamp::TimestampResource, - transaction::{Transaction, Version}, + transaction::Transaction, waypoint::Waypoint, }; use aptos_vm::VMExecutor; @@ -60,7 +60,7 @@ pub fn maybe_bootstrap( waypoint, committer.waypoint(), ); - let ledger_info = committer.output.ledger_info.clone(); + let ledger_info = committer.output.ledger_info_opt.clone(); committer.commit()?; Ok(ledger_info) } @@ -68,18 +68,13 @@ pub fn maybe_bootstrap( pub struct GenesisCommitter { db: Arc, output: ExecutedChunk, - base_state_version: Option, waypoint: Waypoint, } impl GenesisCommitter { - pub fn new( - db: Arc, - output: ExecutedChunk, - base_state_version: Option, - ) -> Result { + pub fn new(db: Arc, output: ExecutedChunk) -> Result { let ledger_info = output - .ledger_info + .ledger_info_opt .as_ref() .ok_or_else(|| anyhow!("LedgerInfo missing."))? .ledger_info(); @@ -89,7 +84,6 @@ impl GenesisCommitter { db, output, waypoint, - base_state_version, }) } @@ -99,21 +93,12 @@ impl GenesisCommitter { pub fn commit(self) -> Result<()> { self.db.save_transactions( - self.output.transactions_to_commit(), self.output - .ledger_update_output - .transaction_accumulator - .num_leaves() - - 1, - self.base_state_version, - self.output.ledger_info.as_ref(), + .output + .expect_complete_result() + .as_chunk_to_commit(), + self.output.ledger_info_opt.as_ref(), true, /* sync_commit */ - self.output.result_state.clone(), - self.output - .ledger_update_output - .state_updates_until_last_checkpoint - .as_ref(), - Some(&self.output.ledger_update_output.sharded_state_cache), )?; info!("Genesis commited."); // DB bootstrapped, avoid anything that could fail after this. @@ -143,14 +128,15 @@ pub fn calculate_genesis( get_state_epoch(&base_state_view)? }; - let (mut output, _, _) = ChunkOutput::by_transaction_execution::( + let (mut chunk, _, _) = ChunkOutput::by_transaction_execution::( vec![genesis_txn.clone().into()].into(), base_state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), )? .apply_to_ledger(&executed_trees, None)?; + let output = &chunk.output; ensure!( - !output.transactions_to_commit().is_empty(), + !chunk.transactions_to_commit().is_empty(), "Genesis txn execution failed." ); @@ -158,7 +144,7 @@ pub fn calculate_genesis( // TODO(aldenhu): fix existing tests before using real timestamp and check on-chain epoch. GENESIS_TIMESTAMP_USECS } else { - let state_view = output.result_view().verified_state_view( + let state_view = output.verified_state_view( StateViewId::Miscellaneous, Arc::clone(&db.reader), Arc::new(AsyncProofFetcher::new(db.reader.clone())), @@ -177,6 +163,7 @@ pub fn calculate_genesis( "Genesis txn didn't output reconfig event." ); + let output = output.expect_complete_result(); let ledger_info_with_sigs = LedgerInfoWithSignatures::new( LedgerInfo::new( BlockInfo::new( @@ -195,16 +182,12 @@ pub fn calculate_genesis( ), AggregateSignature::empty(), /* signatures */ ); - output.ledger_info = Some(ledger_info_with_sigs); + chunk.ledger_info_opt = Some(ledger_info_with_sigs); - let committer = GenesisCommitter::new( - db.writer.clone(), - output, - executed_trees.state().base_version, - )?; + let committer = GenesisCommitter::new(db.writer.clone(), chunk)?; info!( "Genesis calculated: ledger_info_with_sigs {:?}, waypoint {:?}", - &committer.output.ledger_info, committer.waypoint, + &committer.output.ledger_info_opt, committer.waypoint, ); Ok(committer) } diff --git a/execution/executor/src/fuzzing.rs b/execution/executor/src/fuzzing.rs index d801df7234792..2621e4b3fcd7e 100644 --- a/execution/executor/src/fuzzing.rs +++ b/execution/executor/src/fuzzing.rs @@ -10,9 +10,8 @@ use anyhow::Result; use aptos_crypto::{hash::SPARSE_MERKLE_PLACEHOLDER_HASH, HashValue}; use aptos_executor_types::BlockExecutorTrait; use aptos_storage_interface::{ - cached_state_view::{CachedStateView, ShardedStateCache}, - state_delta::StateDelta, - DbReader, DbReaderWriter, DbWriter, + cached_state_view::CachedStateView, chunk_to_commit::ChunkToCommit, DbReader, DbReaderWriter, + DbWriter, }; use aptos_types::{ block_executor::{ @@ -20,7 +19,7 @@ use aptos_types::{ partitioner::{ExecutableTransactions, PartitionedTransactions}, }, ledger_info::LedgerInfoWithSignatures, - state_store::{ShardedStateUpdates, StateView}, + state_store::StateView, test_helpers::transaction_test_helpers::TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG, transaction::{ signature_verified_transaction::{ @@ -115,13 +114,8 @@ impl DbReader for FakeDb { impl DbWriter for FakeDb { fn pre_commit_ledger( &self, - _txns_to_commit: &[TransactionToCommit], - _first_version: Version, - _base_state_version: Option, + _chunk: ChunkToCommit, _sync_commit: bool, - _latest_in_memory_state: StateDelta, - _state_updates_until_last_checkpoint: Option<&ShardedStateUpdates>, - _sharded_state_cache: Option<&ShardedStateCache>, ) -> aptos_storage_interface::Result<()> { Ok(()) } diff --git a/execution/executor/src/tests/chunk_executor_tests.rs b/execution/executor/src/tests/chunk_executor_tests.rs index f7e7154aac8be..46de72e5d06fd 100644 --- a/execution/executor/src/tests/chunk_executor_tests.rs +++ b/execution/executor/src/tests/chunk_executor_tests.rs @@ -344,9 +344,7 @@ fn commit_1_pre_commit_2_return_3() -> ( output.root_hash(), ledger_info.ledger_info().transaction_accumulator_hash() ); - block_executor - .pre_commit_block(block_id, parent_block_id) - .unwrap(); + block_executor.pre_commit_block(block_id).unwrap(); parent_block_id = block_id; } // commit till block 1 diff --git a/execution/executor/src/tests/mod.rs b/execution/executor/src/tests/mod.rs index 89b1e68dae89e..e34cab4e0eccb 100644 --- a/execution/executor/src/tests/mod.rs +++ b/execution/executor/src/tests/mod.rs @@ -291,9 +291,7 @@ fn test_executor_commit_twice() { ) .unwrap(); let ledger_info = gen_ledger_info(6, output1.root_hash(), block1_id, 1); - executor - .pre_commit_block(block1_id, executor.committed_block_id()) - .unwrap(); + executor.pre_commit_block(block1_id).unwrap(); executor.commit_ledger(ledger_info.clone()).unwrap(); executor.commit_ledger(ledger_info).unwrap(); } @@ -388,9 +386,7 @@ fn create_blocks_and_chunks( ) .unwrap(); assert_eq!(output.version(), version); - block_executor - .pre_commit_block(block_id, parent_block_id) - .unwrap(); + block_executor.pre_commit_block(block_id).unwrap(); let ledger_info = gen_ledger_info(version, output.root_hash(), block_id, version); out_blocks.push((txns, ledger_info)); parent_block_id = block_id; @@ -494,24 +490,15 @@ fn apply_transaction_by_writeset( let (executed, _, _) = chunk_output.apply_to_ledger(&ledger_view, None).unwrap(); let ExecutedChunk { - result_state, - ledger_info, - next_epoch_state: _, - ledger_update_output, + output, + ledger_info_opt, } = executed; db.writer .save_transactions( - &ledger_update_output.to_commit, - ledger_view.txn_accumulator().num_leaves(), - ledger_view.state().base_version, - ledger_info.as_ref(), + output.expect_complete_result().as_chunk_to_commit(), + ledger_info_opt.as_ref(), true, /* sync_commit */ - result_state, - ledger_update_output - .state_updates_until_last_checkpoint - .as_ref(), - Some(&ledger_update_output.sharded_state_cache), ) .unwrap(); } @@ -688,9 +675,9 @@ fn run_transactions_naive( ) -> HashValue { let executor = TestExecutor::new(); let db = &executor.db; - let mut ledger_view: ExecutedTrees = db.reader.get_latest_executed_trees().unwrap(); for txn in transactions { + let ledger_view: ExecutedTrees = db.reader.get_latest_executed_trees().unwrap(); let out = ChunkOutput::by_transaction_execution::( vec![txn].into(), ledger_view @@ -704,30 +691,23 @@ fn run_transactions_naive( ) .unwrap(); let (executed, _, _) = out.apply_to_ledger(&ledger_view, None).unwrap(); - let next_ledger_view = executed.result_view(); let ExecutedChunk { - result_state, - ledger_info, - next_epoch_state: _, - ledger_update_output, + output, + ledger_info_opt, } = executed; db.writer .save_transactions( - &ledger_update_output.to_commit, - ledger_view.txn_accumulator().num_leaves(), - ledger_view.state().base_version, - ledger_info.as_ref(), + output.expect_complete_result().as_chunk_to_commit(), + ledger_info_opt.as_ref(), true, /* sync_commit */ - result_state, - ledger_update_output - .state_updates_until_last_checkpoint - .as_ref(), - Some(&ledger_update_output.sharded_state_cache), ) .unwrap(); - ledger_view = next_ledger_view; } - ledger_view.txn_accumulator().root_hash() + db.reader + .get_latest_executed_trees() + .unwrap() + .transaction_accumulator + .root_hash() } proptest! { diff --git a/state-sync/state-sync-driver/src/tests/mocks.rs b/state-sync/state-sync-driver/src/tests/mocks.rs index 382c2d24ec579..246a8b68602ef 100644 --- a/state-sync/state-sync-driver/src/tests/mocks.rs +++ b/state-sync/state-sync-driver/src/tests/mocks.rs @@ -16,8 +16,8 @@ use aptos_data_streaming_service::{ }; use aptos_executor_types::{ChunkCommitNotification, ChunkExecutorTrait}; use aptos_storage_interface::{ - cached_state_view::ShardedStateCache, state_delta::StateDelta, DbReader, DbReaderWriter, - DbWriter, ExecutedTrees, Order, Result, StateSnapshotReceiver, + chunk_to_commit::ChunkToCommit, DbReader, DbReaderWriter, DbWriter, ExecutedTrees, Order, + Result, StateSnapshotReceiver, }; use aptos_types::{ account_address::AccountAddress, @@ -34,11 +34,10 @@ use aptos_types::{ state_store::{ state_key::StateKey, state_value::{StateValue, StateValueChunkWithProof}, - ShardedStateUpdates, }, transaction::{ AccountTransactionsWithProof, TransactionListWithProof, TransactionOutputListWithProof, - TransactionToCommit, TransactionWithProof, Version, + TransactionWithProof, Version, }, }; use async_trait::async_trait; @@ -326,14 +325,9 @@ mock! { fn save_transactions<'a, 'b>( &self, - txns_to_commit: &[TransactionToCommit], - first_version: Version, - base_state_version: Option, + chunk: ChunkToCommit<'b>, ledger_info_with_sigs: Option<&'a LedgerInfoWithSignatures>, sync_commit: bool, - in_memory_state: StateDelta, - state_updates_until_last_checkpoint: Option<&'b ShardedStateUpdates>, - sharded_state_cache: Option<&'b ShardedStateCache>, ) -> Result<()>; } } diff --git a/storage/aptosdb/src/backup/test.rs b/storage/aptosdb/src/backup/test.rs index 4b0ddd053c93f..b5d537c4a3e1c 100644 --- a/storage/aptosdb/src/backup/test.rs +++ b/storage/aptosdb/src/backup/test.rs @@ -29,7 +29,7 @@ proptest! { cur_ver.checked_sub(1), Some(ledger_info_with_sigs), true, // sync commit - in_memory_state.clone(), + &in_memory_state, ) .unwrap(); cur_ver += txns_to_commit.len() as u64; diff --git a/storage/aptosdb/src/db/aptosdb_test.rs b/storage/aptosdb/src/db/aptosdb_test.rs index 78a58842283c3..4d023085f57bf 100644 --- a/storage/aptosdb/src/db/aptosdb_test.rs +++ b/storage/aptosdb/src/db/aptosdb_test.rs @@ -262,7 +262,7 @@ pub fn test_state_merkle_pruning_impl( next_ver.checked_sub(1), /* base_state_version */ Some(ledger_info_with_sigs), true, /* sync_commit */ - in_memory_state.clone(), + &in_memory_state, ) .unwrap(); diff --git a/storage/aptosdb/src/db/fake_aptosdb.rs b/storage/aptosdb/src/db/fake_aptosdb.rs index 7c576d12e039c..948d7e9504280 100644 --- a/storage/aptosdb/src/db/fake_aptosdb.rs +++ b/storage/aptosdb/src/db/fake_aptosdb.rs @@ -406,7 +406,7 @@ impl FakeAptosDB { base_state_version, ledger_info_with_sigs, sync_commit, - latest_in_memory_state.clone(), + &latest_in_memory_state, )?; } @@ -1008,7 +1008,7 @@ mod tests { cur_ver.checked_sub(1), /* base_state_version */ Some(ledger_info_with_sigs), false, /* sync_commit */ - in_memory_state.clone(), + &in_memory_state, None, // ignored Some(&ShardedStateCache::default()) // ignored ) diff --git a/storage/aptosdb/src/db/include/aptosdb_reader.rs b/storage/aptosdb/src/db/include/aptosdb_reader.rs index 6770f114b379e..4ef1dcab25c9d 100644 --- a/storage/aptosdb/src/db/include/aptosdb_reader.rs +++ b/storage/aptosdb/src/db/include/aptosdb_reader.rs @@ -542,7 +542,7 @@ impl DbReader for AptosDB { let transaction_accumulator = Arc::new(InMemoryAccumulator::new(frozen_subtrees, num_txns)?); let executed_trees = ExecutedTrees::new( - buffered_state.current_state().clone(), + Arc::new(buffered_state.current_state().clone()), transaction_accumulator, ); Ok(executed_trees) diff --git a/storage/aptosdb/src/db/include/aptosdb_testonly.rs b/storage/aptosdb/src/db/include/aptosdb_testonly.rs index bf8ca7580850a..73d5e10d13f5b 100644 --- a/storage/aptosdb/src/db/include/aptosdb_testonly.rs +++ b/storage/aptosdb/src/db/include/aptosdb_testonly.rs @@ -136,22 +136,25 @@ impl AptosDB { base_state_version: Option, ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, sync_commit: bool, - latest_in_memory_state: StateDelta, + latest_in_memory_state: &StateDelta, ) -> Result<()> { let state_updates_until_last_checkpoint = gather_state_updates_until_last_checkpoint( first_version, - &latest_in_memory_state, + latest_in_memory_state, txns_to_commit, ); - self.save_transactions( - txns_to_commit, + let chunk = ChunkToCommit { first_version, base_state_version, + txns_to_commit, + latest_in_memory_state, + state_updates_until_last_checkpoint: state_updates_until_last_checkpoint.as_ref(), + sharded_state_cache: None, + }; + self.save_transactions( + chunk, ledger_info_with_sigs, sync_commit, - latest_in_memory_state, - state_updates_until_last_checkpoint.as_ref(), - None, ) } } diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index f38a62c65e1ab..dadffda470ce0 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -2,17 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use itertools::Itertools; +use aptos_storage_interface::chunk_to_commit::ChunkToCommit; impl DbWriter for AptosDB { fn pre_commit_ledger( &self, - txns_to_commit: &[TransactionToCommit], - first_version: Version, - base_state_version: Option, + chunk: ChunkToCommit, sync_commit: bool, - latest_in_memory_state: StateDelta, - state_updates_until_last_checkpoint: Option<&ShardedStateUpdates>, - sharded_state_cache: Option<&ShardedStateCache>, ) -> Result<()> { gauged_api("pre_commit_ledger", || { // Pre-committing and committing in concurrency is allowed but not pre-committing at the @@ -25,13 +21,22 @@ impl DbWriter for AptosDB { .expect("Concurrent committing detected."); let _timer = OTHER_TIMERS_SECONDS.timer_with(&["pre_commit_ledger"]); + let ChunkToCommit { + txns_to_commit, + first_version, + base_state_version, + state_updates_until_last_checkpoint, + latest_in_memory_state, + sharded_state_cache, + } = chunk; + latest_in_memory_state.current.log_generation("db_save"); self.pre_commit_validation( txns_to_commit, first_version, base_state_version, - &latest_in_memory_state, + latest_in_memory_state, )?; let last_version = first_version + txns_to_commit.len() as u64 - 1; diff --git a/storage/aptosdb/src/db/test_helper.rs b/storage/aptosdb/src/db/test_helper.rs index 59b1634da3f7c..058447ad5209c 100644 --- a/storage/aptosdb/src/db/test_helper.rs +++ b/storage/aptosdb/src/db/test_helper.rs @@ -19,7 +19,9 @@ use aptos_executor_types::ProofReader; use aptos_jellyfish_merkle::node_type::{Node, NodeKey}; #[cfg(test)] use aptos_schemadb::SchemaBatch; -use aptos_storage_interface::{state_delta::StateDelta, DbReader, DbWriter, Order, Result}; +use aptos_storage_interface::{ + chunk_to_commit::ChunkToCommit, state_delta::StateDelta, DbReader, DbWriter, Order, Result, +}; use aptos_temppath::TempPath; #[cfg(test)] use aptos_types::state_store::state_storage_usage::StateStorageUsage; @@ -384,7 +386,7 @@ pub fn test_save_blocks_impl( cur_ver.checked_sub(1), /* base_state_version */ Some(ledger_info_with_sigs), false, /* sync_commit */ - in_memory_state.clone(), + &in_memory_state, ) .unwrap(); @@ -969,7 +971,7 @@ pub fn put_as_state_root(db: &AptosDB, version: Version, key: StateKey, value: S db.state_store .buffered_state() .lock() - .update(None, in_memory_state, true) + .update(None, &in_memory_state, true) .unwrap(); } @@ -999,36 +1001,39 @@ pub fn test_sync_transactions_impl( if batch1_len > 0 { let txns_to_commit_batch = &txns_to_commit[..batch1_len]; update_in_memory_state(&mut in_memory_state, txns_to_commit_batch); - db.save_transactions( + let state_updates = gather_state_updates_until_last_checkpoint( + cur_ver, + &in_memory_state, txns_to_commit_batch, - cur_ver, /* first_version */ + ); + let chunk = ChunkToCommit { + first_version: cur_ver, base_state_version, - None, - false, /* sync_commit */ - in_memory_state.clone(), - gather_state_updates_until_last_checkpoint( - cur_ver, - &in_memory_state, - txns_to_commit_batch, - ) - .as_ref(), - None, - ) - .unwrap(); + txns_to_commit: txns_to_commit_batch, + latest_in_memory_state: &in_memory_state, + state_updates_until_last_checkpoint: state_updates.as_ref(), + sharded_state_cache: None, + }; + db.save_transactions(chunk, None, false /* sync_commit */) + .unwrap(); } let ver = cur_ver + batch1_len as Version; let txns_to_commit_batch = &txns_to_commit[batch1_len..]; update_in_memory_state(&mut in_memory_state, txns_to_commit_batch); - db.save_transactions( - txns_to_commit_batch, - ver, + let state_updates = + gather_state_updates_until_last_checkpoint(ver, &in_memory_state, txns_to_commit_batch); + let chunk = ChunkToCommit { + first_version: ver, base_state_version, + txns_to_commit: txns_to_commit_batch, + latest_in_memory_state: &in_memory_state, + state_updates_until_last_checkpoint: state_updates.as_ref(), + sharded_state_cache: None, + }; + db.save_transactions( + chunk, Some(ledger_info_with_sigs), false, /* sync_commit */ - in_memory_state.clone(), - gather_state_updates_until_last_checkpoint(ver, &in_memory_state, txns_to_commit_batch) - .as_ref(), - None, ) .unwrap(); diff --git a/storage/aptosdb/src/db_debugger/truncate/mod.rs b/storage/aptosdb/src/db_debugger/truncate/mod.rs index 8a2960f660f18..6d30148842e0d 100644 --- a/storage/aptosdb/src/db_debugger/truncate/mod.rs +++ b/storage/aptosdb/src/db_debugger/truncate/mod.rs @@ -269,7 +269,7 @@ mod test { version.checked_sub(1), Some(ledger_info_with_sigs), true, - in_memory_state.clone() + &in_memory_state, ) .unwrap(); version += txns_to_commit.len() as u64; diff --git a/storage/aptosdb/src/fast_sync_storage_wrapper.rs b/storage/aptosdb/src/fast_sync_storage_wrapper.rs index 4b1fac3e7e284..8530b34586a0b 100644 --- a/storage/aptosdb/src/fast_sync_storage_wrapper.rs +++ b/storage/aptosdb/src/fast_sync_storage_wrapper.rs @@ -8,12 +8,11 @@ use aptos_crypto::HashValue; use aptos_db_indexer::db_indexer::InternalIndexerDB; use aptos_infallible::RwLock; use aptos_storage_interface::{ - cached_state_view::ShardedStateCache, state_delta::StateDelta, DbReader, DbWriter, Result, - StateSnapshotReceiver, + chunk_to_commit::ChunkToCommit, DbReader, DbWriter, Result, StateSnapshotReceiver, }; use aptos_types::{ ledger_info::LedgerInfoWithSignatures, - state_store::{state_key::StateKey, state_value::StateValue, ShardedStateUpdates}, + state_store::{state_key::StateKey, state_value::StateValue}, transaction::{TransactionOutputListWithProof, TransactionToCommit, Version}, }; use either::Either; @@ -169,25 +168,9 @@ impl DbWriter for FastSyncStorageWrapper { Ok(()) } - fn pre_commit_ledger( - &self, - txns_to_commit: &[TransactionToCommit], - first_version: Version, - base_state_version: Option, - sync_commit: bool, - latest_in_memory_state: StateDelta, - state_updates_until_last_checkpoint: Option<&ShardedStateUpdates>, - sharded_state_cache: Option<&ShardedStateCache>, - ) -> Result<()> { - self.get_aptos_db_write_ref().pre_commit_ledger( - txns_to_commit, - first_version, - base_state_version, - sync_commit, - latest_in_memory_state, - state_updates_until_last_checkpoint, - sharded_state_cache, - ) + fn pre_commit_ledger(&self, chunk: ChunkToCommit, sync_commit: bool) -> Result<()> { + self.get_aptos_db_write_ref() + .pre_commit_ledger(chunk, sync_commit) } fn commit_ledger( diff --git a/storage/aptosdb/src/state_store/buffered_state.rs b/storage/aptosdb/src/state_store/buffered_state.rs index 253343be00c6a..2050af1e1de3a 100644 --- a/storage/aptosdb/src/state_store/buffered_state.rs +++ b/storage/aptosdb/src/state_store/buffered_state.rs @@ -155,7 +155,7 @@ impl BufferedState { pub fn update( &mut self, updates_until_next_checkpoint_since_current_option: Option<&ShardedStateUpdates>, - new_state_after_checkpoint: StateDelta, + new_state_after_checkpoint: &StateDelta, sync_commit: bool, ) -> Result<()> { assert!(new_state_after_checkpoint @@ -180,7 +180,7 @@ impl BufferedState { self.state_after_checkpoint.current_version = new_state_after_checkpoint.base_version; let state_after_checkpoint = self .state_after_checkpoint - .replace_with(new_state_after_checkpoint); + .replace_with(new_state_after_checkpoint.clone()); if let Some(ref mut delta) = self.state_until_checkpoint { delta.merge(state_after_checkpoint); } else { @@ -191,7 +191,7 @@ impl BufferedState { new_state_after_checkpoint.base_version == self.state_after_checkpoint.base_version, "Diff between base and latest checkpoints not provided.", ); - self.state_after_checkpoint = new_state_after_checkpoint; + self.state_after_checkpoint = new_state_after_checkpoint.clone(); } self.maybe_commit(sync_commit); self.report_latest_committed_version(); diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index 3902125a08ae1..6196ad2e73e5d 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -542,7 +542,7 @@ impl StateStore { // synchronously commit the snapshot at the last checkpoint here if not committed to disk yet. buffered_state.update( updates_until_last_checkpoint.as_ref(), - state_after_last_checkpoint, + &state_after_last_checkpoint, true, /* sync_commit */ )?; } diff --git a/storage/backup/backup-cli/src/utils/test_utils.rs b/storage/backup/backup-cli/src/utils/test_utils.rs index 53d348460b55a..1b26911098081 100644 --- a/storage/backup/backup-cli/src/utils/test_utils.rs +++ b/storage/backup/backup-cli/src/utils/test_utils.rs @@ -45,7 +45,7 @@ pub fn tmp_db_with_random_content() -> ( cur_ver.checked_sub(1), Some(ledger_info_with_sigs), true, /* sync_commit */ - in_memory_state.clone(), + &in_memory_state, ) .unwrap(); cur_ver += txns_to_commit.len() as u64; diff --git a/storage/storage-interface/Cargo.toml b/storage/storage-interface/Cargo.toml index 6b2cb638242b1..d3bdb9fcbcae1 100644 --- a/storage/storage-interface/Cargo.toml +++ b/storage/storage-interface/Cargo.toml @@ -15,6 +15,7 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } aptos-crypto = { workspace = true } +aptos-drop-helper = { workspace = true } aptos-experimental-runtimes = { workspace = true } aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } diff --git a/storage/storage-interface/src/chunk_to_commit.rs b/storage/storage-interface/src/chunk_to_commit.rs new file mode 100644 index 0000000000000..ef1bd38a81771 --- /dev/null +++ b/storage/storage-interface/src/chunk_to_commit.rs @@ -0,0 +1,18 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{cached_state_view::ShardedStateCache, state_delta::StateDelta}; +use aptos_types::{ + state_store::ShardedStateUpdates, + transaction::{TransactionToCommit, Version}, +}; + +#[derive(Copy, Clone)] +pub struct ChunkToCommit<'a> { + pub first_version: Version, + pub base_state_version: Option, + pub txns_to_commit: &'a [TransactionToCommit], + pub latest_in_memory_state: &'a StateDelta, + pub state_updates_until_last_checkpoint: Option<&'a ShardedStateUpdates>, + pub sharded_state_cache: Option<&'a ShardedStateCache>, +} diff --git a/storage/storage-interface/src/executed_trees.rs b/storage/storage-interface/src/executed_trees.rs index ca08758d5c92c..58c6194fa8668 100644 --- a/storage/storage-interface/src/executed_trees.rs +++ b/storage/storage-interface/src/executed_trees.rs @@ -20,7 +20,7 @@ type Result = std::result::Result; #[derive(Clone, Debug)] pub struct ExecutedTrees { /// The in-memory representation of state after execution. - pub state: StateDelta, + pub state: Arc, /// The in-memory Merkle Accumulator representing a blockchain state consistent with the /// `state_tree`. @@ -28,7 +28,7 @@ pub struct ExecutedTrees { } impl ExecutedTrees { - pub fn state(&self) -> &StateDelta { + pub fn state(&self) -> &Arc { &self.state } @@ -49,7 +49,7 @@ impl ExecutedTrees { } pub fn new( - state: StateDelta, + state: Arc, transaction_accumulator: Arc, ) -> Self { assert_eq!( @@ -68,11 +68,11 @@ impl ExecutedTrees { frozen_subtrees_in_accumulator: Vec, num_leaves_in_accumulator: u64, ) -> Self { - let state = StateDelta::new_at_checkpoint( + let state = Arc::new(StateDelta::new_at_checkpoint( state_root_hash, state_usage, num_leaves_in_accumulator.checked_sub(1), - ); + )); let transaction_accumulator = Arc::new( InMemoryAccumulator::new(frozen_subtrees_in_accumulator, num_leaves_in_accumulator) .expect("The startup info read from storage should be valid."), @@ -83,7 +83,7 @@ impl ExecutedTrees { pub fn new_empty() -> Self { Self::new( - StateDelta::new_empty(), + Arc::new(StateDelta::new_empty()), Arc::new(InMemoryAccumulator::new_empty()), ) } diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index 8a357eda90116..e2fd5d8abab02 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -2,7 +2,6 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::cached_state_view::ShardedStateCache; use aptos_crypto::{hash::CryptoHash, HashValue}; pub use aptos_types::indexer::indexer_db_reader::Order; use aptos_types::{ @@ -23,7 +22,6 @@ use aptos_types::{ state_storage_usage::StateStorageUsage, state_value::{StateValue, StateValueChunkWithProof}, table::{TableHandle, TableInfo}, - ShardedStateUpdates, }, transaction::{ AccountTransactionsWithProof, Transaction, TransactionAuxiliaryData, TransactionInfo, @@ -39,6 +37,7 @@ use thiserror::Error; pub mod async_proof_fetcher; pub mod block_info; pub mod cached_state_view; +pub mod chunk_to_commit; pub mod errors; mod executed_trees; mod metrics; @@ -47,7 +46,7 @@ pub mod mock; pub mod state_delta; pub mod state_view; -use crate::state_delta::StateDelta; +use crate::chunk_to_commit::ChunkToCommit; use aptos_scratchpad::SparseMerkleTree; pub use aptos_types::block_info::BlockHeight; use aptos_types::state_store::state_key::prefix::StateKeyPrefix; @@ -542,41 +541,28 @@ pub trait DbWriter: Send + Sync { /// Persist transactions. Called by state sync to save verified transactions to the DB. fn save_transactions( &self, - txns_to_commit: &[TransactionToCommit], - first_version: Version, - base_state_version: Option, + chunk: ChunkToCommit, ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, sync_commit: bool, - latest_in_memory_state: StateDelta, - state_updates_until_last_checkpoint: Option<&ShardedStateUpdates>, - sharded_state_cache: Option<&ShardedStateCache>, ) -> Result<()> { // For reconfig suffix. - if ledger_info_with_sigs.is_none() && txns_to_commit.is_empty() { + if ledger_info_with_sigs.is_none() && chunk.txns_to_commit.is_empty() { return Ok(()); } - if !txns_to_commit.is_empty() { - self.pre_commit_ledger( - txns_to_commit, - first_version, - base_state_version, - sync_commit, - latest_in_memory_state, - state_updates_until_last_checkpoint, - sharded_state_cache, - )?; + if !chunk.txns_to_commit.is_empty() { + self.pre_commit_ledger(chunk, sync_commit)?; } let version_to_commit = if let Some(ledger_info_with_sigs) = ledger_info_with_sigs { ledger_info_with_sigs.ledger_info().version() } else { // here txns_to_commit is known to be non-empty - first_version + txns_to_commit.len() as u64 - 1 + chunk.first_version + chunk.txns_to_commit.len() as u64 - 1 }; self.commit_ledger( version_to_commit, ledger_info_with_sigs, - Some(txns_to_commit), + Some(chunk.txns_to_commit), ) } @@ -589,16 +575,7 @@ pub trait DbWriter: Send + Sync { /// called with a `LedgerInfoWithSignatures`. /// If not, the consensus needs to panic, resulting in a reboot of the node where the DB will /// truncate the unconfirmed data. - fn pre_commit_ledger( - &self, - txns_to_commit: &[TransactionToCommit], - first_version: Version, - base_state_version: Option, - sync_commit: bool, - latest_in_memory_state: StateDelta, - state_updates_until_last_checkpoint: Option<&ShardedStateUpdates>, - sharded_state_cache: Option<&ShardedStateCache>, - ) -> Result<()> { + fn pre_commit_ledger(&self, chunk: ChunkToCommit, sync_commit: bool) -> Result<()> { unimplemented!() } diff --git a/storage/storage-interface/src/state_delta.rs b/storage/storage-interface/src/state_delta.rs index bbe4016a86b15..ea93d8afa910d 100644 --- a/storage/storage-interface/src/state_delta.rs +++ b/storage/storage-interface/src/state_delta.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_crypto::HashValue; +use aptos_drop_helper::DropHelper; use aptos_scratchpad::SparseMerkleTree; use aptos_types::{ state_store::{ @@ -24,7 +25,7 @@ pub struct StateDelta { pub base_version: Option, pub current: SparseMerkleTree, pub current_version: Option, - pub updates_since_base: ShardedStateUpdates, + pub updates_since_base: DropHelper, } impl StateDelta { @@ -42,7 +43,7 @@ impl StateDelta { base_version, current, current_version, - updates_since_base, + updates_since_base: DropHelper::new(updates_since_base), } }