From 1a0bb2d5639109b65a80b7c1955345b6f58b43c0 Mon Sep 17 00:00:00 2001
From: aldenhu
Date: Fri, 18 Oct 2024 22:10:28 +0000
Subject: [PATCH] Makes StateComputeResult a combination of immutable stage
 outputs

---
 Cargo.lock                                    |   2 +
 .../consensus-types/src/pipelined_block.rs    |   2 +-
 consensus/src/block_storage/block_store.rs    |   2 +-
 .../src/block_storage/block_store_test.rs     |   4 +-
 consensus/src/counters.rs                     |   2 +-
 consensus/src/pipeline/tests/test_utils.rs    |   2 +-
 consensus/src/state_computer.rs               |   7 +-
 .../src/ledger_update_stage.rs                |  35 +-
 execution/executor-benchmark/src/lib.rs       |  12 +-
 .../executor-benchmark/src/native_executor.rs | 208 +++++-----
 execution/executor-benchmark/src/pipeline.rs  |  41 +-
 .../src/transaction_committer.rs              |  29 +-
 .../src/transaction_executor.rs               |  26 +-
 execution/executor-test-helpers/src/lib.rs    |   2 +-
 execution/executor-types/Cargo.toml           |   1 +
 .../executor-types/src/execution_output.rs    | 221 ++++++++++-
 .../src/ledger_update_output.rs               | 125 ++----
 execution/executor-types/src/lib.rs           |  18 +-
 .../src/parsed_transaction_output.rs          |  21 +-
 .../src/state_checkpoint_output.rs            | 181 +++------
 .../src/state_compute_result.rs               | 119 +++---
 .../src/block_executor/block_tree/mod.rs      |  33 +-
 .../src/block_executor/block_tree/test.rs     |   3 +-
 execution/executor/src/block_executor/mod.rs  |  88 ++---
 .../src/chunk_executor/chunk_commit_queue.rs  |   5 +-
 .../chunk_executor/chunk_result_verifier.rs   |   8 -
 execution/executor/src/chunk_executor/mod.rs  |  58 +--
 .../src/chunk_executor/transaction_chunk.rs   |   1 +
 execution/executor/src/db_bootstrapper/mod.rs |  21 +-
 execution/executor/src/fuzzing.rs             |   2 +
 execution/executor/src/tests/mock_vm/mod.rs   |   4 +-
 execution/executor/src/tests/mod.rs           |  11 +-
 .../types/in_memory_state_calculator_v2.rs    | 195 +++-------
 .../src/types/partial_state_compute_result.rs | 102 ++---
 .../src/workflow/do_get_execution_output.rs   | 356 ++++++++++++++++--
 .../executor/src/workflow/do_ledger_update.rs | 162 ++------
 .../src/workflow/do_state_checkpoint.rs       | 219 +---------
 execution/executor/src/workflow/mod.rs        |  26 +-
 .../executor/tests/db_bootstrapper_test.rs    |   2 +-
 .../execution/ptx-executor/Cargo.toml         |   1 +
 .../execution/ptx-executor/src/lib.rs         |   3 +
 .../src/db/include/aptosdb_testonly.rs        |   8 +-
 .../aptosdb/src/db/include/aptosdb_writer.rs  |   4 +-
 storage/aptosdb/src/db/mod.rs                 |   8 +-
 storage/aptosdb/src/state_store/mod.rs        |  14 +-
 .../src/cached_state_view.rs                  |  25 ++
 .../storage-interface/src/chunk_to_commit.rs  |   5 +-
 storage/storage-interface/src/lib.rs          |   2 +-
 types/src/transaction/mod.rs                  |  17 +
 49 files changed, 1234 insertions(+), 1209 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 33a3c058497a8..698eb805db182 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1558,6 +1558,7 @@ dependencies = [
  "criterion",
  "derive_more",
  "itertools 0.13.0",
+ "once_cell",
  "serde",
  "thiserror",
 ]
@@ -1602,6 +1603,7 @@ name = "aptos-experimental-ptx-executor"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "aptos-crypto",
  "aptos-executor",
  "aptos-executor-types",
  "aptos-experimental-runtimes",
diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs
index eb137936696f6..d080279e6aa6a 100644
--- a/consensus/consensus-types/src/pipelined_block.rs
+++ b/consensus/consensus-types/src/pipelined_block.rs
@@ -288,7 +288,7 @@ impl PipelinedBlock {
     pub fn block_info(&self) -> BlockInfo {
         self.block().gen_block_info(
             self.compute_result().root_hash(),
-            self.compute_result().version(),
+            self.compute_result().last_version_or_0(),
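+            // last_version_or_0() == next_version().saturating_sub(1), the same value the
+            // old version() computed; the new name makes the empty-ledger (0) case explicit.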
self.compute_result().epoch_state().clone(), ) } diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index 3822a3dfe9ba3..aee68d4985d83 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -177,7 +177,7 @@ impl BlockStore { root_metadata.accu_hash, ); - let result = StateComputeResult::new_empty(Arc::new( + let result = StateComputeResult::new_dummy_with_accumulator(Arc::new( InMemoryTransactionAccumulator::new( root_metadata.frozen_root_hashes, root_metadata.num_leaves, diff --git a/consensus/src/block_storage/block_store_test.rs b/consensus/src/block_storage/block_store_test.rs index 41def8f1c322d..3d17decfacec4 100644 --- a/consensus/src/block_storage/block_store_test.rs +++ b/consensus/src/block_storage/block_store_test.rs @@ -290,7 +290,7 @@ async fn test_insert_vote() { VoteData::new( block.block().gen_block_info( block.compute_result().root_hash(), - block.compute_result().version(), + block.compute_result().last_version_or_0(), block.compute_result().epoch_state().clone(), ), block.quorum_cert().certified_block().clone(), @@ -319,7 +319,7 @@ async fn test_insert_vote() { VoteData::new( block.block().gen_block_info( block.compute_result().root_hash(), - block.compute_result().version(), + block.compute_result().last_version_or_0(), block.compute_result().epoch_state().clone(), ), block.quorum_cert().certified_block().clone(), diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index 277fe38ed2866..4086c4352adb0 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -1249,7 +1249,7 @@ pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc { executor: Arc>, commit_processing: CommitProcessing, - version: Version, + allow_aborts: bool, + allow_discards: bool, + allow_retries: bool, } impl LedgerUpdateStage @@ -28,44 +29,54 @@ where pub fn new( executor: Arc>, commit_processing: CommitProcessing, - version: Version, + allow_aborts: bool, + allow_discards: bool, + allow_retries: bool, ) -> Self { Self { executor, - version, commit_processing, + allow_aborts, + allow_discards, + allow_retries, } } pub fn ledger_update(&mut self, ledger_update_message: LedgerUpdateMessage) { // let ledger_update_start_time = Instant::now(); let LedgerUpdateMessage { + first_block_start_time, current_block_start_time, - execution_time, partition_time, + execution_time, block_id, parent_block_id, + num_input_txns, state_checkpoint_output, - first_block_start_time, } = ledger_update_message; let output = self .executor - .ledger_update(block_id, parent_block_id, state_checkpoint_output) + .ledger_update(block_id, parent_block_id, state_checkpoint_output.clone()) .unwrap(); - - self.version += output.transactions_to_commit_len() as Version; + output.execution_output.check_aborts_discards_retries( + self.allow_aborts, + self.allow_discards, + self.allow_retries, + ); + if !self.allow_retries { + assert_eq!(output.num_transactions_to_commit(), num_input_txns + 1); + } match &self.commit_processing { CommitProcessing::SendToQueue(commit_sender) => { let msg = CommitBlockMessage { block_id, - root_hash: output.root_hash(), first_block_start_time, current_block_start_time, partition_time, execution_time, - num_txns: output.transactions_to_commit_len(), + output, }; commit_sender.send(msg).unwrap(); }, @@ -73,7 +84,7 @@ where let ledger_info_with_sigs = super::transaction_committer::gen_li_with_sigs( block_id, output.root_hash(), - self.version, + 
output.expect_last_version(), ); self.executor.pre_commit_block(block_id).unwrap(); self.executor.commit_ledger(ledger_info_with_sigs).unwrap(); diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs index d34ef9a7c7041..09f2a5b91558b 100644 --- a/execution/executor-benchmark/src/lib.rs +++ b/execution/executor-benchmark/src/lib.rs @@ -165,10 +165,9 @@ pub fn run_benchmark( ((0..pipeline_config.num_generator_workers).map(|_| transaction_generator_creator.create_transaction_generator()).collect::>(), phase) }); - let version = db.reader.expect_synced_version(); - + let start_version = db.reader.expect_synced_version(); let (pipeline, block_sender) = - Pipeline::new(executor, version, &pipeline_config, Some(num_blocks)); + Pipeline::new(executor, start_version, &pipeline_config, Some(num_blocks)); let mut num_accounts_to_load = num_main_signer_accounts; if let Some(mix) = &transaction_mix { @@ -235,7 +234,8 @@ pub fn run_benchmark( ); if !pipeline_config.skip_commit { - let num_txns = db.reader.expect_synced_version() - version - num_blocks_created as u64; + let num_txns = + db.reader.expect_synced_version() - start_version - num_blocks_created as u64; overall_measuring.print_end("Overall", num_txns); if verify_sequence_numbers { @@ -259,10 +259,10 @@ fn init_workload( where V: TransactionBlockExecutor + 'static, { - let version = db.reader.expect_synced_version(); + let start_version = db.reader.expect_synced_version(); let (pipeline, block_sender) = Pipeline::::new( BlockExecutor::new(db.clone()), - version, + start_version, pipeline_config, None, ); diff --git a/execution/executor-benchmark/src/native_executor.rs b/execution/executor-benchmark/src/native_executor.rs index 2eea6d915e11a..00b725936e1be 100644 --- a/execution/executor-benchmark/src/native_executor.rs +++ b/execution/executor-benchmark/src/native_executor.rs @@ -6,31 +6,43 @@ use crate::{ metrics::TIMER, }; use anyhow::Result; -use aptos_executor::block_executor::TransactionBlockExecutor; +use aptos_crypto::HashValue; +use aptos_executor::{ + block_executor::TransactionBlockExecutor, + workflow::do_get_execution_output::DoGetExecutionOutput, +}; use aptos_executor_types::execution_output::ExecutionOutput; use aptos_storage_interface::cached_state_view::CachedStateView; use aptos_types::{ account_address::AccountAddress, account_config::{deposit::DepositEvent, withdraw::WithdrawEvent}, - block_executor::{config::BlockExecutorConfigFromOnchain, partitioner::ExecutableTransactions}, + block_executor::{ + config::BlockExecutorConfigFromOnchain, + partitioner::{ExecutableTransactions, PartitionedTransactions}, + }, contract_event::ContractEvent, event::EventKey, - state_store::state_key::StateKey, + state_store::{state_key::StateKey, StateView}, transaction::{ - ExecutionStatus, Transaction, TransactionAuxiliaryData, TransactionOutput, - TransactionStatus, + signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, ExecutionStatus, + Transaction, TransactionAuxiliaryData, TransactionOutput, TransactionStatus, }, vm_status::AbortLocation, write_set::{WriteOp, WriteSet, WriteSetMut}, }; +use aptos_vm::{ + sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}, + VMExecutor, +}; use move_core_types::{ ident_str, language_storage::{ModuleId, TypeTag}, move_resource::MoveStructType, + vm_status::{StatusCode, VMStatus}, }; use once_cell::sync::{Lazy, OnceCell}; use rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}; -use std::collections::HashMap; +use 
std::{collections::HashMap, sync::Arc}; struct IncrementalOutput { write_set: Vec<(StateKey, WriteOp)>, @@ -92,7 +104,7 @@ impl NativeExecutor { fn withdraw_from_signer( sender_address: AccountAddress, transfer_amount: u64, - state_view: &CachedStateView, + state_view: &impl StateView, ) -> Result> { let sender_account_key = DbAccessUtil::new_state_key_account(sender_address); let mut sender_account = { @@ -151,7 +163,7 @@ impl NativeExecutor { fn deposit( recipient_address: AccountAddress, transfer_amount: u64, - state_view: &CachedStateView, + state_view: &impl StateView, fail_on_existing: bool, fail_on_missing: bool, ) -> Result> { @@ -247,7 +259,7 @@ impl NativeExecutor { sender_address: AccountAddress, recipient_address: AccountAddress, transfer_amount: u64, - state_view: &CachedStateView, + state_view: &impl StateView, fail_on_existing: bool, fail_on_missing: bool, ) -> Result { @@ -282,7 +294,7 @@ impl NativeExecutor { sender_address: AccountAddress, recipient_addresses: Vec, transfer_amounts: Vec, - state_view: &CachedStateView, + state_view: &impl StateView, fail_on_existing: bool, fail_on_missing: bool, ) -> Result { @@ -345,87 +357,109 @@ impl NativeExecutor { } } -impl TransactionBlockExecutor for NativeExecutor { - fn execute_transaction_block( - transactions: ExecutableTransactions, - state_view: CachedStateView, +impl VMExecutor for NativeExecutor { + fn execute_block( + transactions: &[SignatureVerifiedTransaction], + state_view: &(impl StateView + Sync), _onchain_config: BlockExecutorConfigFromOnchain, - ) -> Result { - let transactions = match transactions { - ExecutableTransactions::Unsharded(txns) => txns, - _ => todo!("sharded execution not yet supported"), - }; - let transaction_outputs = NATIVE_EXECUTOR_POOL.install(|| { - transactions - .par_iter() - .map(|txn| match &txn.expect_valid() { - Transaction::StateCheckpoint(_) => Self::handle_state_checkpoint(), - Transaction::UserTransaction(user_txn) => match user_txn.payload() { - aptos_types::transaction::TransactionPayload::EntryFunction(f) => { - match ( - *f.module().address(), - f.module().name().as_str(), - f.function().as_str(), - ) { - (AccountAddress::ONE, "coin", "transfer") => { - Self::handle_account_creation_and_transfer( - user_txn.sender(), - bcs::from_bytes(&f.args()[0]).unwrap(), - bcs::from_bytes(&f.args()[1]).unwrap(), - &state_view, - false, - true, - ) - }, - (AccountAddress::ONE, "aptos_account", "transfer") => { - Self::handle_account_creation_and_transfer( - user_txn.sender(), - bcs::from_bytes(&f.args()[0]).unwrap(), - bcs::from_bytes(&f.args()[1]).unwrap(), - &state_view, - false, - false, - ) - }, - (AccountAddress::ONE, "aptos_account", "create_account") => { - Self::handle_account_creation_and_transfer( - user_txn.sender(), - bcs::from_bytes(&f.args()[0]).unwrap(), - 0, - &state_view, - true, - false, - ) - }, - (AccountAddress::ONE, "aptos_account", "batch_transfer") => { - Self::handle_batch_account_creation_and_transfer( - user_txn.sender(), - bcs::from_bytes(&f.args()[0]).unwrap(), - bcs::from_bytes(&f.args()[1]).unwrap(), - &state_view, - false, - true, - ) - }, - _ => unimplemented!( - "{} {}::{}", + ) -> Result, VMStatus> { + let transaction_outputs = NATIVE_EXECUTOR_POOL + .install(|| { + transactions + .par_iter() + .map(|txn| match &txn.expect_valid() { + Transaction::StateCheckpoint(_) => Self::handle_state_checkpoint(), + Transaction::UserTransaction(user_txn) => match user_txn.payload() { + aptos_types::transaction::TransactionPayload::EntryFunction(f) => { + match ( 
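+                                // Dispatch on (module address, module name, function name)
+                                // to the matching hand-written native handler below.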
*f.module().address(), f.module().name().as_str(), - f.function().as_str() - ), - } + f.function().as_str(), + ) { + (AccountAddress::ONE, "coin", "transfer") => { + Self::handle_account_creation_and_transfer( + user_txn.sender(), + bcs::from_bytes(&f.args()[0]).unwrap(), + bcs::from_bytes(&f.args()[1]).unwrap(), + &state_view, + false, + true, + ) + }, + (AccountAddress::ONE, "aptos_account", "transfer") => { + Self::handle_account_creation_and_transfer( + user_txn.sender(), + bcs::from_bytes(&f.args()[0]).unwrap(), + bcs::from_bytes(&f.args()[1]).unwrap(), + &state_view, + false, + false, + ) + }, + (AccountAddress::ONE, "aptos_account", "create_account") => { + Self::handle_account_creation_and_transfer( + user_txn.sender(), + bcs::from_bytes(&f.args()[0]).unwrap(), + 0, + &state_view, + true, + false, + ) + }, + (AccountAddress::ONE, "aptos_account", "batch_transfer") => { + Self::handle_batch_account_creation_and_transfer( + user_txn.sender(), + bcs::from_bytes(&f.args()[0]).unwrap(), + bcs::from_bytes(&f.args()[1]).unwrap(), + &state_view, + false, + true, + ) + }, + _ => unimplemented!( + "{} {}::{}", + *f.module().address(), + f.module().name().as_str(), + f.function().as_str() + ), + } + }, + _ => unimplemented!(), }, _ => unimplemented!(), - }, - _ => unimplemented!(), - }) - .collect::>>() - })?; - Ok(ExecutionOutput { - transactions: transactions.into_iter().map(|t| t.into_inner()).collect(), - transaction_outputs, - state_cache: state_view.into_state_cache(), - block_end_info: None, - }) + }) + .collect::>>() + }) + .map_err(|err| VMStatus::Error { + status_code: StatusCode::ABORTED, + sub_status: None, + message: Some(err.to_string()), + })?; + Ok(BlockOutput::new(transaction_outputs, None)) + } + + fn execute_block_sharded>( + _sharded_block_executor: &ShardedBlockExecutor, + _transactions: PartitionedTransactions, + _state_view: Arc, + _onchain_config: BlockExecutorConfigFromOnchain, + ) -> std::result::Result, VMStatus> { + unimplemented!() + } +} + +impl TransactionBlockExecutor for NativeExecutor { + fn execute_transaction_block( + transactions: ExecutableTransactions, + state_view: CachedStateView, + onchain_config: BlockExecutorConfigFromOnchain, + append_state_checkpoint_to_block: Option, + ) -> Result { + DoGetExecutionOutput::by_transaction_execution::( + transactions, + state_view, + onchain_config, + append_state_checkpoint_to_block, + ) } } diff --git a/execution/executor-benchmark/src/pipeline.rs b/execution/executor-benchmark/src/pipeline.rs index 8d4efaf380d70..5763c19567c72 100644 --- a/execution/executor-benchmark/src/pipeline.rs +++ b/execution/executor-benchmark/src/pipeline.rs @@ -10,7 +10,10 @@ use crate::{ use aptos_block_partitioner::v2::config::PartitionerV2Config; use aptos_crypto::HashValue; use aptos_executor::block_executor::{BlockExecutor, TransactionBlockExecutor}; -use aptos_executor_types::{state_checkpoint_output::StateCheckpointOutput, BlockExecutorTrait}; +use aptos_executor_types::{ + state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult, + BlockExecutorTrait, +}; use aptos_logger::info; use aptos_types::{ block_executor::partitioner::ExecutableBlock, @@ -56,11 +59,11 @@ where { pub fn new( executor: BlockExecutor, - version: Version, + start_version: Version, config: &PipelineConfig, // Need to specify num blocks, to size queues correctly, when delay_execution_start, split_stages or skip_commit are used num_blocks: Option, - ) -> (Self, mpsc::SyncSender>) { + ) -> (Self, SyncSender>) { let 
parent_block_id = executor.committed_block_id(); let executor_1 = Arc::new(executor); let executor_2 = executor_1.clone(); @@ -113,22 +116,20 @@ where let mut partitioning_stage = BlockPreparationStage::new(num_partitioner_shards, &config.partitioner_config); - let mut exe = TransactionExecutor::new( - executor_1, - parent_block_id, - ledger_update_sender, - config.allow_aborts, - config.allow_discards, - config.allow_retries, - ); + let mut exe = TransactionExecutor::new(executor_1, parent_block_id, ledger_update_sender); let commit_processing = if config.skip_commit { CommitProcessing::Skip } else { CommitProcessing::SendToQueue(commit_sender) }; - let mut ledger_update_stage = - LedgerUpdateStage::new(executor_2, commit_processing, version); + let mut ledger_update_stage = LedgerUpdateStage::new( + executor_2, + commit_processing, + config.allow_aborts, + config.allow_discards, + config.allow_retries, + ); let (executable_block_sender, executable_block_receiver) = mpsc::sync_channel::(3); @@ -210,11 +211,9 @@ where .name("ledger_update".to_string()) .spawn(move || { while let Ok(ledger_update_msg) = ledger_update_receiver.recv() { - let input_block_size = - ledger_update_msg.state_checkpoint_output.input_txns_len(); NUM_TXNS .with_label_values(&["ledger_update"]) - .inc_by(input_block_size as u64); + .inc_by(ledger_update_msg.num_input_txns as u64); ledger_update_stage.ledger_update(ledger_update_msg); } }) @@ -228,7 +227,7 @@ where start_commit_rx.map(|rx| rx.recv()); info!("Starting commit thread"); let mut committer = - TransactionCommitter::new(executor_3, version, commit_receiver); + TransactionCommitter::new(executor_3, start_version, commit_receiver); committer.run(); }) .expect("Failed to spawn transaction committer thread."); @@ -264,22 +263,22 @@ pub struct ExecuteBlockMessage { } pub struct LedgerUpdateMessage { + pub first_block_start_time: Instant, pub current_block_start_time: Instant, pub execution_time: Duration, pub partition_time: Duration, pub block_id: HashValue, pub parent_block_id: HashValue, + pub num_input_txns: usize, pub state_checkpoint_output: StateCheckpointOutput, - pub first_block_start_time: Instant, } /// Message from execution stage to commit stage. 
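+/// Carries the whole `StateComputeResult` now; the committer derives the root hash,
+/// commit version and transaction count from it instead of receiving them as fields.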
pub struct CommitBlockMessage { pub(crate) block_id: HashValue, - pub(crate) root_hash: HashValue, pub(crate) first_block_start_time: Instant, pub(crate) current_block_start_time: Instant, - pub(crate) partition_time: Duration, pub(crate) execution_time: Duration, - pub(crate) num_txns: usize, + pub(crate) partition_time: Duration, + pub(crate) output: StateComputeResult, } diff --git a/execution/executor-benchmark/src/transaction_committer.rs b/execution/executor-benchmark/src/transaction_committer.rs index e3aa70b7c534b..a725bbe3ed15b 100644 --- a/execution/executor-benchmark/src/transaction_committer.rs +++ b/execution/executor-benchmark/src/transaction_committer.rs @@ -46,7 +46,7 @@ pub(crate) fn gen_li_with_sigs( pub struct TransactionCommitter { executor: Arc>, - version: Version, + start_version: Version, block_receiver: mpsc::Receiver, } @@ -56,49 +56,52 @@ where { pub fn new( executor: Arc>, - version: Version, + start_version: Version, block_receiver: mpsc::Receiver, ) -> Self { Self { - version, executor, + start_version, block_receiver, } } pub fn run(&mut self) { - let start_version = self.version; - info!("Start with version: {}", start_version); + info!("Start with version: {}", self.start_version); while let Ok(msg) = self.block_receiver.recv() { let CommitBlockMessage { block_id, - root_hash, first_block_start_time, current_block_start_time, partition_time, execution_time, - num_txns, + output, } = msg; + let root_hash = output + .ledger_update_output + .transaction_accumulator + .root_hash(); + let num_input_txns = output.num_input_transactions(); NUM_TXNS .with_label_values(&["commit"]) - .inc_by(num_txns as u64); + .inc_by(num_input_txns as u64); - self.version += num_txns as u64; + let version = output.expect_last_version(); let commit_start = Instant::now(); - let ledger_info_with_sigs = gen_li_with_sigs(block_id, root_hash, self.version); + let ledger_info_with_sigs = gen_li_with_sigs(block_id, root_hash, version); self.executor.pre_commit_block(block_id).unwrap(); self.executor.commit_ledger(ledger_info_with_sigs).unwrap(); report_block( - start_version, - self.version, + self.start_version, + version, first_block_start_time, current_block_start_time, partition_time, execution_time, Instant::now().duration_since(commit_start), - num_txns, + num_input_txns, ); } } diff --git a/execution/executor-benchmark/src/transaction_executor.rs b/execution/executor-benchmark/src/transaction_executor.rs index 968e3c5a1219c..da6fbbd456387 100644 --- a/execution/executor-benchmark/src/transaction_executor.rs +++ b/execution/executor-benchmark/src/transaction_executor.rs @@ -24,9 +24,6 @@ pub struct TransactionExecutor { parent_block_id: HashValue, maybe_first_block_start_time: Option, ledger_update_sender: mpsc::SyncSender, - allow_aborts: bool, - allow_discards: bool, - allow_retries: bool, } impl TransactionExecutor @@ -37,9 +34,6 @@ where executor: Arc>, parent_block_id: HashValue, ledger_update_sender: mpsc::SyncSender, - allow_aborts: bool, - allow_discards: bool, - allow_retries: bool, ) -> Self { Self { num_blocks_processed: 0, @@ -47,9 +41,6 @@ where parent_block_id, maybe_first_block_start_time: None, ledger_update_sender, - allow_aborts, - allow_discards, - allow_retries, } } @@ -68,8 +59,8 @@ where "In iteration {}, received block {}.", self.num_blocks_processed, block_id ); - let num_txns = executable_block.transactions.num_transactions(); - let output = self + let num_input_txns = executable_block.transactions.num_transactions(); + let state_checkpoint_output = self 
.executor .execute_and_state_checkpoint( executable_block, @@ -78,16 +69,6 @@ where ) .unwrap(); - assert_eq!(output.input_txns_len(), num_txns); - output.check_aborts_discards_retries( - self.allow_aborts, - self.allow_discards, - self.allow_retries, - ); - if !self.allow_retries { - assert_eq!(output.txns_to_commit_len(), num_txns + 1); - } - let msg = LedgerUpdateMessage { current_block_start_time, first_block_start_time: *self.maybe_first_block_start_time.as_ref().unwrap(), @@ -95,7 +76,8 @@ where execution_time: Instant::now().duration_since(execution_start_time), block_id, parent_block_id: self.parent_block_id, - state_checkpoint_output: output, + num_input_txns, + state_checkpoint_output, }; self.ledger_update_sender.send(msg).unwrap(); self.parent_block_id = block_id; diff --git a/execution/executor-test-helpers/src/lib.rs b/execution/executor-test-helpers/src/lib.rs index b42280ec8d073..34d735fc9893a 100644 --- a/execution/executor-test-helpers/src/lib.rs +++ b/execution/executor-test-helpers/src/lib.rs @@ -50,7 +50,7 @@ pub fn gen_ledger_info_with_sigs( 0, /* round */ commit_block_id, output.root_hash(), - output.version(), + output.expect_last_version(), 0, /* timestamp */ output.epoch_state().clone(), ), diff --git a/execution/executor-types/Cargo.toml b/execution/executor-types/Cargo.toml index 9a449959bd01a..b27f6f09efbdc 100644 --- a/execution/executor-types/Cargo.toml +++ b/execution/executor-types/Cargo.toml @@ -24,6 +24,7 @@ bcs = { workspace = true } criterion = { workspace = true } derive_more = { workspace = true } itertools = { workspace = true } +once_cell = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } diff --git a/execution/executor-types/src/execution_output.rs b/execution/executor-types/src/execution_output.rs index 41b20fd1828b2..eeded76a4aed2 100644 --- a/execution/executor-types/src/execution_output.rs +++ b/execution/executor-types/src/execution_output.rs @@ -4,18 +4,227 @@ #![forbid(unsafe_code)] -use aptos_storage_interface::cached_state_view::StateCache; -use aptos_types::transaction::{block_epilogue::BlockEndInfo, Transaction, TransactionOutput}; +use crate::{parsed_transaction_output::TransactionsWithParsedOutput, ParsedTransactionOutput}; +use aptos_drop_helper::DropHelper; +use aptos_storage_interface::{cached_state_view::StateCache, state_delta::StateDelta}; +use aptos_types::{ + contract_event::ContractEvent, + epoch_state::EpochState, + transaction::{ + block_epilogue::BlockEndInfo, ExecutionStatus, Transaction, TransactionOutput, + TransactionStatus, Version, + }, +}; +use derive_more::Deref; +use std::sync::Arc; +#[derive(Clone, Debug, Deref)] pub struct ExecutionOutput { - /// Input transactions. - pub transactions: Vec, - /// Raw VM output. - pub transaction_outputs: Vec, + #[deref] + inner: Arc>, +} + +impl ExecutionOutput { + pub fn new( + is_block: bool, + first_version: Version, + statuses_for_input_txns: Vec, + to_commit: TransactionsWithParsedOutput, + to_discard: TransactionsWithParsedOutput, + to_retry: TransactionsWithParsedOutput, + state_cache: StateCache, + block_end_info: Option, + next_epoch_state: Option, + subscribable_events: Vec, + ) -> Self { + if is_block { + // If it's a block, ensure it ends with state checkpoint. + assert!( + next_epoch_state.is_some() + || to_commit.is_empty() // reconfig suffix + || to_commit.transactions.last().unwrap().is_non_reconfig_block_ending() + ); + } else { + // If it's not, there shouldn't be any transaction to be discarded or retried. 
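+            // (Non-block callers, e.g. chunk replay during state sync, apply outputs that
+            // were already committed once, so a discard or retry would indicate a bug.)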
+ assert!(to_discard.is_empty() && to_retry.is_empty()); + } + + Self::new_impl(Inner { + is_block, + first_version, + statuses_for_input_txns, + to_commit, + to_discard, + to_retry, + state_cache, + block_end_info, + next_epoch_state, + subscribable_events, + }) + } + + pub fn new_empty(state: Arc) -> Self { + Self::new_impl(Inner { + is_block: false, + first_version: state.next_version(), + statuses_for_input_txns: vec![], + to_commit: TransactionsWithParsedOutput::new_empty(), + to_discard: TransactionsWithParsedOutput::new_empty(), + to_retry: TransactionsWithParsedOutput::new_empty(), + state_cache: StateCache::new_empty(state.current.clone()), + block_end_info: None, + next_epoch_state: None, + subscribable_events: vec![], + }) + } + + pub fn new_dummy_with_input_txns(txns: Vec) -> Self { + let num_txns = txns.len(); + let success_status = TransactionStatus::Keep(ExecutionStatus::Success); + let success_output = ParsedTransactionOutput::from(TransactionOutput::new_empty_success()); + Self::new_impl(Inner { + is_block: false, + first_version: 0, + statuses_for_input_txns: vec![success_status; num_txns], + to_commit: TransactionsWithParsedOutput::new(txns, vec![success_output; num_txns]), + to_discard: TransactionsWithParsedOutput::new_empty(), + to_retry: TransactionsWithParsedOutput::new_empty(), + state_cache: StateCache::new_dummy(), + block_end_info: None, + next_epoch_state: None, + subscribable_events: vec![], + }) + } + + pub fn new_dummy() -> Self { + Self::new_dummy_with_input_txns(vec![]) + } + + pub fn reconfig_suffix(&self) -> Self { + Self::new_impl(Inner { + is_block: false, + first_version: self.next_version(), + statuses_for_input_txns: vec![], + to_commit: TransactionsWithParsedOutput::new_empty(), + to_discard: TransactionsWithParsedOutput::new_empty(), + to_retry: TransactionsWithParsedOutput::new_empty(), + state_cache: StateCache::new_dummy(), + block_end_info: None, + next_epoch_state: self.next_epoch_state.clone(), + subscribable_events: vec![], + }) + } + + fn new_impl(inner: Inner) -> Self { + Self { + inner: Arc::new(DropHelper::new(inner)), + } + } + + pub fn num_transactions_to_commit(&self) -> usize { + self.to_commit.txns().len() + } + + pub fn next_version(&self) -> Version { + self.first_version + self.num_transactions_to_commit() as Version + } + + pub fn expect_last_version(&self) -> Version { + self.first_version + self.num_transactions_to_commit() as Version - 1 + } +} + +#[derive(Debug)] +pub struct Inner { + pub is_block: bool, + pub first_version: Version, + // Statuses of the input transactions, in the same order as the input transactions. + // Contains BlockMetadata/Validator transactions, + // but doesn't contain StateCheckpoint/BlockEpilogue, as those get added during execution + pub statuses_for_input_txns: Vec, + // List of all transactions to be committed, including StateCheckpoint/BlockEpilogue if needed. + pub to_commit: TransactionsWithParsedOutput, + pub to_discard: TransactionsWithParsedOutput, + pub to_retry: TransactionsWithParsedOutput, + /// Carries the frozen base state view, so all in-mem nodes involved won't drop before the /// execution result is processed; as well as all the accounts touched during execution, together /// with their proofs. pub state_cache: StateCache, /// Optional StateCheckpoint payload pub block_end_info: Option, + /// Optional EpochState payload. + /// Only present if the block is the last block of an epoch, and is parsed output of the + /// state cache. 
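+    /// A `Some` value here is what `has_reconfiguration()` checks for downstream.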
+ pub next_epoch_state: Option, + pub subscribable_events: Vec, +} + +impl Inner { + pub fn check_aborts_discards_retries( + &self, + allow_aborts: bool, + allow_discards: bool, + allow_retries: bool, + ) { + let aborts = self + .to_commit + .iter() + .flat_map(|(txn, output)| match output.status().status() { + Ok(execution_status) => { + if execution_status.is_success() { + None + } else { + Some(format!("{:?}: {:?}", txn, output.status())) + } + }, + Err(_) => None, + }) + .collect::>(); + + let discards_3 = self + .to_discard + .iter() + .take(3) + .map(|(txn, output)| format!("{:?}: {:?}", txn, output.status())) + .collect::>(); + let retries_3 = self + .to_retry + .iter() + .take(3) + .map(|(txn, output)| format!("{:?}: {:?}", txn, output.status())) + .collect::>(); + + if !aborts.is_empty() || !discards_3.is_empty() || !retries_3.is_empty() { + println!( + "Some transactions were not successful: {} aborts, {} discards and {} retries out of {}, examples: aborts: {:?}, discards: {:?}, retries: {:?}", + aborts.len(), + self.to_discard.len(), + self.to_retry.len(), + self.statuses_for_input_txns.len(), + &aborts[..aborts.len().min(3)], + discards_3, + retries_3, + ) + } + + assert!( + allow_aborts || aborts.is_empty(), + "No aborts allowed, {}, examples: {:?}", + aborts.len(), + &aborts[..(aborts.len().min(3))] + ); + assert!( + allow_discards || discards_3.is_empty(), + "No discards allowed, {}, examples: {:?}", + self.to_discard.len(), + discards_3, + ); + assert!( + allow_retries || retries_3.is_empty(), + "No retries allowed, {}, examples: {:?}", + self.to_retry.len(), + retries_3, + ); + } } diff --git a/execution/executor-types/src/ledger_update_output.rs b/execution/executor-types/src/ledger_update_output.rs index 4187f0e80245a..7e9aa4b9bc501 100644 --- a/execution/executor-types/src/ledger_update_output.rs +++ b/execution/executor-types/src/ledger_update_output.rs @@ -6,19 +6,13 @@ use anyhow::{ensure, Result}; use aptos_crypto::HashValue; use aptos_drop_helper::DropHelper; -use aptos_storage_interface::cached_state_view::ShardedStateCache; use aptos_types::{ - contract_event::ContractEvent, proof::accumulator::InMemoryTransactionAccumulator, - state_store::ShardedStateUpdates, - transaction::{ - block_epilogue::BlockEndInfo, Transaction, TransactionInfo, TransactionOutput, - TransactionStatus, Version, - }, + transaction::{TransactionInfo, Version}, }; use derive_more::Deref; use itertools::zip_eq; -use std::sync::Arc; +use std::{clone::Clone, sync::Arc}; #[derive(Clone, Debug, Default, Deref)] pub struct LedgerUpdateOutput { @@ -27,53 +21,44 @@ pub struct LedgerUpdateOutput { } impl LedgerUpdateOutput { - pub fn new_empty(transaction_accumulator: Arc) -> Self { - Self::new_impl(Inner::new_empty(transaction_accumulator)) - } - - #[cfg(any(test, feature = "fuzzing"))] - pub fn new_dummy_with_input_txns(txns: Vec) -> Self { - Self::new_impl(Inner::new_dummy_with_input_txns(txns)) - } - - pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { - Self::new_impl(Inner::new_dummy_with_root_hash(root_hash)) - } - - pub fn reconfig_suffix(&self) -> Self { - Self::new_impl(Inner::new_empty(self.transaction_accumulator.clone())) - } - pub fn new( - statuses_for_input_txns: Vec, - transactions: Vec, - transaction_outputs: Vec, transaction_infos: Vec, - per_version_state_updates: Vec, - subscribable_events: Vec, transaction_info_hashes: Vec, - state_updates_until_last_checkpoint: Option, - sharded_state_cache: ShardedStateCache, transaction_accumulator: Arc, parent_accumulator: Arc, 
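+        // parent_accumulator.num_leaves doubles as the first version covered by this
+        // output (see first_version() below).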
- block_end_info: Option, ) -> Self { Self::new_impl(Inner { - statuses_for_input_txns, - transactions, - transaction_outputs, transaction_infos, - per_version_state_updates, - subscribable_events, transaction_info_hashes, - state_updates_until_last_checkpoint, - sharded_state_cache, transaction_accumulator, parent_accumulator, - block_end_info, }) } + pub fn new_empty(transaction_accumulator: Arc) -> Self { + Self::new_impl(Inner::new_empty(transaction_accumulator)) + } + + #[cfg(any(test, feature = "fuzzing"))] + pub fn new_dummy() -> Self { + Self::new_empty(Arc::new(InMemoryTransactionAccumulator::new_empty())) + } + + pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { + let transaction_accumulator = Arc::new( + InMemoryTransactionAccumulator::new_empty_with_root_hash(root_hash), + ); + Self::new_impl(Inner { + parent_accumulator: transaction_accumulator.clone(), + transaction_accumulator, + ..Default::default() + }) + } + + pub fn reconfig_suffix(&self) -> Self { + Self::new_impl(Inner::new_empty(self.transaction_accumulator.clone())) + } + fn new_impl(inner: Inner) -> Self { Self { inner: Arc::new(DropHelper::new(inner)), @@ -83,20 +68,10 @@ impl LedgerUpdateOutput { #[derive(Default, Debug)] pub struct Inner { - pub statuses_for_input_txns: Vec, - pub transactions: Vec, - pub transaction_outputs: Vec, pub transaction_infos: Vec, - pub per_version_state_updates: Vec, - pub subscribable_events: Vec, pub transaction_info_hashes: Vec, - pub state_updates_until_last_checkpoint: Option, - pub sharded_state_cache: ShardedStateCache, - /// The in-memory Merkle Accumulator representing a blockchain state consistent with the - /// `state_tree`. pub transaction_accumulator: Arc, pub parent_accumulator: Arc, - pub block_end_info: Option, } impl Inner { @@ -108,50 +83,10 @@ impl Inner { } } - #[cfg(any(test, feature = "fuzzing"))] - pub fn new_dummy_with_input_txns(transactions: Vec) -> Self { - let num_txns = transactions.len(); - Self { - transactions, - statuses_for_input_txns: vec![ - TransactionStatus::Keep( - aptos_types::transaction::ExecutionStatus::Success - ); - num_txns - ], - ..Default::default() - } - } - - pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { - let transaction_accumulator = Arc::new( - InMemoryTransactionAccumulator::new_empty_with_root_hash(root_hash), - ); - Self { - parent_accumulator: transaction_accumulator.clone(), - transaction_accumulator, - ..Default::default() - } - } - pub fn txn_accumulator(&self) -> &Arc { &self.transaction_accumulator } - /// Ensure that every block committed by consensus ends with a state checkpoint. That can be - /// one of the two cases: 1. a reconfiguration (txns in the proposed block after the txn caused - /// the reconfiguration will be retried) 2. a Transaction::StateCheckpoint at the end of the - /// block. 
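+    // (This invariant didn't go away: ExecutionOutput::new now asserts it at
+    // construction time, so a separate post-hoc check is no longer needed.)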
- pub fn ensure_ends_with_state_checkpoint(&self) -> Result<()> { - ensure!( - self.transactions - .last() - .map_or(true, |t| t.is_non_reconfig_block_ending()), - "Block not ending with a state checkpoint.", - ); - Ok(()) - } - pub fn ensure_transaction_infos_match( &self, transaction_infos: &[TransactionInfo], @@ -176,15 +111,7 @@ impl Inner { Ok(()) } - pub fn next_version(&self) -> Version { - self.transaction_accumulator.num_leaves() as Version - } - pub fn first_version(&self) -> Version { self.parent_accumulator.num_leaves } - - pub fn num_txns(&self) -> usize { - self.transactions.len() - } } diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs index 8d5bf3cb8c1eb..ca24f3283d581 100644 --- a/execution/executor-types/src/lib.rs +++ b/execution/executor-types/src/lib.rs @@ -268,23 +268,25 @@ pub struct ChunkCommitNotification { pub reconfiguration_occurred: bool, } -pub struct ProofReader { - proofs: HashMap, +pub struct ProofReader<'a> { + proofs: Option<&'a HashMap>, } -impl ProofReader { - pub fn new(proofs: HashMap) -> Self { - ProofReader { proofs } +impl<'a> ProofReader<'a> { + pub fn new(proofs: &'a HashMap) -> Self { + Self { + proofs: Some(proofs), + } } pub fn new_empty() -> Self { - Self::new(HashMap::new()) + Self { proofs: None } } } -impl ProofRead for ProofReader { +impl<'a> ProofRead for ProofReader<'a> { fn get_proof(&self, key: HashValue) -> Option<&SparseMerkleProofExt> { - self.proofs.get(&key) + self.proofs.and_then(|proofs| proofs.get(&key)) } } diff --git a/execution/executor-types/src/parsed_transaction_output.rs b/execution/executor-types/src/parsed_transaction_output.rs index ec1f9d3668c3e..2a5e3569bdfb4 100644 --- a/execution/executor-types/src/parsed_transaction_output.rs +++ b/execution/executor-types/src/parsed_transaction_output.rs @@ -3,6 +3,8 @@ use aptos_types::{ contract_event::ContractEvent, + event::EventKey, + on_chain_config, transaction::{ Transaction, TransactionAuxiliaryData, TransactionOutput, TransactionOutputProvider, TransactionStatus, @@ -10,9 +12,12 @@ use aptos_types::{ write_set::WriteSet, }; use itertools::zip_eq; +use once_cell::sync::Lazy; use std::ops::Deref; -#[derive(Clone)] +pub static NEW_EPOCH_EVENT_KEY: Lazy = Lazy::new(on_chain_config::new_epoch_event_key); + +#[derive(Clone, Debug)] pub struct ParsedTransactionOutput { output: TransactionOutput, reconfig_events: Vec, @@ -82,10 +87,10 @@ impl ParsedTransactionOutput { } } -#[derive(Default)] +#[derive(Debug, Default)] pub struct TransactionsWithParsedOutput { - transactions: Vec, - parsed_output: Vec, + pub transactions: Vec, + pub parsed_output: Vec, } impl TransactionsWithParsedOutput { @@ -124,6 +129,14 @@ impl TransactionsWithParsedOutput { &self.transactions } + pub fn make_transaction_outputs(&self) -> Vec { + self.parsed_output + .iter() + .map(|t| &t.output) + .cloned() + .collect() + } + pub fn parsed_outputs(&self) -> &Vec { &self.parsed_output } diff --git a/execution/executor-types/src/state_checkpoint_output.rs b/execution/executor-types/src/state_checkpoint_output.rs index b8bb881f4ed6d..5e3c32cc27749 100644 --- a/execution/executor-types/src/state_checkpoint_output.rs +++ b/execution/executor-types/src/state_checkpoint_output.rs @@ -4,14 +4,12 @@ #![forbid(unsafe_code)] use crate::parsed_transaction_output::TransactionsWithParsedOutput; -use anyhow::{ensure, Result}; use aptos_crypto::HashValue; -use aptos_storage_interface::cached_state_view::ShardedStateCache; -use aptos_types::{ - state_store::ShardedStateUpdates, - 
transaction::{block_epilogue::BlockEndInfo, TransactionStatus}, -}; -use itertools::zip_eq; +use aptos_drop_helper::DropHelper; +use aptos_storage_interface::state_delta::StateDelta; +use aptos_types::{state_store::ShardedStateUpdates, transaction::TransactionStatus}; +use derive_more::Deref; +use std::sync::Arc; #[derive(Default)] pub struct TransactionsByStatus { @@ -61,156 +59,61 @@ impl TransactionsByStatus { } } -#[derive(Default)] +#[derive(Clone, Debug, Default, Deref)] pub struct StateCheckpointOutput { - txns: TransactionsByStatus, - per_version_state_updates: Vec, - state_checkpoint_hashes: Vec>, - state_updates_before_last_checkpoint: Option, - sharded_state_cache: ShardedStateCache, - block_end_info: Option, + #[deref] + inner: Arc>, } impl StateCheckpointOutput { pub fn new( - txns: TransactionsByStatus, + parent_state: Arc, + result_state: Arc, + state_updates_before_last_checkpoint: Option, per_version_state_updates: Vec, state_checkpoint_hashes: Vec>, - state_updates_before_last_checkpoint: Option, - sharded_state_cache: ShardedStateCache, - block_end_info: Option, ) -> Self { - Self { - txns, + Self::new_impl(Inner { + parent_state, + result_state, + state_updates_before_last_checkpoint, per_version_state_updates, state_checkpoint_hashes, - state_updates_before_last_checkpoint, - sharded_state_cache, - block_end_info, - } + }) } - pub fn input_txns_len(&self) -> usize { - self.txns.input_txns_len() + pub fn new_empty(state: Arc) -> Self { + Self::new_impl(Inner { + parent_state: state.clone(), + result_state: state, + state_updates_before_last_checkpoint: None, + per_version_state_updates: vec![], + state_checkpoint_hashes: vec![], + }) } - pub fn txns_to_commit_len(&self) -> usize { - self.txns.to_commit.len() + pub fn new_dummy() -> Self { + Self::new_empty(Arc::new(StateDelta::new_empty())) } - pub fn into_inner( - self, - ) -> ( - TransactionsByStatus, - Vec, - Vec>, - Option, - ShardedStateCache, - Option, - ) { - ( - self.txns, - self.per_version_state_updates, - self.state_checkpoint_hashes, - self.state_updates_before_last_checkpoint, - self.sharded_state_cache, - self.block_end_info, - ) + fn new_impl(inner: Inner) -> Self { + Self { + inner: Arc::new(DropHelper::new(inner)), + } } - pub fn check_and_update_state_checkpoint_hashes( - &mut self, - trusted_hashes: Vec>, - ) -> Result<()> { - let len = self.state_checkpoint_hashes.len(); - ensure!( - len == trusted_hashes.len(), - "Number of txns doesn't match. 
self: {len}, trusted: {}", - trusted_hashes.len() - ); - - zip_eq( - self.state_checkpoint_hashes.iter_mut(), - trusted_hashes.iter(), - ) - .try_for_each(|(self_hash, trusted_hash)| { - if self_hash.is_none() && trusted_hash.is_some() { - *self_hash = *trusted_hash; - } else { - ensure!(self_hash == trusted_hash, - "State checkpoint hash doesn't match, self: {self_hash:?}, trusted: {trusted_hash:?}"); - } - Ok(()) - }) + pub fn reconfig_suffix(&self) -> Self { + Self::new_empty(self.result_state.clone()) } +} - pub fn check_aborts_discards_retries( - &self, - allow_aborts: bool, - allow_discards: bool, - allow_retries: bool, - ) { - let aborts = self - .txns - .to_commit - .iter() - .flat_map(|(txn, output)| match output.status().status() { - Ok(execution_status) => { - if execution_status.is_success() { - None - } else { - Some(format!("{:?}: {:?}", txn, output.status())) - } - }, - Err(_) => None, - }) - .collect::>(); - - let discards_3 = self - .txns - .to_discard - .iter() - .take(3) - .map(|(txn, output)| format!("{:?}: {:?}", txn, output.status())) - .collect::>(); - let retries_3 = self - .txns - .to_retry - .iter() - .take(3) - .map(|(txn, output)| format!("{:?}: {:?}", txn, output.status())) - .collect::>(); - - if !aborts.is_empty() || !discards_3.is_empty() || !retries_3.is_empty() { - println!( - "Some transactions were not successful: {} aborts, {} discards and {} retries out of {}, examples: aborts: {:?}, discards: {:?}, retries: {:?}", - aborts.len(), - self.txns.to_discard.len(), - self.txns.to_retry.len(), - self.input_txns_len(), - &aborts[..(aborts.len().min(3))], - discards_3, - retries_3, - ) - } - - assert!( - allow_aborts || aborts.is_empty(), - "No aborts allowed, {}, examples: {:?}", - aborts.len(), - &aborts[..(aborts.len().min(3))] - ); - assert!( - allow_discards || discards_3.is_empty(), - "No discards allowed, {}, examples: {:?}", - self.txns.to_discard.len(), - discards_3, - ); - assert!( - allow_retries || retries_3.is_empty(), - "No retries allowed, {}, examples: {:?}", - self.txns.to_retry.len(), - retries_3, - ); - } +#[derive(Debug, Default)] +pub struct Inner { + pub parent_state: Arc, + pub result_state: Arc, + pub state_updates_before_last_checkpoint: Option, + pub per_version_state_updates: Vec, + pub state_checkpoint_hashes: Vec>, } + +impl Inner {} diff --git a/execution/executor-types/src/state_compute_result.rs b/execution/executor-types/src/state_compute_result.rs index 0fd5f85ea24d6..66517fd15f97f 100644 --- a/execution/executor-types/src/state_compute_result.rs +++ b/execution/executor-types/src/state_compute_result.rs @@ -1,19 +1,22 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{ChunkCommitNotification, LedgerUpdateOutput}; +use crate::{ + execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput, + ChunkCommitNotification, LedgerUpdateOutput, +}; use aptos_crypto::{ hash::{TransactionAccumulatorHasher, ACCUMULATOR_PLACEHOLDER_HASH}, HashValue, }; -use aptos_storage_interface::{chunk_to_commit::ChunkToCommit, state_delta::StateDelta}; +use aptos_storage_interface::chunk_to_commit::ChunkToCommit; use aptos_types::{ contract_event::ContractEvent, epoch_state::EpochState, proof::{accumulator::InMemoryTransactionAccumulator, AccumulatorExtensionProof}, transaction::{Transaction, TransactionStatus, Version}, }; -use std::{cmp::max, sync::Arc}; +use std::sync::Arc; /// A structure that summarizes the result of the execution needed for consensus to agree on. 
/// The execution is responsible for generating the ID of the new state, which is returned in the @@ -23,37 +26,33 @@ use std::{cmp::max, sync::Arc}; /// of success / failure of the transactions. /// Note that the specific details of compute_status are opaque to StateMachineReplication, /// which is going to simply pass the results between StateComputer and PayloadClient. -#[derive(Debug, Default, Clone)] +#[derive(Clone, Debug)] pub struct StateComputeResult { - pub parent_state: Arc, - pub result_state: Arc, + pub execution_output: ExecutionOutput, + pub state_checkpoint_output: StateCheckpointOutput, pub ledger_update_output: LedgerUpdateOutput, - /// If set, this is the new epoch info that should be changed to if this is committed. - pub next_epoch_state: Option, } impl StateComputeResult { pub fn new( - parent_state: Arc, - result_state: Arc, + execution_output: ExecutionOutput, + state_checkpoint_output: StateCheckpointOutput, ledger_update_output: LedgerUpdateOutput, - next_epoch_state: Option, ) -> Self { Self { - parent_state, - result_state, + execution_output, + state_checkpoint_output, ledger_update_output, - next_epoch_state, } } - pub fn new_empty(transaction_accumulator: Arc) -> Self { - let result_state = Arc::new(StateDelta::new_empty()); + pub fn new_dummy_with_accumulator( + transaction_accumulator: Arc, + ) -> Self { Self { - parent_state: result_state.clone(), - result_state, + execution_output: ExecutionOutput::new_dummy(), + state_checkpoint_output: StateCheckpointOutput::new_dummy(), ledger_update_output: LedgerUpdateOutput::new_empty(transaction_accumulator), - next_epoch_state: None, } } @@ -61,12 +60,10 @@ impl StateComputeResult { /// this function is used in RandomComputeResultStateComputer to assert that the compute /// function is really called. pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { - let result_state = Arc::new(StateDelta::new_empty()); Self { - parent_state: result_state.clone(), - result_state, + execution_output: ExecutionOutput::new_dummy(), + state_checkpoint_output: StateCheckpointOutput::new_dummy(), ledger_update_output: LedgerUpdateOutput::new_dummy_with_root_hash(root_hash), - next_epoch_state: None, } } @@ -75,40 +72,36 @@ impl StateComputeResult { /// where the real compute result is generated after ordering_state_computer.commit pushes /// the blocks and the finality proof to the execution phase. 
pub fn new_dummy() -> Self { - StateComputeResult::new_dummy_with_root_hash(*ACCUMULATOR_PLACEHOLDER_HASH) + Self::new_dummy_with_root_hash(*ACCUMULATOR_PLACEHOLDER_HASH) } #[cfg(any(test, feature = "fuzzing"))] pub fn new_dummy_with_input_txns(txns: Vec) -> Self { - let result_state = Arc::new(StateDelta::new_empty()); Self { - parent_state: result_state.clone(), - result_state, - ledger_update_output: LedgerUpdateOutput::new_dummy_with_input_txns(txns), - next_epoch_state: None, + execution_output: ExecutionOutput::new_dummy_with_input_txns(txns), + state_checkpoint_output: StateCheckpointOutput::new_dummy(), + ledger_update_output: LedgerUpdateOutput::new_dummy(), } } - pub fn version(&self) -> Version { - max(self.ledger_update_output.next_version(), 1) - .checked_sub(1) - .expect("Integer overflow occurred") - } - pub fn root_hash(&self) -> HashValue { self.ledger_update_output.transaction_accumulator.root_hash } pub fn compute_status_for_input_txns(&self) -> &Vec { - &self.ledger_update_output.statuses_for_input_txns + &self.execution_output.statuses_for_input_txns } - pub fn transactions_to_commit_len(&self) -> usize { - self.ledger_update_output.transactions.len() + pub fn num_input_transactions(&self) -> usize { + self.execution_output.statuses_for_input_txns.len() + } + + pub fn num_transactions_to_commit(&self) -> usize { + self.execution_output.num_transactions_to_commit() } pub fn epoch_state(&self) -> &Option { - &self.next_epoch_state + &self.execution_output.next_epoch_state } pub fn extension_proof(&self) -> AccumulatorExtensionProof { @@ -122,49 +115,57 @@ impl StateComputeResult { ) } + pub fn transactions_to_commit(&self) -> &[Transaction] { + self.execution_output.to_commit.txns() + } + pub fn transaction_info_hashes(&self) -> &Vec { &self.ledger_update_output.transaction_info_hashes } - pub fn num_leaves(&self) -> u64 { - self.ledger_update_output.next_version() + pub fn expect_last_version(&self) -> Version { + self.execution_output.expect_last_version() } - pub fn has_reconfiguration(&self) -> bool { - self.next_epoch_state.is_some() + pub fn next_version(&self) -> Version { + self.execution_output.next_version() } - pub fn subscribable_events(&self) -> &[ContractEvent] { - &self.ledger_update_output.subscribable_events + pub fn last_version_or_0(&self) -> Version { + self.next_version().saturating_sub(1) } - pub fn is_reconfiguration_suffix(&self) -> bool { - self.has_reconfiguration() && self.compute_status_for_input_txns().is_empty() + pub fn has_reconfiguration(&self) -> bool { + self.execution_output.next_epoch_state.is_some() + } + + pub fn subscribable_events(&self) -> &[ContractEvent] { + &self.execution_output.subscribable_events } pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification { ChunkCommitNotification { - subscribable_events: self.ledger_update_output.subscribable_events.clone(), - committed_transactions: self.ledger_update_output.transactions.clone(), - reconfiguration_occurred: self.has_reconfiguration(), + subscribable_events: self.execution_output.subscribable_events.clone(), + committed_transactions: self.execution_output.to_commit.txns().to_vec(), + reconfiguration_occurred: self.execution_output.next_epoch_state.is_some(), } } pub fn as_chunk_to_commit(&self) -> ChunkToCommit { ChunkToCommit { first_version: self.ledger_update_output.first_version(), - transactions: &self.ledger_update_output.transactions, - transaction_outputs: &self.ledger_update_output.transaction_outputs, + transactions: 
self.execution_output.to_commit.txns(), + transaction_outputs: self.execution_output.to_commit.make_transaction_outputs(), transaction_infos: &self.ledger_update_output.transaction_infos, - per_version_state_updates: &self.ledger_update_output.per_version_state_updates, - base_state_version: self.parent_state.base_version, - latest_in_memory_state: &self.result_state, + per_version_state_updates: &self.state_checkpoint_output.per_version_state_updates, + base_state_version: self.state_checkpoint_output.parent_state.base_version, + latest_in_memory_state: &self.state_checkpoint_output.result_state, state_updates_until_last_checkpoint: self - .ledger_update_output - .state_updates_until_last_checkpoint + .state_checkpoint_output + .state_updates_before_last_checkpoint .as_ref(), - sharded_state_cache: Some(&self.ledger_update_output.sharded_state_cache), - is_reconfig: self.ledger_update_output.block_end_info.is_some(), + sharded_state_cache: Some(&self.execution_output.state_cache.sharded_state_cache), + is_reconfig: self.execution_output.block_end_info.is_some(), } } } diff --git a/execution/executor/src/block_executor/block_tree/mod.rs b/execution/executor/src/block_executor/block_tree/mod.rs index 3967ac1e63120..013a5c58235dc 100644 --- a/execution/executor/src/block_executor/block_tree/mod.rs +++ b/execution/executor/src/block_executor/block_tree/mod.rs @@ -112,11 +112,6 @@ impl BlockLookupInner { .get() .upgrade() .ok_or_else(|| anyhow!("block dropped unexpected."))?; - ensure!( - existing.output.is_same_state(&output), - "Different block with same id {:x}", - id, - ); Ok((existing, true, parent_block)) }, Entry::Vacant(entry) => { @@ -227,10 +222,9 @@ impl BlockTree { ledger_info.consensus_block_id() }; - let output = PartialStateComputeResult::new_empty_completed( + let output = PartialStateComputeResult::new_empty( ledger_view.state().clone(), ledger_view.txn_accumulator().clone(), - None, ); block_lookup.fetch_or_add_block(id, output, None) @@ -253,17 +247,11 @@ impl BlockTree { .original_reconfiguration_block_id(committed_block_id), "Updated with a new root block as a virtual block of reconfiguration block" ); - let commited_output = last_committed_block.output.expect_complete_result(); - let output = PartialStateComputeResult::new_empty_completed( - commited_output.result_state.clone(), - commited_output - .ledger_update_output - .transaction_accumulator - .clone(), + self.block_lookup.fetch_or_add_block( + epoch_genesis_id, + last_committed_block.output.clone(), None, - ); - self.block_lookup - .fetch_or_add_block(epoch_genesis_id, output, None)? + )? 
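+        // The committed block's output is immutable and Arc-shared, so the epoch-genesis
+        // virtual block can simply reuse it instead of rebuilding an empty result.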
} else { info!( LogSchema::new(LogEntry::SpeculationCache).root_block_id(committed_block_id), @@ -272,17 +260,12 @@ impl BlockTree { last_committed_block }; root.output - .state() + .expect_result_state() .current .log_generation("block_tree_base"); - let old_root = { - let mut root_locked = self.root.lock(); - // send old root to async task to drop it - let old_root = root_locked.clone(); - *root_locked = root; - old_root - }; + let old_root = std::mem::replace(&mut *self.root.lock(), root); + // send old root to async task to drop it Ok(DEFAULT_DROPPER.schedule_drop_with_waiter(old_root)) } diff --git a/execution/executor/src/block_executor/block_tree/test.rs b/execution/executor/src/block_executor/block_tree/test.rs index 3687b8059e52b..f69df868eafa0 100644 --- a/execution/executor/src/block_executor/block_tree/test.rs +++ b/execution/executor/src/block_executor/block_tree/test.rs @@ -40,10 +40,9 @@ fn id(index: u64) -> HashValue { fn empty_block() -> PartialStateComputeResult { let result_view = ExecutedTrees::new_empty(); - PartialStateComputeResult::new_empty_completed( + PartialStateComputeResult::new_empty( result_view.state().clone(), result_view.transaction_accumulator.clone(), - None, ) } diff --git a/execution/executor/src/block_executor/mod.rs b/execution/executor/src/block_executor/mod.rs index 1dd9f38d343f9..fe150370826b0 100644 --- a/execution/executor/src/block_executor/mod.rs +++ b/execution/executor/src/block_executor/mod.rs @@ -26,7 +26,6 @@ use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_infallible::RwLock; use aptos_logger::prelude::*; use aptos_metrics_core::{IntGaugeHelper, TimerHelper}; -use aptos_scratchpad::SparseMerkleTree; use aptos_storage_interface::{ async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView, DbReaderWriter, }; @@ -36,7 +35,7 @@ use aptos_types::{ partitioner::{ExecutableBlock, ExecutableTransactions}, }, ledger_info::LedgerInfoWithSignatures, - state_store::{state_value::StateValue, StateViewId}, + state_store::StateViewId, }; use aptos_vm::AptosVM; use block_tree::BlockTree; @@ -50,6 +49,7 @@ pub trait TransactionBlockExecutor: Send + Sync { transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, + append_state_checkpoint_to_block: Option, ) -> Result; } @@ -58,11 +58,13 @@ impl TransactionBlockExecutor for AptosVM { transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, + append_state_checkpoint_to_block: Option, ) -> Result { DoGetExecutionOutput::by_transaction_execution::( transactions, state_view, onchain_config, + append_state_checkpoint_to_block, ) } } @@ -83,14 +85,6 @@ where } } - pub fn root_smt(&self) -> SparseMerkleTree { - self.inner - .read() - .as_ref() - .expect("BlockExecutor is not reset") - .root_smt() - } - fn maybe_initialize(&self) -> Result<()> { if self.inner.read().is_none() { self.reset()?; @@ -198,10 +192,6 @@ where phantom: PhantomData, }) } - - fn root_smt(&self) -> SparseMerkleTree { - self.block_tree.root_block().output.state().current.clone() - } } impl BlockExecutorInner @@ -232,11 +222,12 @@ where .ok_or(ExecutorError::BlockNotFound(parent_block_id))?; let parent_output = &parent_block.output; info!( - LogSchema::new(LogEntry::BlockExecutor).block_id(block_id), + block_id = block_id, + first_version = parent_output.execution_output.next_version(), "execute_block" ); let committed_block_id = self.committed_block_id(); - let (state, epoch_state, 
state_checkpoint_output) = + let (execution_output, state_checkpoint_output) = if parent_block_id != committed_block_id && parent_output.has_reconfiguration() { // ignore reconfiguration suffix, even if the block is non-empty info!( @@ -244,56 +235,59 @@ where "reconfig_descendant_block_received" ); ( - parent_output.state().clone(), - parent_output.epoch_state().clone(), - StateCheckpointOutput::default(), + parent_output.execution_output.reconfig_suffix(), + parent_output + .expect_state_checkpoint_output() + .reconfig_suffix(), ) } else { let state_view = { let _timer = OTHER_TIMERS.timer_with(&["verified_state_view"]); - info!("next_version: {}", parent_output.next_version()); CachedStateView::new( StateViewId::BlockExecution { block_id }, Arc::clone(&self.db.reader), - parent_output.next_version(), - parent_output.state().current.clone(), + parent_output.execution_output.next_version(), + parent_output.expect_result_state().current.clone(), Arc::new(AsyncProofFetcher::new(self.db.reader.clone())), )? }; - let chunk_output = { + let execution_output = { let _timer = VM_EXECUTE_BLOCK.start_timer(); fail_point!("executor::vm_execute_block", |_| { Err(ExecutorError::from(anyhow::anyhow!( "Injected error in vm_execute_block" ))) }); - V::execute_transaction_block(transactions, state_view, onchain_config.clone())? + V::execute_transaction_block( + transactions, + state_view, + onchain_config.clone(), + Some(block_id), + )? }; let _timer = OTHER_TIMERS.timer_with(&["state_checkpoint"]); - THREAD_MANAGER.get_exe_cpu_pool().install(|| { + let state_checkpoint_output = THREAD_MANAGER.get_exe_cpu_pool().install(|| { fail_point!("executor::block_state_checkpoint", |_| { Err(anyhow::anyhow!("Injected error in block state checkpoint.")) }); - DoStateCheckpoint::run( - chunk_output, - parent_output.state(), - Some(block_id), - None, - /*is_block=*/ true, + &execution_output, + parent_output.expect_result_state(), + Option::>::None, ) - })? + })?; + (execution_output, state_checkpoint_output) }; + let output = PartialStateComputeResult::new(execution_output); + output.set_state_checkpoint_output(state_checkpoint_output.clone()); - let _ = self.block_tree.add_block( - parent_block_id, - block_id, - PartialStateComputeResult::new(parent_output.result_state.clone(), state, epoch_state), - )?; + let _ = self + .block_tree + .add_block(parent_block_id, block_id, output)?; Ok(state_checkpoint_output) } @@ -301,7 +295,7 @@ where &self, block_id: HashValue, parent_block_id: HashValue, - state_checkpoint_output: StateCheckpointOutput, + _state_checkpoint_output: StateCheckpointOutput, ) -> ExecutorResult { let _timer = UPDATE_LEDGER.start_timer(); info!( @@ -322,6 +316,7 @@ where let parent_output = parent_block.output.expect_ledger_update_output(); let parent_accumulator = parent_output.txn_accumulator(); let block = block_vec.pop().expect("Must exist").unwrap(); + let output = &block.output; parent_block.ensure_has_child(block_id)?; if let Some(complete_result) = block.output.get_complete_result() { return Ok(complete_result); @@ -335,16 +330,15 @@ where ); parent_output.reconfig_suffix() } else { - let (output, _, _) = THREAD_MANAGER.get_non_exe_cpu_pool().install(|| { - DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone()) - })?; - output + THREAD_MANAGER.get_non_exe_cpu_pool().install(|| { + DoLedgerUpdate::run( + &output.execution_output, + output.expect_state_checkpoint_output(), + parent_accumulator.clone(), + ) + })? 
}; - if !block.output.has_reconfiguration() { - output.ensure_ends_with_state_checkpoint()?; - } - block.output.set_ledger_update_output(output); Ok(block.output.expect_complete_result()) } @@ -363,7 +357,7 @@ where }); let output = block.output.expect_complete_result(); - let num_txns = output.transactions_to_commit_len(); + let num_txns = output.num_transactions_to_commit(); if num_txns != 0 { let _timer = SAVE_TRANSACTIONS.start_timer(); self.db diff --git a/execution/executor/src/chunk_executor/chunk_commit_queue.rs b/execution/executor/src/chunk_executor/chunk_commit_queue.rs index b14f4853bc3ac..1af1e28908f00 100644 --- a/execution/executor/src/chunk_executor/chunk_commit_queue.rs +++ b/execution/executor/src/chunk_executor/chunk_commit_queue.rs @@ -11,15 +11,12 @@ use crate::{ }, }; use anyhow::{anyhow, ensure, Result}; -use aptos_executor_types::state_checkpoint_output::StateCheckpointOutput; use aptos_storage_interface::{state_delta::StateDelta, DbReader, ExecutedTrees}; use aptos_types::{proof::accumulator::InMemoryTransactionAccumulator, transaction::Version}; use std::{collections::VecDeque, sync::Arc}; pub(crate) struct ChunkToUpdateLedger { pub output: PartialStateComputeResult, - /// transactions sorted by status, state roots, state updates - pub state_checkpoint_output: StateCheckpointOutput, /// from the input -- can be checked / used only after the transaction accumulator /// is updated. @@ -68,7 +65,7 @@ impl ChunkCommitQueue { &mut self, chunk_to_update_ledger: ChunkToUpdateLedger, ) -> Result<()> { - self.latest_state = chunk_to_update_ledger.output.result_state.clone(); + self.latest_state = chunk_to_update_ledger.output.expect_result_state().clone(); self.to_update_ledger .push_back(Some(chunk_to_update_ledger)); Ok(()) diff --git a/execution/executor/src/chunk_executor/chunk_result_verifier.rs b/execution/executor/src/chunk_executor/chunk_result_verifier.rs index 5cc222163c99f..d9c347e6374de 100644 --- a/execution/executor/src/chunk_executor/chunk_result_verifier.rs +++ b/execution/executor/src/chunk_executor/chunk_result_verifier.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::{ensure, Result}; -use aptos_crypto::HashValue; use aptos_executor_types::LedgerUpdateOutput; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_types::{ @@ -21,13 +20,6 @@ pub trait ChunkResultVerifier { fn transaction_infos(&self) -> &[TransactionInfo]; - fn state_checkpoint_hashes(&self) -> Vec> { - self.transaction_infos() - .iter() - .map(|t| t.state_checkpoint_hash()) - .collect() - } - fn maybe_select_chunk_ending_ledger_info( &self, ledger_update_output: &LedgerUpdateOutput, diff --git a/execution/executor/src/chunk_executor/mod.rs b/execution/executor/src/chunk_executor/mod.rs index f13a36fc4d624..1a541c88247b3 100644 --- a/execution/executor/src/chunk_executor/mod.rs +++ b/execution/executor/src/chunk_executor/mod.rs @@ -258,7 +258,7 @@ impl ChunkExecutorInner { }; let output = chunk.output.expect_complete_result(); - let num_txns = output.transactions_to_commit_len(); + let num_txns = output.num_transactions_to_commit(); if chunk.ledger_info_opt.is_some() || num_txns != 0 { let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__save_txns"]); fail_point!("executor::commit_chunk", |_| { @@ -302,27 +302,27 @@ impl ChunkExecutorInner { let num_txns = chunk.len(); let state_view = self.latest_state_view(&parent_state)?; - let chunk_output = chunk.into_output::(state_view)?; + let execution_output = chunk.into_output::(state_view)?; 
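+ // Depending on the chunk type, `into_output` either executes the transactions against
+ // the state view or applies pre-computed transaction outputs (see the `TransactionChunk`
+ // trait in transaction_chunk.rs).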
// Calculate state snapshot - let (result_state, next_epoch_state, state_checkpoint_output) = DoStateCheckpoint::run( - chunk_output, + let state_checkpoint_output = DoStateCheckpoint::run( + &execution_output, &self.commit_queue.lock().latest_state(), - None, // append_state_checkpoint_to_block - Some(chunk_verifier.state_checkpoint_hashes()), - false, // is_block + Some( + chunk_verifier + .transaction_infos() + .iter() + .map(|t| t.state_checkpoint_hash()), + ), )?; + let output = PartialStateComputeResult::new(execution_output); + output.set_state_checkpoint_output(state_checkpoint_output); // Enqueue for next stage. self.commit_queue .lock() .enqueue_for_ledger_update(ChunkToUpdateLedger { - output: PartialStateComputeResult::new( - parent_state.clone(), - result_state, - next_epoch_state, - ), - state_checkpoint_output, + output, chunk_verifier, })?; @@ -346,23 +346,24 @@ impl ChunkExecutorInner { }; let ChunkToUpdateLedger { output, - state_checkpoint_output, chunk_verifier, } = chunk; let first_version = parent_accumulator.num_leaves(); - let (ledger_update_output, to_discard, to_retry) = { + let ledger_update_output = { let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__calculate"]); - DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone())? + DoLedgerUpdate::run( + &output.execution_output, + output.expect_state_checkpoint_output(), + parent_accumulator.clone(), + )? }; - ensure!(to_discard.is_empty(), "Unexpected discard."); - ensure!(to_retry.is_empty(), "Unexpected retry."); chunk_verifier.verify_chunk_result(&parent_accumulator, &ledger_update_output)?; let ledger_info_opt = chunk_verifier.maybe_select_chunk_ending_ledger_info( &ledger_update_output, - output.next_epoch_state.as_ref(), + output.execution_output.next_epoch_state.as_ref(), )?; output.set_ledger_update_output(ledger_update_output); @@ -373,7 +374,7 @@ impl ChunkExecutorInner { let num_txns = executed_chunk .output .expect_complete_result() - .transactions_to_commit_len(); + .num_transactions_to_commit(); let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__save"]); self.commit_queue @@ -497,14 +498,14 @@ impl ChunkExecutorInner { let chunk = self.commit_chunk_impl()?; let output = chunk.output.expect_complete_result(); - let num_committed = output.transactions_to_commit_len(); + let num_committed = output.num_transactions_to_commit(); info!( num_committed = num_committed, tps = num_committed as f64 / started.elapsed().as_secs_f64(), "TransactionReplayer::commit() OK" ); - Ok(output.version()) + Ok(output.expect_last_version()) } /// Remove `end_version - begin_version` transactions from the mutable input arguments and replay. @@ -600,15 +601,16 @@ impl ChunkExecutorInner { .collect::>(); // State sync executor shouldn't have block gas limit. - let chunk_output = DoGetExecutionOutput::by_transaction_execution::( + let execution_output = DoGetExecutionOutput::by_transaction_execution::( txns.into(), state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), + None, )?; // not `zip_eq`, deliberately for (version, txn_out, txn_info, write_set, events) in multizip(( begin_version..end_version, - chunk_output.transaction_outputs.iter(), + execution_output.to_commit.parsed_outputs().iter(), transaction_infos.iter(), write_sets.iter(), event_vecs.iter(), @@ -619,13 +621,13 @@ impl ChunkExecutorInner { Some(write_set), Some(events), ) { - if verify_execution_mode.is_lazy_quit() { + return if verify_execution_mode.is_lazy_quit() { error!("(Not quitting right away.) 
{}", err); verify_execution_mode.mark_seen_error(); - return Ok(version + 1); + Ok(version + 1) } else { - return Err(err); - } + Err(err) + }; } } Ok(end_version) diff --git a/execution/executor/src/chunk_executor/transaction_chunk.rs b/execution/executor/src/chunk_executor/transaction_chunk.rs index b9f3b08205f16..1e3533a0b9491 100644 --- a/execution/executor/src/chunk_executor/transaction_chunk.rs +++ b/execution/executor/src/chunk_executor/transaction_chunk.rs @@ -81,6 +81,7 @@ impl TransactionChunk for ChunkToExecute { sig_verified_txns.into(), state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), + None, ) } } diff --git a/execution/executor/src/db_bootstrapper/mod.rs b/execution/executor/src/db_bootstrapper/mod.rs index e7b0922a1a3d2..dcf1b04690c0f 100644 --- a/execution/executor/src/db_bootstrapper/mod.rs +++ b/execution/executor/src/db_bootstrapper/mod.rs @@ -135,22 +135,27 @@ pub fn calculate_genesis( vec![genesis_txn.clone().into()].into(), base_state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), + None, )?; - - let output = ApplyExecutionOutput::run(execution_output, &executed_trees, None)?; - ensure!( - output.expect_ledger_update_output().num_txns() != 0, + execution_output.num_transactions_to_commit() != 0, "Genesis txn execution failed." ); + ensure!( + execution_output.next_epoch_state.is_some(), + "Genesis txn didn't output reconfig event." + ); + let output = ApplyExecutionOutput::run(execution_output, &executed_trees)?; let timestamp_usecs = if genesis_version == 0 { // TODO(aldenhu): fix existing tests before using real timestamp and check on-chain epoch. GENESIS_TIMESTAMP_USECS } else { - let state_view = output.verified_state_view( + let state_view = CachedStateView::new( StateViewId::Miscellaneous, Arc::clone(&db.reader), + output.execution_output.next_version(), + output.expect_result_state().current.clone(), Arc::new(AsyncProofFetcher::new(db.reader.clone())), )?; let next_epoch = epoch @@ -162,10 +167,6 @@ pub fn calculate_genesis( ); get_state_timestamp(&state_view)? }; - ensure!( - output.next_epoch_state.is_some(), - "Genesis txn didn't output reconfig event." 
- ); let ledger_info_with_sigs = LedgerInfoWithSignatures::new( LedgerInfo::new( @@ -179,7 +180,7 @@ pub fn calculate_genesis( .root_hash(), genesis_version, timestamp_usecs, - output.next_epoch_state.clone(), + output.execution_output.next_epoch_state.clone(), ), genesis_block_id(), /* consensus_data_hash */ ), diff --git a/execution/executor/src/fuzzing.rs b/execution/executor/src/fuzzing.rs index 3b656889dc844..0430a7706d442 100644 --- a/execution/executor/src/fuzzing.rs +++ b/execution/executor/src/fuzzing.rs @@ -72,11 +72,13 @@ impl TransactionBlockExecutor for FakeVM { transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, + append_state_checkpoint_to_block: Option, ) -> Result { DoGetExecutionOutput::by_transaction_execution::( transactions, state_view, onchain_config, + append_state_checkpoint_to_block, ) } } diff --git a/execution/executor/src/tests/mock_vm/mod.rs b/execution/executor/src/tests/mock_vm/mod.rs index b307a6029910c..4bfc0a525c99d 100644 --- a/execution/executor/src/tests/mock_vm/mod.rs +++ b/execution/executor/src/tests/mock_vm/mod.rs @@ -10,7 +10,7 @@ use crate::{ workflow::do_get_execution_output::DoGetExecutionOutput, }; use anyhow::Result; -use aptos_crypto::{ed25519::Ed25519PrivateKey, PrivateKey, Uniform}; +use aptos_crypto::{ed25519::Ed25519PrivateKey, HashValue, PrivateKey, Uniform}; use aptos_executor_types::execution_output::ExecutionOutput; use aptos_storage_interface::cached_state_view::CachedStateView; use aptos_types::{ @@ -70,11 +70,13 @@ impl TransactionBlockExecutor for MockVM { transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, + append_state_checkpoint_to_block: Option, ) -> Result { DoGetExecutionOutput::by_transaction_execution::( transactions, state_view, onchain_config, + append_state_checkpoint_to_block, ) } } diff --git a/execution/executor/src/tests/mod.rs b/execution/executor/src/tests/mod.rs index 4a6118a488bc9..e57d7e67582ae 100644 --- a/execution/executor/src/tests/mod.rs +++ b/execution/executor/src/tests/mod.rs @@ -61,7 +61,7 @@ fn execute_and_commit_block( ) .unwrap(); let version = 2 * (txn_index + 1); - assert_eq!(output.version(), version); + assert_eq!(output.expect_last_version(), version); let ledger_info = gen_ledger_info(version, output.root_hash(), id, txn_index + 1); executor.commit_blocks(vec![id], ledger_info).unwrap(); @@ -218,7 +218,7 @@ fn test_executor_one_block() { ) .unwrap(); let version = num_user_txns + 1; - assert_eq!(output.version(), version); + assert_eq!(output.expect_last_version(), version); let block_root_hash = output.root_hash(); let ledger_info = gen_ledger_info(version, block_root_hash, block_id, 1); @@ -387,7 +387,7 @@ fn create_blocks_and_chunks( TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG, ) .unwrap(); - assert_eq!(output.version(), version); + assert_eq!(output.expect_last_version(), version); block_executor.pre_commit_block(block_id).unwrap(); let ledger_info = gen_ledger_info(version, output.root_hash(), block_id, version); out_blocks.push((txns, ledger_info)); @@ -491,7 +491,7 @@ fn apply_transaction_by_writeset( let chunk_output = DoGetExecutionOutput::by_transaction_output(txns, txn_outs, state_view).unwrap(); - let output = ApplyExecutionOutput::run(chunk_output, &ledger_view, None).unwrap(); + let output = ApplyExecutionOutput::run(chunk_output, &ledger_view).unwrap(); db.writer .save_transactions( @@ -687,9 +687,10 @@ fn run_transactions_naive( ) .unwrap(), 
block_executor_onchain_config.clone(), + None, ) .unwrap(); - let output = ApplyExecutionOutput::run(out, &ledger_view, None).unwrap(); + let output = ApplyExecutionOutput::run(out, &ledger_view).unwrap(); db.writer .save_transactions( output.expect_complete_result().as_chunk_to_commit(), diff --git a/execution/executor/src/types/in_memory_state_calculator_v2.rs b/execution/executor/src/types/in_memory_state_calculator_v2.rs index fa39929928bf0..775e615804596 100644 --- a/execution/executor/src/types/in_memory_state_calculator_v2.rs +++ b/execution/executor/src/types/in_memory_state_calculator_v2.rs @@ -2,9 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::metrics::OTHER_TIMERS; -use anyhow::{anyhow, ensure, Result}; +use anyhow::{ensure, Result}; use aptos_crypto::{hash::CryptoHash, HashValue}; -use aptos_executor_types::{parsed_transaction_output::TransactionsWithParsedOutput, ProofReader}; +use aptos_executor_types::{ + execution_output::ExecutionOutput, parsed_transaction_output::TransactionsWithParsedOutput, + state_checkpoint_output::StateCheckpointOutput, ProofReader, +}; use aptos_logger::info; use aptos_metrics_core::TimerHelper; use aptos_scratchpad::FrozenSparseMerkleTree; @@ -13,12 +16,9 @@ use aptos_storage_interface::{ state_delta::StateDelta, }; use aptos_types::{ - epoch_state::EpochState, - on_chain_config::{ConfigurationResource, OnChainConfig, ValidatorSet}, state_store::{ create_empty_sharded_state_updates, state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue, ShardedStateUpdates, - TStateView, }, transaction::Version, write_set::{TransactionWrite, WriteSet}, @@ -27,122 +27,66 @@ use arr_macro::arr; use dashmap::DashMap; use itertools::zip_eq; use rayon::prelude::*; -use std::{collections::HashMap, ops::Deref}; - -struct StateCacheView<'a> { - base: &'a ShardedStateCache, - updates: &'a ShardedStateUpdates, -} - -impl<'a> StateCacheView<'a> { - pub fn new(base: &'a ShardedStateCache, updates: &'a ShardedStateUpdates) -> Self { - Self { base, updates } - } -} - -impl<'a> TStateView for StateCacheView<'a> { - type Key = StateKey; - - fn get_state_value( - &self, - state_key: &Self::Key, - ) -> aptos_types::state_store::Result> { - if let Some(v_opt) = self.updates[state_key.get_shard_id() as usize].get(state_key) { - return Ok(v_opt.clone()); - } - if let Some(entry) = self - .base - .shard(state_key.get_shard_id()) - .get(state_key) - .as_ref() - { - let state_value = entry.value().1.as_ref(); - return Ok(state_value.cloned()); - } - Ok(None) - } - - fn get_usage(&self) -> aptos_types::state_store::Result { - unreachable!("not supposed to be used.") - } -} +use std::{collections::HashMap, ops::Deref, sync::Arc}; /// Helper class for calculating state changes after a block of transactions are executed. 
pub struct InMemoryStateCalculatorV2 {} impl InMemoryStateCalculatorV2 { pub fn calculate_for_transactions( - base: &StateDelta, - state_cache: StateCache, - to_commit: &TransactionsWithParsedOutput, - new_epoch: bool, - is_block: bool, - ) -> Result<( - Vec, - Vec>, - StateDelta, - Option, - Option, - ShardedStateCache, - )> { - if is_block { - Self::validate_input_for_block(base, to_commit)?; + execution_output: &ExecutionOutput, + parent_state: &Arc, + known_state_checkpoints: Option>>, + ) -> Result { + if execution_output.is_block { + Self::validate_input_for_block(parent_state, &execution_output.to_commit)?; } - let state_updates_vec = - Self::get_sharded_state_updates(to_commit.parsed_outputs(), |txn_output| { - txn_output.write_set() - }); + let state_updates_vec = Self::get_sharded_state_updates( + execution_output.to_commit.parsed_outputs(), + |txn_output| txn_output.write_set(), + ); // If there are multiple checkpoints in the chunk, we only calculate the SMT (and its root // hash) for the last one. - let last_checkpoint_index = to_commit.get_last_checkpoint_index(); + let last_checkpoint_index = execution_output.to_commit.get_last_checkpoint_index(); Self::calculate_impl( - base, - state_cache, + parent_state, + &execution_output.state_cache, state_updates_vec, last_checkpoint_index, - new_epoch, - is_block, + execution_output.is_block, + known_state_checkpoints, ) } pub fn calculate_for_write_sets_after_snapshot( - base: &StateDelta, - state_cache: StateCache, + parent_state: &Arc, + state_cache: &StateCache, last_checkpoint_index: Option, write_sets: &[WriteSet], - ) -> Result<(Option, StateDelta)> { + ) -> Result { let state_updates_vec = Self::get_sharded_state_updates(write_sets, |write_set| write_set); - let (_, _, result_state, _, updates_until_latest_checkpoint, _) = Self::calculate_impl( - base, + Self::calculate_impl( + parent_state, state_cache, state_updates_vec, last_checkpoint_index, - /*new_epoch=*/ false, - /*is_block=*/ false, - )?; - - Ok((updates_until_latest_checkpoint, result_state)) + false, + Option::>::None, + ) } fn calculate_impl( - base: &StateDelta, - state_cache: StateCache, + parent_state: &Arc, + state_cache: &StateCache, state_updates_vec: Vec, last_checkpoint_index: Option, - new_epoch: bool, is_block: bool, - ) -> Result<( - Vec, - Vec>, - StateDelta, - Option, - Option, - ShardedStateCache, - )> { + known_state_checkpoints: Option>>, + ) -> Result { let StateCache { // This makes sure all in-mem nodes seen while proofs were fetched stays in mem during the // calculation @@ -150,7 +94,7 @@ impl InMemoryStateCalculatorV2 { sharded_state_cache, proofs, } = state_cache; - assert!(frozen_base.smt.is_the_same(&base.current)); + assert!(frozen_base.smt.is_the_same(&parent_state.current)); let (updates_before_last_checkpoint, updates_after_last_checkpoint) = if let Some(index) = last_checkpoint_index { @@ -167,30 +111,16 @@ impl InMemoryStateCalculatorV2 { let num_txns = state_updates_vec.len(); - let next_epoch_state = if new_epoch { - // Assumes chunk doesn't cross epoch boundary here. - ensure!( - last_checkpoint_index == Some(num_txns - 1), - "The last txn must be a reconfig for epoch change." - ); - Some(Self::get_epoch_state( - &sharded_state_cache, - &updates_before_last_checkpoint, - )?) 
- } else { - None - }; - - let usage = Self::calculate_usage(base.current.usage(), &sharded_state_cache, &[ + let usage = Self::calculate_usage(parent_state.current.usage(), sharded_state_cache, &[ &updates_before_last_checkpoint, &updates_after_last_checkpoint, ]); - let first_version = base.current_version.map_or(0, |v| v + 1); + let first_version = parent_state.current_version.map_or(0, |v| v + 1); let proof_reader = ProofReader::new(proofs); let latest_checkpoint = if let Some(index) = last_checkpoint_index { Self::make_checkpoint( - base.current.freeze(&frozen_base.base_smt), + parent_state.current.freeze(&frozen_base.base_smt), &updates_before_last_checkpoint, if index == num_txns - 1 { usage @@ -202,13 +132,25 @@ impl InMemoryStateCalculatorV2 { } else { // If there is no checkpoint in this chunk, the latest checkpoint will be the existing // one. - base.base.freeze(&frozen_base.base_smt) + parent_state.base.freeze(&frozen_base.base_smt) }; - let mut latest_checkpoint_version = base.base_version; - let mut state_checkpoint_hashes = vec![None; num_txns]; + let mut latest_checkpoint_version = parent_state.base_version; + let mut state_checkpoint_hashes = known_state_checkpoints + .map_or_else(|| vec![None; num_txns], |v| v.into_iter().collect()); + ensure!( + state_checkpoint_hashes.len() == num_txns, + "Bad number of known hashes." + ); if let Some(index) = last_checkpoint_index { - state_checkpoint_hashes[index] = Some(latest_checkpoint.root_hash()); + if let Some(h) = state_checkpoint_hashes[index] { + ensure!( + h == latest_checkpoint.root_hash(), + "Last checkpoint not expected." + ); + } else { + state_checkpoint_hashes[index] = Some(latest_checkpoint.root_hash()); + } latest_checkpoint_version = Some(first_version + index as u64); } @@ -223,7 +165,7 @@ impl InMemoryStateCalculatorV2 { let latest_tree = if last_checkpoint_index.is_some() { latest_checkpoint.clone() } else { - base.current.freeze(&frozen_base.base_smt) + parent_state.current.freeze(&frozen_base.base_smt) }; Self::make_checkpoint( latest_tree, @@ -238,7 +180,7 @@ impl InMemoryStateCalculatorV2 { updates_after_last_checkpoint } else { let mut updates_since_latest_checkpoint = - base.updates_since_base.deref().deref().clone(); + parent_state.updates_since_base.deref().deref().clone(); zip_eq( updates_since_latest_checkpoint.iter_mut(), updates_after_last_checkpoint, @@ -263,15 +205,12 @@ impl InMemoryStateCalculatorV2 { updates_since_latest_checkpoint, ); - let updates_until_latest_checkpoint = - last_checkpoint_index.map(|_| updates_before_last_checkpoint); - Ok(( + Ok(StateCheckpointOutput::new( + parent_state.clone(), + Arc::new(result_state), + last_checkpoint_index.map(|_| updates_before_last_checkpoint), state_updates_vec, state_checkpoint_hashes, - result_state, - next_epoch_state, - updates_until_latest_checkpoint, - sharded_state_cache, )) } @@ -413,22 +352,6 @@ impl InMemoryStateCalculatorV2 { Ok(new_checkpoint) } - fn get_epoch_state( - base: &ShardedStateCache, - updates: &ShardedStateUpdates, - ) -> Result { - let state_cache_view = StateCacheView::new(base, updates); - let validator_set = ValidatorSet::fetch_config(&state_cache_view) - .ok_or_else(|| anyhow!("ValidatorSet not touched on epoch change"))?; - let configuration = ConfigurationResource::fetch_config(&state_cache_view) - .ok_or_else(|| anyhow!("Configuration resource not touched on epoch change"))?; - - Ok(EpochState::new( - configuration.epoch(), - (&validator_set).into(), - )) - } - fn validate_input_for_block( base: &StateDelta, 
to_commit: &TransactionsWithParsedOutput, diff --git a/execution/executor/src/types/partial_state_compute_result.rs b/execution/executor/src/types/partial_state_compute_result.rs index f2e87565daab9..a3167de3f65c8 100644 --- a/execution/executor/src/types/partial_state_compute_result.rs +++ b/execution/executor/src/types/partial_state_compute_result.rs @@ -3,66 +3,74 @@ #![forbid(unsafe_code)] -use anyhow::Result; -use aptos_executor_types::{state_compute_result::StateComputeResult, LedgerUpdateOutput}; -use aptos_storage_interface::{ - async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView, - state_delta::StateDelta, DbReader, -}; -use aptos_types::{ - epoch_state::EpochState, proof::accumulator::InMemoryTransactionAccumulator, - state_store::StateViewId, +use aptos_executor_types::{ + execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput, + state_compute_result::StateComputeResult, LedgerUpdateOutput, }; +use aptos_storage_interface::state_delta::StateDelta; +use aptos_types::proof::accumulator::InMemoryTransactionAccumulator; use once_cell::sync::OnceCell; use std::sync::Arc; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct PartialStateComputeResult { - pub parent_state: Arc, - pub result_state: Arc, - /// If set, this is the new epoch info that should be changed to if this is committed. - pub next_epoch_state: Option, + pub execution_output: ExecutionOutput, + pub state_checkpoint_output: OnceCell, pub ledger_update_output: OnceCell, } impl PartialStateComputeResult { - pub fn new( - parent_state: Arc, - result_state: Arc, - next_epoch_state: Option, - ) -> Self { + pub fn new(execution_output: ExecutionOutput) -> Self { Self { - parent_state, - result_state, - next_epoch_state, + execution_output, + state_checkpoint_output: OnceCell::new(), ledger_update_output: OnceCell::new(), } } - pub fn new_empty_completed( + pub fn new_empty( state: Arc, txn_accumulator: Arc, - next_epoch_state: Option, ) -> Self { + let execution_output = ExecutionOutput::new_empty(state.clone()); let ledger_update_output = OnceCell::new(); ledger_update_output .set(LedgerUpdateOutput::new_empty(txn_accumulator)) .expect("First set."); + let state_checkpoint_output = OnceCell::new(); + state_checkpoint_output + .set(StateCheckpointOutput::new_empty(state)) + .expect("First set."); Self { - parent_state: state.clone(), - result_state: state, - next_epoch_state, + execution_output, + state_checkpoint_output, ledger_update_output, } } - pub fn epoch_state(&self) -> &Option { - &self.next_epoch_state + pub fn has_reconfiguration(&self) -> bool { + self.execution_output.next_epoch_state.is_some() } - pub fn has_reconfiguration(&self) -> bool { - self.next_epoch_state.is_some() + pub fn get_state_checkpoint_output(&self) -> Option<&StateCheckpointOutput> { + self.state_checkpoint_output.get() + } + + pub fn expect_state_checkpoint_output(&self) -> &StateCheckpointOutput { + self.state_checkpoint_output + .get() + .expect("StateCheckpointOutput not set") + } + + pub fn expect_result_state(&self) -> &Arc { + &self.expect_state_checkpoint_output().result_state + } + + pub fn set_state_checkpoint_output(&self, state_checkpoint_output: StateCheckpointOutput) { + self.state_checkpoint_output + .set(state_checkpoint_output) + .expect("StateCheckpointOutput already set"); } pub fn get_ledger_update_output(&self) -> Option<&LedgerUpdateOutput> { @@ -81,25 +89,12 @@ impl PartialStateComputeResult { .expect("LedgerUpdateOutput already set"); } - pub fn next_version(&self) -> 
u64 { - self.state().current_version.unwrap() + 1 - } - - pub fn is_same_state(&self, rhs: &Self) -> bool { - self.state().has_same_current_state(rhs.state()) - } - - pub fn state(&self) -> &Arc { - &self.result_state - } - pub fn get_complete_result(&self) -> Option { self.ledger_update_output.get().map(|ledger_update_output| { StateComputeResult::new( - self.parent_state.clone(), - self.result_state.clone(), + self.execution_output.clone(), + self.expect_state_checkpoint_output().clone(), ledger_update_output.clone(), - self.next_epoch_state.clone(), ) }) } @@ -107,19 +102,4 @@ impl PartialStateComputeResult { pub fn expect_complete_result(&self) -> StateComputeResult { self.get_complete_result().expect("Result is not complete.") } - - pub fn verified_state_view( - &self, - id: StateViewId, - reader: Arc, - proof_fetcher: Arc, - ) -> Result { - Ok(CachedStateView::new( - id, - reader, - self.next_version(), - self.result_state.current.clone(), - proof_fetcher, - )?) - } } diff --git a/execution/executor/src/workflow/do_get_execution_output.rs b/execution/executor/src/workflow/do_get_execution_output.rs index 4245cf1e9252f..305f347947fec 100644 --- a/execution/executor/src/workflow/do_get_execution_output.rs +++ b/execution/executor/src/workflow/do_get_execution_output.rs @@ -1,31 +1,47 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::metrics; -use anyhow::Result; +use crate::{ + metrics, + metrics::{EXECUTOR_ERRORS, OTHER_TIMERS}, +}; +use anyhow::{anyhow, Result}; +use aptos_crypto::HashValue; use aptos_executor_service::{ local_executor_helper::SHARDED_BLOCK_EXECUTOR, remote_executor_client::{get_remote_addresses, REMOTE_SHARDED_BLOCK_EXECUTOR}, }; -use aptos_executor_types::execution_output::ExecutionOutput; +use aptos_executor_types::{ + execution_output::ExecutionOutput, parsed_transaction_output::TransactionsWithParsedOutput, + should_forward_to_subscription_service, ParsedTransactionOutput, +}; use aptos_logger::prelude::*; -use aptos_storage_interface::cached_state_view::CachedStateView; +use aptos_metrics_core::TimerHelper; +use aptos_storage_interface::cached_state_view::{CachedStateView, StateCache}; use aptos_types::{ block_executor::{ config::BlockExecutorConfigFromOnchain, partitioner::{ExecutableTransactions, PartitionedTransactions}, }, contract_event::ContractEvent, + epoch_state::EpochState, + on_chain_config::{ConfigurationResource, OnChainConfig, ValidatorSet}, + state_store::{ + state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue, + TStateView, + }, transaction::{ authenticator::AccountAuthenticator, signature_verified_transaction::{SignatureVerifiedTransaction, TransactionProvider}, - BlockOutput, ExecutionStatus, Transaction, TransactionOutput, TransactionOutputProvider, - TransactionStatus, + BlockEndInfo, BlockOutput, ExecutionStatus, Transaction, TransactionOutput, + TransactionOutputProvider, TransactionStatus, Version, }, + write_set::{TransactionWrite, WriteSet}, }; use aptos_vm::{AptosVM, VMExecutor}; +use itertools::Itertools; use move_core_types::{language_storage::CORE_CODE_ADDRESS, vm_status::StatusCode}; -use std::{sync::Arc, time::Duration}; +use std::{iter, sync::Arc, time::Duration}; pub struct DoGetExecutionOutput; @@ -34,14 +50,23 @@ impl DoGetExecutionOutput { transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, + append_state_checkpoint_to_block: Option, ) -> Result { match transactions { 
ExecutableTransactions::Unsharded(txns) => {
- Self::by_transaction_execution_unsharded::<V>(txns, state_view, onchain_config)
- },
- ExecutableTransactions::Sharded(txns) => {
- Self::by_transaction_execution_sharded::<V>(txns, state_view, onchain_config)
+ Self::by_transaction_execution_unsharded::<V>(
+ txns,
+ state_view,
+ onchain_config,
+ append_state_checkpoint_to_block,
+ )
 },
+ ExecutableTransactions::Sharded(txns) => Self::by_transaction_execution_sharded::<V>(
+ txns,
+ state_view,
+ onchain_config,
+ append_state_checkpoint_to_block,
+ ),
 }
 }
@@ -49,22 +74,26 @@ impl DoGetExecutionOutput {
 transactions: Vec<SignatureVerifiedTransaction>,
 state_view: CachedStateView,
 onchain_config: BlockExecutorConfigFromOnchain,
+ append_state_checkpoint_to_block: Option<HashValue>,
 ) -> Result<ExecutionOutput> {
 let block_output = Self::execute_block::<V>(&transactions, &state_view, onchain_config)?;
- let (transaction_outputs, block_end_info) = block_output.into_inner();
- Ok(ExecutionOutput {
- transactions: transactions.into_iter().map(|t| t.into_inner()).collect(),
+
+ Parser::parse(
+ state_view.next_version(),
+ transactions.into_iter().map(|t| t.into_inner()).collect(),
 transaction_outputs,
- state_cache: state_view.into_state_cache(),
+ state_view.into_state_cache(),
 block_end_info,
- })
+ append_state_checkpoint_to_block,
+ )
 }

 pub fn by_transaction_execution_sharded<V: VMExecutor>(
 transactions: PartitionedTransactions,
 state_view: CachedStateView,
 onchain_config: BlockExecutorConfigFromOnchain,
+ append_state_checkpoint_to_block: Option<HashValue>,
 ) -> Result<ExecutionOutput> {
 let state_view_arc = Arc::new(state_view);
 let transaction_outputs = Self::execute_block_sharded::<V>(
@@ -78,15 +107,17 @@ impl DoGetExecutionOutput {
 // Unwrapping here is safe because the execution has finished and it is guaranteed that
 // the state view is not used anymore.
 let state_view = Arc::try_unwrap(state_view_arc).unwrap();
- Ok(ExecutionOutput {
- transactions: PartitionedTransactions::flatten(transactions)
+ Parser::parse(
+ state_view.next_version(),
+ PartitionedTransactions::flatten(transactions)
 .into_iter()
 .map(|t| t.into_txn().into_inner())
 .collect(),
 transaction_outputs,
- state_cache: state_view.into_state_cache(),
- block_end_info: None,
- })
+ state_view.into_state_cache(),
+ None, // block end info
+ append_state_checkpoint_to_block,
+ )
 }

 pub fn by_transaction_output(
@@ -105,12 +136,14 @@ impl DoGetExecutionOutput {
 // prime the state cache by fetching all touched accounts
 state_view.prime_cache_by_write_set(write_set)?;

- Ok(ExecutionOutput {
+ Parser::parse(
+ state_view.next_version(),
 transactions,
 transaction_outputs,
- state_cache: state_view.into_state_cache(),
- block_end_info: None,
- })
+ state_view.into_state_cache(),
+ None, // block end info
+ None, // append state checkpoint to block
+ )
 }

 fn execute_block_sharded<V: VMExecutor>(
@@ -428,3 +461,276 @@ pub fn update_counters_for_processed_chunk<T, O>(
 }
 }
 }
+
+struct Parser;
+
+impl Parser {
+ fn parse(
+ first_version: Version,
+ mut transactions: Vec<Transaction>,
+ transaction_outputs: Vec<TransactionOutput>,
+ state_cache: StateCache,
+ block_end_info: Option<BlockEndInfo>,
+ append_state_checkpoint_to_block: Option<HashValue>,
+ ) -> Result<ExecutionOutput> {
+ let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output"]);
+ let is_block = append_state_checkpoint_to_block.is_some();
+
+ // Parse all outputs.
+ let mut transaction_outputs: Vec<ParsedTransactionOutput> =
+ { transaction_outputs.into_iter().map(Into::into).collect() };
+
+ // Isolate retries.
+ let (to_retry, has_reconfig) =
+ Self::extract_retries(&mut transactions, &mut transaction_outputs);
+
+ // Collect all statuses.
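+ // Note: statuses are reported in the original input order: Keep/Discard statuses are
+ // taken from the remaining outputs as-is, while everything moved to `to_retry` above
+ // is reported as `TransactionStatus::Retry` at the tail.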
+ let statuses_for_input_txns = {
+ let keeps_and_discards = transaction_outputs.iter().map(|t| t.status()).cloned();
+ // Forcibly overwrite the statuses of retried txns, since the VM may have output a different status.
+ let retries = iter::repeat(TransactionStatus::Retry).take(to_retry.len());
+ keeps_and_discards.chain(retries).collect()
+ };
+
+ // Isolate discards.
+ let to_discard = Self::extract_discards(&mut transactions, &mut transaction_outputs);
+
+ // The rest are to be committed; attach the block epilogue as needed and, on reconfiguration, derive the next EpochState.
+ let to_commit = TransactionsWithParsedOutput::new(transactions, transaction_outputs);
+ let to_commit = Self::maybe_add_block_epilogue(
+ to_commit,
+ has_reconfig,
+ block_end_info.as_ref(),
+ append_state_checkpoint_to_block,
+ );
+ let next_epoch_state = has_reconfig
+ .then(|| Self::ensure_next_epoch_state(&to_commit))
+ .transpose()?;
+ let subscribable_events = to_commit
+ .parsed_outputs()
+ .iter()
+ .flat_map(|o| {
+ o.events()
+ .iter()
+ .filter(|e| should_forward_to_subscription_service(e))
+ })
+ .cloned()
+ .collect_vec();
+
+ {
+ let _timer = OTHER_TIMERS.timer_with(&["update_counters__by_execution"]);
+ for x in [&to_commit, &to_discard, &to_retry] {
+ update_counters_for_processed_chunk(x.txns(), x.parsed_outputs(), "execution");
+ }
+ }
+
+ Ok(ExecutionOutput::new(
+ is_block,
+ first_version,
+ statuses_for_input_txns,
+ to_commit,
+ to_discard,
+ to_retry,
+ state_cache,
+ block_end_info,
+ next_epoch_state,
+ subscribable_events,
+ ))
+ }
+
+ fn extract_retries(
+ transactions: &mut Vec<Transaction>,
+ transaction_outputs: &mut Vec<ParsedTransactionOutput>,
+ ) -> (TransactionsWithParsedOutput, bool) {
+ // N.B. off-by-1 intentionally, for exclusive index
+ let new_epoch_marker = transaction_outputs
+ .iter()
+ .position(|o| o.is_reconfig())
+ .map(|idx| idx + 1);
+
+ let block_gas_limit_marker = transaction_outputs
+ .iter()
+ .position(|o| matches!(o.status(), TransactionStatus::Retry));
+
+ // Transactions after the epoch ending txn are all to be retried.
+ // Transactions after the txn that exceeded per-block gas limit are also to be retried.
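+ // When both markers are present the epoch marker wins: a reconfig ends execution of
+ // the block, so any `Retry` outputs can only sit behind it and are drained with it.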
+ if let Some(pos) = new_epoch_marker {
+ (
+ TransactionsWithParsedOutput::new(
+ transactions.drain(pos..).collect(),
+ transaction_outputs.drain(pos..).collect(),
+ ),
+ true,
+ )
+ } else if let Some(pos) = block_gas_limit_marker {
+ (
+ TransactionsWithParsedOutput::new(
+ transactions.drain(pos..).collect(),
+ transaction_outputs.drain(pos..).collect(),
+ ),
+ false,
+ )
+ } else {
+ (TransactionsWithParsedOutput::new_empty(), false)
+ }
+ }
+
+ fn extract_discards(
+ transactions: &mut Vec<Transaction>,
+ transaction_outputs: &mut Vec<ParsedTransactionOutput>,
+ ) -> TransactionsWithParsedOutput {
+ let to_discard = {
+ let mut res = TransactionsWithParsedOutput::new_empty();
+ for idx in 0..transactions.len() {
+ if transaction_outputs[idx].status().is_discarded() {
+ res.push(transactions[idx].clone(), transaction_outputs[idx].clone());
+ } else if !res.is_empty() {
+ transactions[idx - res.len()] = transactions[idx].clone();
+ transaction_outputs[idx - res.len()] = transaction_outputs[idx].clone();
+ }
+ }
+ if !res.is_empty() {
+ let remaining = transactions.len() - res.len();
+ transactions.truncate(remaining);
+ transaction_outputs.truncate(remaining);
+ }
+ res
+ };
+
+ // Sanity check transactions with the Discard status:
+ to_discard.iter().for_each(|(t, o)| {
+ // In case a new status other than Retry, Keep and Discard is added:
+ if !matches!(o.status(), TransactionStatus::Discard(_)) {
+ error!("Status other than Retry, Keep or Discard; Transaction discarded.");
+ }
+ // VM shouldn't have output anything for discarded transactions, log if it did.
+ if !o.write_set().is_empty() || !o.events().is_empty() {
+ error!(
+ "Discarded transaction has non-empty write set or events. \
+ Transaction: {:?}. Status: {:?}.",
+ t,
+ o.status(),
+ );
+ EXECUTOR_ERRORS.inc();
+ }
+ });
+
+ to_discard
+ }
+
+ fn maybe_add_block_epilogue(
+ mut to_commit: TransactionsWithParsedOutput,
+ is_reconfig: bool,
+ block_end_info: Option<&BlockEndInfo>,
+ append_state_checkpoint_to_block: Option<HashValue>,
+ ) -> TransactionsWithParsedOutput {
+ if !is_reconfig {
+ // Append the StateCheckpoint transaction to the end
+ if let Some(block_id) = append_state_checkpoint_to_block {
+ let state_checkpoint_txn = match block_end_info {
+ None => Transaction::StateCheckpoint(block_id),
+ Some(block_end_info) => {
+ Transaction::block_epilogue(block_id, block_end_info.clone())
+ },
+ };
+
+ to_commit.push(
+ state_checkpoint_txn,
+ ParsedTransactionOutput::from(TransactionOutput::new_empty_success()),
+ );
+ }
+ }; // else: not adding block epilogue at epoch ending.
+
+ to_commit
+ }
+
+ fn ensure_next_epoch_state(to_commit: &TransactionsWithParsedOutput) -> Result<EpochState> {
+ let last_write_set = to_commit
+ .parsed_outputs()
+ .last()
+ .ok_or_else(|| anyhow!("to_commit is empty."))?
+ .write_set();
+
+ let write_set_view = WriteSetStateView {
+ write_set: last_write_set,
+ };
+
+ let validator_set = ValidatorSet::fetch_config(&write_set_view)
+ .ok_or_else(|| anyhow!("ValidatorSet not touched on epoch change"))?;
+ let configuration = ConfigurationResource::fetch_config(&write_set_view)
+ .ok_or_else(|| anyhow!("Configuration resource not touched on epoch change"))?;
+
+ Ok(EpochState::new(
+ configuration.epoch(),
+ (&validator_set).into(),
+ ))
+ }
+}
+
+struct WriteSetStateView<'a> {
+ write_set: &'a WriteSet,
+}
+
+impl<'a> TStateView for WriteSetStateView<'a> {
+ type Key = StateKey;
+
+ fn get_state_value(
+ &self,
+ state_key: &Self::Key,
+ ) -> aptos_types::state_store::Result<Option<StateValue>> {
+ Ok(self
+ .write_set
+ .get(state_key)
+ .and_then(|write_op| write_op.as_state_value()))
+ }
+
+ fn get_usage(&self) -> aptos_types::state_store::Result<StateStorageUsage> {
+ unreachable!("Not supposed to be called on WriteSetStateView.")
+ }
+}
+#[cfg(test)]
+mod tests {
+ use super::Parser;
+ use aptos_storage_interface::cached_state_view::StateCache;
+ use aptos_types::{
+ contract_event::ContractEvent,
+ transaction::{
+ ExecutionStatus, Transaction, TransactionAuxiliaryData, TransactionOutput,
+ TransactionStatus,
+ },
+ write_set::WriteSet,
+ };
+
+ #[test]
+ fn should_filter_subscribable_events() {
+ let event_0 =
+ ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_1".to_vec());
+ let event_1 = ContractEvent::new_v2_with_type_tag_str(
+ "0x2345::random_module::RandomEvent",
+ b"random_x".to_vec(),
+ );
+ let event_2 =
+ ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_2".to_vec());
+
+ let txns = vec![Transaction::dummy(), Transaction::dummy()];
+ let txn_outs = vec![
+ TransactionOutput::new(
+ WriteSet::default(),
+ vec![event_0.clone()],
+ 0,
+ TransactionStatus::Keep(ExecutionStatus::Success),
+ TransactionAuxiliaryData::default(),
+ ),
+ TransactionOutput::new(
+ WriteSet::default(),
+ vec![event_1.clone(), event_2.clone()],
+ 0,
+ TransactionStatus::Keep(ExecutionStatus::Success),
+ TransactionAuxiliaryData::default(),
+ ),
+ ];
+ let execution_output =
+ Parser::parse(0, txns, txn_outs, StateCache::new_dummy(), None, None).unwrap();
+ assert_eq!(vec![event_0, event_2], execution_output.subscribable_events);
+ }
+}
diff --git a/execution/executor/src/workflow/do_ledger_update.rs b/execution/executor/src/workflow/do_ledger_update.rs
index 0a9e6988a5503..9a198584029c5 100644
--- a/execution/executor/src/workflow/do_ledger_update.rs
+++ b/execution/executor/src/workflow/do_ledger_update.rs
@@ -1,22 +1,18 @@
 // Copyright (c) Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0

-use crate::{
- metrics::OTHER_TIMERS, workflow::do_get_execution_output::update_counters_for_processed_chunk,
-};
+use crate::metrics::OTHER_TIMERS;
 use anyhow::Result;
 use aptos_crypto::{hash::CryptoHash, HashValue};
 use aptos_executor_types::{
- parsed_transaction_output::TransactionsWithParsedOutput,
- should_forward_to_subscription_service, state_checkpoint_output::StateCheckpointOutput,
- LedgerUpdateOutput, ParsedTransactionOutput,
+ execution_output::ExecutionOutput, parsed_transaction_output::TransactionsWithParsedOutput,
+ state_checkpoint_output::StateCheckpointOutput, LedgerUpdateOutput, ParsedTransactionOutput,
 };
 use aptos_experimental_runtimes::thread_manager::optimal_min_len;
 use aptos_metrics_core::TimerHelper;
 use aptos_types::{
- contract_event::ContractEvent,
 proof::accumulator::{InMemoryEventAccumulator, InMemoryTransactionAccumulator},
-
transaction::{Transaction, TransactionInfo}, + transaction::TransactionInfo, }; use itertools::{izip, Itertools}; use rayon::prelude::*; @@ -26,61 +22,35 @@ pub struct DoLedgerUpdate; impl DoLedgerUpdate { pub fn run( - state_checkpoint_output: StateCheckpointOutput, - base_txn_accumulator: Arc, - ) -> Result<(LedgerUpdateOutput, Vec, Vec)> { - let ( - txns, - state_updates_vec, - state_checkpoint_hashes, - state_updates_before_last_checkpoint, - sharded_state_cache, - block_end_info, - ) = state_checkpoint_output.into_inner(); - - let (statuses_for_input_txns, to_commit, to_discard, to_retry) = txns.into_inner(); - for group in [&to_commit, &to_discard, &to_retry] { - update_counters_for_processed_chunk(group.txns(), group.parsed_outputs(), "execution"); - } - - // these are guaranteed by caller side logic - assert_eq!(to_commit.len(), state_updates_vec.len()); - assert_eq!(to_commit.len(), state_checkpoint_hashes.len()); - - let (event_hashes, write_set_hashes) = - Self::calculate_events_and_writeset_hashes(to_commit.parsed_outputs()); - - let (transaction_infos, subscribible_events) = Self::assemble_transaction_infos( - &to_commit, - state_checkpoint_hashes, + execution_output: &ExecutionOutput, + state_checkpoint_output: &StateCheckpointOutput, + parent_accumulator: Arc, + ) -> Result { + let _timer = OTHER_TIMERS.timer_with(&["assemble_ledger_diff_for_block"]); + + // Calculate hashes + let to_commit = &execution_output.to_commit; + let txn_outs = to_commit.parsed_outputs(); + + let (event_hashes, writeset_hashes) = Self::calculate_events_and_writeset_hashes(txn_outs); + + // Assemble `TransactionInfo`s + let transaction_infos = Self::assemble_transaction_infos( + &execution_output.to_commit, + state_checkpoint_output.state_checkpoint_hashes.clone(), event_hashes, - write_set_hashes, + writeset_hashes, ); - let transaction_info_hashes = transaction_infos.iter().map(CryptoHash::hash).collect_vec(); - let transaction_accumulator = - Arc::new(base_txn_accumulator.append(&transaction_info_hashes)); - let (transactions, transaction_outputs) = to_commit.into_inner(); + // Calculate root hash + let transaction_info_hashes = transaction_infos.iter().map(CryptoHash::hash).collect_vec(); + let transaction_accumulator = Arc::new(parent_accumulator.append(&transaction_info_hashes)); - let ledger_update_output = LedgerUpdateOutput::new( - statuses_for_input_txns, - transactions, - transaction_outputs, + Ok(LedgerUpdateOutput::new( transaction_infos, - state_updates_vec, - subscribible_events, transaction_info_hashes, - state_updates_before_last_checkpoint, - sharded_state_cache, transaction_accumulator, - base_txn_accumulator, - block_end_info, - ); - - Ok(( - ledger_update_output, - to_discard.into_txns(), - to_retry.into_txns(), + parent_accumulator, )) } @@ -113,95 +83,27 @@ impl DoLedgerUpdate { state_checkpoint_hashes: Vec>, event_hashes: Vec, writeset_hashes: Vec, - ) -> (Vec, Vec) { + ) -> Vec { let _timer = OTHER_TIMERS.timer_with(&["process_events_and_writeset_hashes"]); - let mut txn_infos = Vec::with_capacity(to_commit.len()); - let mut subscribable_events = Vec::new(); izip!( to_commit.iter(), state_checkpoint_hashes, event_hashes, writeset_hashes ) - .for_each( + .map( |((txn, txn_out), state_checkpoint_hash, event_root_hash, write_set_hash)| { - subscribable_events.extend( - txn_out - .events() - .iter() - .filter(|evt| should_forward_to_subscription_service(evt)) - .cloned(), - ); - txn_infos.push(TransactionInfo::new( + TransactionInfo::new( txn.hash(), write_set_hash, 
event_root_hash, state_checkpoint_hash, txn_out.gas_used(), txn_out.status().as_kept_status().expect("Already sorted."), - )); + ) }, - ); - - (txn_infos, subscribable_events) - } -} - -#[cfg(test)] -mod tests { - use super::DoLedgerUpdate; - use aptos_crypto::hash::HashValue; - use aptos_executor_types::parsed_transaction_output::{ - ParsedTransactionOutput, TransactionsWithParsedOutput, - }; - use aptos_types::{ - contract_event::ContractEvent, - transaction::{ - ExecutionStatus, Transaction, TransactionAuxiliaryData, TransactionOutput, - TransactionStatus, - }, - write_set::WriteSet, - }; - - #[test] - fn assemble_ledger_diff_should_filter_subscribable_events() { - let event_0 = - ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_1".to_vec()); - let event_1 = ContractEvent::new_v2_with_type_tag_str( - "0x2345::random_module::RandomEvent", - b"random_x".to_vec(), - ); - let event_2 = - ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_2".to_vec()); - let txns_n_outputs = TransactionsWithParsedOutput::new( - vec![Transaction::dummy(), Transaction::dummy()], - vec![ - ParsedTransactionOutput::from(TransactionOutput::new( - WriteSet::default(), - vec![event_0.clone()], - 0, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - )), - ParsedTransactionOutput::from(TransactionOutput::new( - WriteSet::default(), - vec![event_1.clone(), event_2.clone()], - 0, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - )), - ], - ); - let state_checkpoint_hashes = vec![Some(HashValue::zero()); 2]; - let event_hashes = vec![HashValue::zero(); 2]; - let write_set_hashes = vec![HashValue::zero(); 2]; - let (_transaction_infos, subscribable_events) = DoLedgerUpdate::assemble_transaction_infos( - &txns_n_outputs, - state_checkpoint_hashes, - event_hashes, - write_set_hashes, - ); - assert_eq!(vec![event_0, event_2], subscribable_events); + ) + .collect() } } diff --git a/execution/executor/src/workflow/do_state_checkpoint.rs b/execution/executor/src/workflow/do_state_checkpoint.rs index b0f8550e5c05b..a971ca2130a3a 100644 --- a/execution/executor/src/workflow/do_state_checkpoint.rs +++ b/execution/executor/src/workflow/do_state_checkpoint.rs @@ -1,221 +1,28 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{ - metrics::{EXECUTOR_ERRORS, OTHER_TIMERS}, - types::in_memory_state_calculator_v2::InMemoryStateCalculatorV2, -}; +use crate::types::in_memory_state_calculator_v2::InMemoryStateCalculatorV2; +use anyhow::Result; use aptos_crypto::HashValue; use aptos_executor_types::{ - execution_output::ExecutionOutput, - parsed_transaction_output::TransactionsWithParsedOutput, - state_checkpoint_output::{StateCheckpointOutput, TransactionsByStatus}, - ParsedTransactionOutput, + execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput, }; -use aptos_logger::error; -use aptos_metrics_core::TimerHelper; use aptos_storage_interface::state_delta::StateDelta; -use aptos_types::{ - epoch_state::EpochState, - transaction::{ - BlockEndInfo, BlockEpiloguePayload, ExecutionStatus, Transaction, TransactionAuxiliaryData, - TransactionOutput, TransactionStatus, - }, - write_set::WriteSet, -}; -use std::{iter::repeat, sync::Arc}; +use std::sync::Arc; pub struct DoStateCheckpoint; impl DoStateCheckpoint { pub fn run( - chunk_output: ExecutionOutput, - parent_state: &StateDelta, - append_state_checkpoint_to_block: Option, - known_state_checkpoints: 
Option>>, - is_block: bool, - ) -> anyhow::Result<(Arc, Option, StateCheckpointOutput)> { - let ExecutionOutput { - state_cache, - transactions, - transaction_outputs, - block_end_info, - } = chunk_output; - let (new_epoch, statuses_for_input_txns, to_commit, to_discard, to_retry) = { - let _timer = OTHER_TIMERS.timer_with(&["sort_transactions"]); - - // Separate transactions with different VM statuses, i.e., Keep, Discard and Retry. - // Will return transactions with Retry txns sorted after Keep/Discard txns. - Self::sort_transactions_with_state_checkpoint( - transactions, - transaction_outputs, - append_state_checkpoint_to_block, - block_end_info.clone(), - )? - }; - + execution_output: &ExecutionOutput, + parent_state: &Arc, + known_state_checkpoints: Option>>, + ) -> Result { // Apply the write set, get the latest state. - let ( - state_updates_vec, - state_checkpoint_hashes, - result_state, - next_epoch_state, - state_updates_before_last_checkpoint, - sharded_state_cache, - ) = { - let _timer = OTHER_TIMERS - .with_label_values(&["calculate_for_transactions"]) - .start_timer(); - InMemoryStateCalculatorV2::calculate_for_transactions( - parent_state, - state_cache, - &to_commit, - new_epoch, - is_block, - )? - }; - - let mut state_checkpoint_output = StateCheckpointOutput::new( - TransactionsByStatus::new(statuses_for_input_txns, to_commit, to_discard, to_retry), - state_updates_vec, - state_checkpoint_hashes, - state_updates_before_last_checkpoint, - sharded_state_cache, - block_end_info, - ); - - // On state sync/replay, we generate state checkpoints only periodically, for the - // last state checkpoint of each chunk. - // A mismatch in the SMT will be detected at that occasion too. Here we just copy - // in the state root from the TxnInfo in the proof. - if let Some(state_checkpoint_hashes) = known_state_checkpoints { - state_checkpoint_output - .check_and_update_state_checkpoint_hashes(state_checkpoint_hashes)?; - } - - Ok(( - Arc::new(result_state), - next_epoch_state, - state_checkpoint_output, - )) - } - - fn sort_transactions_with_state_checkpoint( - mut transactions: Vec, - transaction_outputs: Vec, - append_state_checkpoint_to_block: Option, - block_end_info: Option, - ) -> anyhow::Result<( - bool, - Vec, - TransactionsWithParsedOutput, - TransactionsWithParsedOutput, - TransactionsWithParsedOutput, - )> { - let mut transaction_outputs: Vec = - transaction_outputs.into_iter().map(Into::into).collect(); - // N.B. off-by-1 intentionally, for exclusive index - let new_epoch_marker = transaction_outputs - .iter() - .position(|o| o.is_reconfig()) - .map(|idx| idx + 1); - - let block_gas_limit_marker = transaction_outputs - .iter() - .position(|o| matches!(o.status(), TransactionStatus::Retry)); - - // Transactions after the epoch ending txn are all to be retried. - // Transactions after the txn that exceeded per-block gas limit are also to be retried. 
- let to_retry = if let Some(pos) = new_epoch_marker { - TransactionsWithParsedOutput::new( - transactions.drain(pos..).collect(), - transaction_outputs.drain(pos..).collect(), - ) - } else if let Some(pos) = block_gas_limit_marker { - TransactionsWithParsedOutput::new( - transactions.drain(pos..).collect(), - transaction_outputs.drain(pos..).collect(), - ) - } else { - TransactionsWithParsedOutput::new_empty() - }; - - let state_checkpoint_to_add = - new_epoch_marker.map_or_else(|| append_state_checkpoint_to_block, |_| None); - - let keeps_and_discards = transaction_outputs.iter().map(|t| t.status()).cloned(); - let retries = repeat(TransactionStatus::Retry).take(to_retry.len()); - - let status = keeps_and_discards.chain(retries).collect(); - - let to_discard = { - let mut res = TransactionsWithParsedOutput::new_empty(); - for idx in 0..transactions.len() { - if transaction_outputs[idx].status().is_discarded() { - res.push(transactions[idx].clone(), transaction_outputs[idx].clone()); - } else if !res.is_empty() { - transactions[idx - res.len()] = transactions[idx].clone(); - transaction_outputs[idx - res.len()] = transaction_outputs[idx].clone(); - } - } - if !res.is_empty() { - let remaining = transactions.len() - res.len(); - transactions.truncate(remaining); - transaction_outputs.truncate(remaining); - } - res - }; - let to_keep = { - let mut res = TransactionsWithParsedOutput::new(transactions, transaction_outputs); - - // Append the StateCheckpoint transaction to the end of to_keep - if let Some(block_id) = state_checkpoint_to_add { - let state_checkpoint_txn = block_end_info.map_or( - Transaction::StateCheckpoint(block_id), - |block_end_info| { - Transaction::BlockEpilogue(BlockEpiloguePayload::V0 { - block_id, - block_end_info, - }) - }, - ); - let state_checkpoint_txn_output: ParsedTransactionOutput = - Into::into(TransactionOutput::new( - WriteSet::default(), - Vec::new(), - 0, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - )); - res.push(state_checkpoint_txn, state_checkpoint_txn_output); - } - res - }; - - // Sanity check transactions with the Discard status: - to_discard.iter().for_each(|(t, o)| { - // In case a new status other than Retry, Keep and Discard is added: - if !matches!(o.status(), TransactionStatus::Discard(_)) { - error!("Status other than Retry, Keep or Discard; Transaction discarded."); - } - // VM shouldn't have output anything for discarded transactions, log if it did. - if !o.write_set().is_empty() || !o.events().is_empty() { - error!( - "Discarded transaction has non-empty write set or events. \ - Transaction: {:?}. 
diff --git a/execution/executor/src/workflow/mod.rs b/execution/executor/src/workflow/mod.rs
index a9eb9ec5f7d66..deef85de89d53 100644
--- a/execution/executor/src/workflow/mod.rs
+++ b/execution/executor/src/workflow/mod.rs
@@ -6,7 +6,6 @@
 use crate::types::partial_state_compute_result::PartialStateComputeResult;
 use anyhow::Result;
-use aptos_crypto::HashValue;
 use aptos_executor_types::execution_output::ExecutionOutput;
 use aptos_storage_interface::ExecutedTrees;
 use do_ledger_update::DoLedgerUpdate;
@@ -20,24 +19,21 @@ pub struct ApplyExecutionOutput;
 
 impl ApplyExecutionOutput {
     pub fn run(
-        chunk_output: ExecutionOutput,
+        execution_output: ExecutionOutput,
         base_view: &ExecutedTrees,
-        known_state_checkpoint_hashes: Option<Vec<Option<HashValue>>>,
     ) -> Result<PartialStateComputeResult> {
-        let (result_state, next_epoch_state, state_checkpoint_output) = DoStateCheckpoint::run(
-            chunk_output,
+        let state_checkpoint_output = DoStateCheckpoint::run(
+            &execution_output,
             base_view.state(),
-            None, // append_state_checkpoint_to_block
-            known_state_checkpoint_hashes,
-            /*is_block=*/ false,
+            Option::<Vec<_>>::None, // known_state_checkpoint_hashes
         )?;
-        let (ledger_update_output, _to_discard, _to_retry) =
-            DoLedgerUpdate::run(state_checkpoint_output, base_view.txn_accumulator().clone())?;
-        let output = PartialStateComputeResult::new(
-            base_view.state().clone(),
-            result_state,
-            next_epoch_state,
-        );
+        let ledger_update_output = DoLedgerUpdate::run(
+            &execution_output,
+            &state_checkpoint_output,
+            base_view.txn_accumulator().clone(),
+        )?;
+        let output = PartialStateComputeResult::new(execution_output);
+        output.set_state_checkpoint_output(state_checkpoint_output);
         output.set_ledger_update_output(ledger_update_output);
 
         Ok(output)
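For `TransactionBlockExecutor` implementors such as `PtxBlockExecutor` below, the new `append_state_checkpoint_to_block` parameter moves responsibility for the block-closing checkpoint into execution itself: a block id means the output is expected to end with that block's checkpoint (or epilogue) transaction, while replay and chunk paths pass `None`. A toy illustration under assumed names, not the crate's API:

```rust
// Stand-in for aptos_crypto::HashValue; everything here is illustrative.
type HashValue = [u8; 32];

fn execute_block(txns: &[&str], append_state_checkpoint_to_block: Option<HashValue>) -> Vec<String> {
    let mut out: Vec<String> = txns.iter().map(|t| format!("executed {t}")).collect();
    // Per the patch, execution appends the closing txn itself when given a block id.
    if let Some(id) = append_state_checkpoint_to_block {
        out.push(format!("state checkpoint for block {:02x?}", &id[..4]));
    }
    out
}

fn main() {
    // Block path: two user txns plus the appended checkpoint.
    assert_eq!(execute_block(&["t0", "t1"], Some([0xab; 32])).len(), 3);
    // Replay/chunk path: no checkpoint is appended.
    assert_eq!(execute_block(&["t0", "t1"], None).len(), 2);
}
```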
diff --git a/execution/executor/tests/db_bootstrapper_test.rs b/execution/executor/tests/db_bootstrapper_test.rs
index ed3de49f3b499..dd6dc01e537ff 100644
--- a/execution/executor/tests/db_bootstrapper_test.rs
+++ b/execution/executor/tests/db_bootstrapper_test.rs
@@ -90,7 +90,7 @@ fn execute_and_commit(txns: Vec<Transaction>, db: &DbReaderWriter, signer: &Vali
         TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
     )
     .unwrap();
-    assert_eq!(output.num_leaves(), target_version + 1);
+    assert_eq!(output.next_version(), target_version + 1);
     let ledger_info_with_sigs =
         gen_ledger_info_with_sigs(epoch, &output, block_id, &[signer.clone()]);
     executor
diff --git a/experimental/execution/ptx-executor/Cargo.toml b/experimental/execution/ptx-executor/Cargo.toml
index 5e4b035116772..121f8f6541f62 100644
--- a/experimental/execution/ptx-executor/Cargo.toml
+++ b/experimental/execution/ptx-executor/Cargo.toml
@@ -14,6 +14,7 @@ rust-version = { workspace = true }
 [dependencies]
 anyhow = { workspace = true }
+aptos-crypto = { workspace = true }
 aptos-executor = { workspace = true }
 aptos-executor-types = { workspace = true }
 aptos-experimental-runtimes = { workspace = true }
diff --git a/experimental/execution/ptx-executor/src/lib.rs b/experimental/execution/ptx-executor/src/lib.rs
index c1f48b7d5634a..84172a4f86f08 100644
--- a/experimental/execution/ptx-executor/src/lib.rs
+++ b/experimental/execution/ptx-executor/src/lib.rs
@@ -21,6 +21,7 @@ use crate::{
     analyzer::PtxAnalyzer, finalizer::PtxFinalizer, metrics::TIMER, runner::PtxRunner,
     scheduler::PtxScheduler, sorter::PtxSorter, state_reader::PtxStateReader,
 };
+use aptos_crypto::HashValue;
 use aptos_executor::{
     block_executor::TransactionBlockExecutor,
     workflow::do_get_execution_output::DoGetExecutionOutput,
@@ -121,11 +122,13 @@ impl TransactionBlockExecutor for PtxBlockExecutor {
         transactions: ExecutableTransactions,
         state_view: CachedStateView,
         onchain_config: BlockExecutorConfigFromOnchain,
+        append_state_checkpoint_to_block: Option<HashValue>,
     ) -> anyhow::Result<ExecutionOutput> {
         DoGetExecutionOutput::by_transaction_execution::<PtxBlockExecutor>(
             transactions,
             state_view,
             onchain_config,
+            append_state_checkpoint_to_block,
         )
     }
 }
diff --git a/storage/aptosdb/src/db/include/aptosdb_testonly.rs b/storage/aptosdb/src/db/include/aptosdb_testonly.rs
index e6d4edc08ccea..ed08607fbd2dc 100644
--- a/storage/aptosdb/src/db/include/aptosdb_testonly.rs
+++ b/storage/aptosdb/src/db/include/aptosdb_testonly.rs
@@ -4,9 +4,11 @@
 use crate::state_store::buffered_state::BufferedState;
 use aptos_config::config::{BUFFERED_STATE_TARGET_ITEMS_FOR_TEST, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD};
 use aptos_infallible::Mutex;
-use aptos_types::state_store::create_empty_sharded_state_updates;
+use aptos_types::state_store::{create_empty_sharded_state_updates, ShardedStateUpdates};
 use std::default::Default;
-use aptos_types::transaction::TransactionStatus;
+use aptos_storage_interface::cached_state_view::ShardedStateCache;
+use aptos_storage_interface::state_delta::StateDelta;
+use aptos_types::transaction::{TransactionStatus, TransactionToCommit};
 
 impl AptosDB {
     /// This opens db in non-readonly mode, without the pruner.
@@ -171,7 +173,7 @@ impl ChunkToCommitOwned {
         ChunkToCommit {
             first_version: self.first_version,
             transactions: &self.transactions,
-            transaction_outputs: &self.transaction_outputs,
+            transaction_outputs: self.transaction_outputs.clone(),
             transaction_infos: &self.transaction_infos,
             base_state_version: self.base_state_version,
             latest_in_memory_state: &self.latest_in_memory_state,
diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs
index 56b8fb7373f82..3c9e687fa0ed7 100644
--- a/storage/aptosdb/src/db/include/aptosdb_writer.rs
+++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs
@@ -280,7 +280,7 @@ impl AptosDB {
             //
             // TODO(grao): Consider propagating the error instead of panic, if necessary.
             s.spawn(|_| {
-                self.commit_events(chunk.first_version, chunk.transaction_outputs, skip_index_and_usage)
+                self.commit_events(chunk.first_version, &chunk.transaction_outputs, skip_index_and_usage)
                     .unwrap()
             });
             s.spawn(|_| {
@@ -661,7 +661,7 @@ impl AptosDB {
         // n.b. txns_to_commit can be partial, when the control was handed over from consensus to state sync
        // where state sync won't send the pre-committed part to the DB again.
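The `ChunkToCommitOwned` hunk above, together with the `&chunk.transaction_outputs` call-site change, reflects `ChunkToCommit::transaction_outputs` becoming an owned `Vec` later in this patch: a struct that owns a `Vec` can be `Clone` but never `Copy`, so call sites must borrow or clone explicitly. A minimal stand-in demonstrating the constraint (not the real `ChunkToCommit` definition):

```rust
// Owning a Vec rules out #[derive(Copy)]; Clone is the most we can derive.
#[derive(Clone)]
struct Chunk<'a> {
    first_version: u64,
    transactions: &'a [String],
    transaction_outputs: Vec<String>, // owned now, so the struct cannot be Copy
}

fn main() {
    let txns = vec!["txn0".to_string()];
    let chunk = Chunk {
        first_version: 0,
        transactions: &txns,
        transaction_outputs: vec!["output0".to_string()],
    };
    // Call sites clone explicitly where the chunk is consumed, as with
    // pre_commit_ledger(chunk.clone(), ...) elsewhere in this patch,
    // and borrow (&chunk.transaction_outputs) where a reference suffices.
    let for_pre_commit = chunk.clone();
    assert_eq!(
        for_pre_commit.transaction_outputs.len(),
        chunk.transaction_outputs.len()
    );
}
```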
         if chunk_opt.is_some() && chunk_opt.as_ref().unwrap().len() == num_txns as usize {
-            let write_sets = chunk_opt.unwrap().transaction_outputs.iter().map(|t| t.write_set()).collect_vec();
+            let write_sets = chunk_opt.as_ref().unwrap().transaction_outputs.iter().map(|t| t.write_set()).collect_vec();
             indexer.index(self.state_store.clone(), first_version, &write_sets)?;
         } else {
             let write_sets: Vec<_> = self.ledger_db.write_set_db().get_write_set_iter(first_version, num_txns as usize)?.try_collect()?;
diff --git a/storage/aptosdb/src/db/mod.rs b/storage/aptosdb/src/db/mod.rs
index 2a11b754f5c8f..ba80d0c04f3e9 100644
--- a/storage/aptosdb/src/db/mod.rs
+++ b/storage/aptosdb/src/db/mod.rs
@@ -40,9 +40,8 @@ use aptos_resource_viewer::AptosValueAnnotator;
 use aptos_schemadb::SchemaBatch;
 use aptos_scratchpad::SparseMerkleTree;
 use aptos_storage_interface::{
-    cached_state_view::ShardedStateCache, db_ensure as ensure, db_other_bail as bail,
-    state_delta::StateDelta, AptosDbError, DbReader, DbWriter, ExecutedTrees, Order, Result,
-    StateSnapshotReceiver, MAX_REQUEST_LIMIT,
+    db_ensure as ensure, db_other_bail as bail, AptosDbError, DbReader, DbWriter, ExecutedTrees,
+    Order, Result, StateSnapshotReceiver, MAX_REQUEST_LIMIT,
 };
 use aptos_types::{
     account_address::AccountAddress,
@@ -63,12 +62,11 @@ use aptos_types::{
         state_storage_usage::StateStorageUsage,
         state_value::{StateValue, StateValueChunkWithProof},
         table::{TableHandle, TableInfo},
-        ShardedStateUpdates,
     },
     transaction::{
         AccountTransactionsWithProof, Transaction, TransactionAuxiliaryData, TransactionInfo,
         TransactionListWithProof, TransactionOutput, TransactionOutputListWithProof,
-        TransactionToCommit, TransactionWithProof, Version,
+        TransactionWithProof, Version,
     },
     write_set::WriteSet,
 };
diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs
index 657ff90e7d265..3419582601439 100644
--- a/storage/aptosdb/src/state_store/mod.rs
+++ b/storage/aptosdb/src/state_store/mod.rs
@@ -543,6 +543,7 @@ impl StateStore {
             .freeze(&buffered_state.current_state().base);
         let latest_snapshot_state_view = CachedStateView::new_impl(
             StateViewId::Miscellaneous,
+            num_transactions,
             snapshot,
             speculative_state,
             Arc::new(AsyncProofFetcher::new(state_db.clone())),
@@ -565,18 +566,21 @@ impl StateStore {
             .map(|(idx, _)| idx);
         latest_snapshot_state_view.prime_cache_by_write_set(&write_sets)?;
 
-        let (updates_until_last_checkpoint, state_after_last_checkpoint) =
+        let state_checkpoint_output =
             InMemoryStateCalculatorV2::calculate_for_write_sets_after_snapshot(
-                buffered_state.current_state(),
-                latest_snapshot_state_view.into_state_cache(),
+                // TODO(aldenhu): avoid cloning the HashMap inside.
+                &Arc::new(buffered_state.current_state().clone()),
+                &latest_snapshot_state_view.into_state_cache(),
                 last_checkpoint_index,
                 &write_sets,
             )?;
 
         // synchronously commit the snapshot at the last checkpoint here if not committed to disk yet.
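The substantive change in the `state_store` hunk above is the calculator's return shape: the positional pair (`updates_until_last_checkpoint`, `state_after_last_checkpoint`) becomes a named-field `StateCheckpointOutput`. A simplified stand-in (field names mirror the hunk; the types are placeholders, not the crate's) showing why the call site now reads fields by name:

```rust
// Placeholder version of the output struct; the real one carries SMT state.
struct StateCheckpointOutput {
    state_updates_before_last_checkpoint: Option<Vec<(String, String)>>,
    result_state: String,
}

// Equivalent of the buffered_state.update(...) call in the hunk above:
// named fields survive reordering and additions, unlike tuple positions.
fn update_buffered_state(out: &StateCheckpointOutput) {
    let updates = out.state_updates_before_last_checkpoint.as_ref();
    println!(
        "updates before last checkpoint: {:?}, result state: {}",
        updates.map(Vec::len),
        out.result_state
    );
}

fn main() {
    let out = StateCheckpointOutput {
        state_updates_before_last_checkpoint: Some(vec![("key".into(), "value".into())]),
        result_state: "smt@v1".into(),
    };
    update_buffered_state(&out);
}
```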
         buffered_state.update(
-            updates_until_last_checkpoint.as_ref(),
-            &state_after_last_checkpoint,
+            state_checkpoint_output
+                .state_updates_before_last_checkpoint
+                .as_ref(),
+            &state_checkpoint_output.result_state,
             true, /* sync_commit */
         )?;
     }
diff --git a/storage/storage-interface/src/cached_state_view.rs b/storage/storage-interface/src/cached_state_view.rs
index 9bde8daf493a3..cf9571cf10c44 100644
--- a/storage/storage-interface/src/cached_state_view.rs
+++ b/storage/storage-interface/src/cached_state_view.rs
@@ -86,6 +86,8 @@ pub struct CachedStateView {
     /// For logging and debugging purpose, identifies what this view is for.
     id: StateViewId,
 
+    next_version: Version,
+
     /// A readable snapshot in the persistent storage.
     snapshot: Option<(Version, HashValue)>,
 
@@ -165,6 +167,7 @@ impl CachedStateView {
         Ok(Self::new_impl(
             id,
+            next_version,
             snapshot,
             speculative_state,
             proof_fetcher,
@@ -173,12 +176,14 @@ impl CachedStateView {
     pub fn new_impl(
         id: StateViewId,
+        next_version: Version,
         snapshot: Option<(Version, HashValue)>,
         speculative_state: FrozenSparseMerkleTree<StateValue>,
         proof_fetcher: Arc<AsyncProofFetcher>,
     ) -> Self {
         Self {
             id,
+            next_version,
             snapshot,
             speculative_state,
             sharded_state_cache: ShardedStateCache::default(),
@@ -263,14 +268,34 @@ impl CachedStateView {
             Some((version, value)) => (Some(version), Some(value)),
         })
     }
+
+    pub fn next_version(&self) -> Version {
+        self.next_version
+    }
 }
 
+#[derive(Debug)]
 pub struct StateCache {
     pub frozen_base: FrozenSparseMerkleTree<StateValue>,
     pub sharded_state_cache: ShardedStateCache,
     pub proofs: HashMap<HashValue, SparseMerkleProofExt>,
 }
 
+impl StateCache {
+    pub fn new_empty(smt: SparseMerkleTree<StateValue>) -> Self {
+        let frozen_base = smt.freeze(&smt);
+        Self {
+            frozen_base,
+            sharded_state_cache: ShardedStateCache::default(),
+            proofs: HashMap::new(),
+        }
+    }
+
+    pub fn new_dummy() -> Self {
+        Self::new_empty(SparseMerkleTree::new_empty())
+    }
+}
+
 impl TStateView for CachedStateView {
     type Key = StateKey;
diff --git a/storage/storage-interface/src/chunk_to_commit.rs b/storage/storage-interface/src/chunk_to_commit.rs
index 67c4e778741e4..603844115605f 100644
--- a/storage/storage-interface/src/chunk_to_commit.rs
+++ b/storage/storage-interface/src/chunk_to_commit.rs
@@ -7,11 +7,12 @@ use aptos_types::{
     transaction::{Transaction, TransactionInfo, TransactionOutput, Version},
 };
 
-#[derive(Copy, Clone)]
+#[derive(Clone)]
 pub struct ChunkToCommit<'a> {
     pub first_version: Version,
     pub transactions: &'a [Transaction],
-    pub transaction_outputs: &'a [TransactionOutput],
+    // TODO(aldenhu): make it a ref
+    pub transaction_outputs: Vec<TransactionOutput>,
     pub transaction_infos: &'a [TransactionInfo],
     pub base_state_version: Option<Version>,
     pub latest_in_memory_state: &'a StateDelta,
diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs
index e99b1e017f0dd..d553fbed04080 100644
--- a/storage/storage-interface/src/lib.rs
+++ b/storage/storage-interface/src/lib.rs
@@ -551,7 +551,7 @@ pub trait DbWriter: Send + Sync {
         }
 
         if !chunk.is_empty() {
-            self.pre_commit_ledger(chunk, sync_commit)?;
+            self.pre_commit_ledger(chunk.clone(), sync_commit)?;
         }
         let version_to_commit = if let Some(ledger_info_with_sigs) = ledger_info_with_sigs {
             ledger_info_with_sigs.ledger_info().version()
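The new `next_version` field gives `CachedStateView` a notion of its position in the ledger: a view built over versions `0..next_version` reports the version the next transaction will occupy, so downstream state calculation no longer needs that count threaded in separately (note `num_transactions` passed at the `state_store` call site above). A trivial usage sketch with a stand-in type, not the real storage view:

```rust
// Stand-in: only the version-tracking aspect of CachedStateView is modeled.
struct View {
    next_version: u64,
}

impl View {
    // Mirrors the accessor added above.
    fn next_version(&self) -> u64 {
        self.next_version
    }
}

fn main() {
    // A view created after committing versions 0..=9 reports 10,
    // i.e. the version at which appending resumes.
    let view = View { next_version: 10 };
    assert_eq!(view.next_version(), 10);
    println!("appending resumes at version {}", view.next_version());
}
```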
diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs
index a76817793e90c..e67fe6a31344d 100644
--- a/types/src/transaction/mod.rs
+++ b/types/src/transaction/mod.rs
@@ -1202,6 +1202,16 @@ impl TransactionOutput {
         }
     }
 
+    pub fn new_empty_success() -> Self {
+        Self {
+            write_set: WriteSet::default(),
+            events: vec![],
+            gas_used: 0,
+            status: TransactionStatus::Keep(ExecutionStatus::Success),
+            auxiliary_data: TransactionAuxiliaryData::None,
+        }
+    }
+
     pub fn into(self) -> (WriteSet, Vec<ContractEvent>) {
         (self.write_set, self.events)
     }
@@ -2000,6 +2010,13 @@ impl From for Transaction {
 }
 
 impl Transaction {
+    pub fn block_epilogue(block_id: HashValue, block_end_info: BlockEndInfo) -> Self {
+        Self::BlockEpilogue(BlockEpiloguePayload::V0 {
+            block_id,
+            block_end_info,
+        })
+    }
+
     pub fn try_as_signed_user_txn(&self) -> Option<&SignedTransaction> {
         match self {
             Transaction::UserTransaction(txn) => Some(txn),