From d906106815117851bf5e2e0fc5d4b501768d79d2 Mon Sep 17 00:00:00 2001 From: aldenhu Date: Wed, 23 Oct 2024 03:40:34 +0000 Subject: [PATCH] Remove ParsedTransactionOutput --- Cargo.lock | 1 - execution/executor-types/Cargo.toml | 1 - .../executor-types/src/execution_output.rs | 60 +++--- execution/executor-types/src/lib.rs | 3 +- .../src/parsed_transaction_output.rs | 188 ------------------ .../src/state_checkpoint_output.rs | 20 +- .../src/state_compute_result.rs | 2 +- .../src/transactions_with_output.rs | 94 +++++++++ execution/executor/src/chunk_executor/mod.rs | 9 +- .../types/in_memory_state_calculator_v2.rs | 14 +- .../src/workflow/do_get_execution_output.rs | 110 ++++++---- .../executor/src/workflow/do_ledger_update.rs | 19 +- .../src/db/include/aptosdb_testonly.rs | 2 +- .../aptosdb/src/db/include/aptosdb_writer.rs | 2 +- .../storage-interface/src/chunk_to_commit.rs | 2 +- types/src/contract_event.rs | 5 +- types/src/transaction/mod.rs | 10 +- 17 files changed, 233 insertions(+), 309 deletions(-) delete mode 100644 execution/executor-types/src/parsed_transaction_output.rs create mode 100644 execution/executor-types/src/transactions_with_output.rs diff --git a/Cargo.lock b/Cargo.lock index e107e0c3914076..b16484a67ece90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1557,7 +1557,6 @@ dependencies = [ "criterion", "derive_more", "itertools 0.13.0", - "once_cell", "serde", "thiserror", ] diff --git a/execution/executor-types/Cargo.toml b/execution/executor-types/Cargo.toml index b27f6f09efbdcb..9a449959bd01a9 100644 --- a/execution/executor-types/Cargo.toml +++ b/execution/executor-types/Cargo.toml @@ -24,7 +24,6 @@ bcs = { workspace = true } criterion = { workspace = true } derive_more = { workspace = true } itertools = { workspace = true } -once_cell = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } diff --git a/execution/executor-types/src/execution_output.rs b/execution/executor-types/src/execution_output.rs index eeded76a4aed24..9da0e20aaeb11f 100644 --- a/execution/executor-types/src/execution_output.rs +++ b/execution/executor-types/src/execution_output.rs @@ -4,15 +4,14 @@ #![forbid(unsafe_code)] -use crate::{parsed_transaction_output::TransactionsWithParsedOutput, ParsedTransactionOutput}; +use crate::transactions_with_output::TransactionsWithOutput; use aptos_drop_helper::DropHelper; use aptos_storage_interface::{cached_state_view::StateCache, state_delta::StateDelta}; use aptos_types::{ contract_event::ContractEvent, epoch_state::EpochState, transaction::{ - block_epilogue::BlockEndInfo, ExecutionStatus, Transaction, TransactionOutput, - TransactionStatus, Version, + block_epilogue::BlockEndInfo, ExecutionStatus, Transaction, TransactionStatus, Version, }, }; use derive_more::Deref; @@ -29,9 +28,9 @@ impl ExecutionOutput { is_block: bool, first_version: Version, statuses_for_input_txns: Vec, - to_commit: TransactionsWithParsedOutput, - to_discard: TransactionsWithParsedOutput, - to_retry: TransactionsWithParsedOutput, + to_commit: TransactionsWithOutput, + to_discard: TransactionsWithOutput, + to_retry: TransactionsWithOutput, state_cache: StateCache, block_end_info: Option, next_epoch_state: Option, @@ -68,9 +67,9 @@ impl ExecutionOutput { is_block: false, first_version: state.next_version(), statuses_for_input_txns: vec![], - to_commit: TransactionsWithParsedOutput::new_empty(), - to_discard: TransactionsWithParsedOutput::new_empty(), - to_retry: TransactionsWithParsedOutput::new_empty(), + to_commit: TransactionsWithOutput::new_empty(), + to_discard: TransactionsWithOutput::new_empty(), + to_retry: TransactionsWithOutput::new_empty(), state_cache: StateCache::new_empty(state.current.clone()), block_end_info: None, next_epoch_state: None, @@ -81,14 +80,13 @@ impl ExecutionOutput { pub fn new_dummy_with_input_txns(txns: Vec) -> Self { let num_txns = txns.len(); let success_status = TransactionStatus::Keep(ExecutionStatus::Success); - let success_output = ParsedTransactionOutput::from(TransactionOutput::new_empty_success()); Self::new_impl(Inner { is_block: false, first_version: 0, statuses_for_input_txns: vec![success_status; num_txns], - to_commit: TransactionsWithParsedOutput::new(txns, vec![success_output; num_txns]), - to_discard: TransactionsWithParsedOutput::new_empty(), - to_retry: TransactionsWithParsedOutput::new_empty(), + to_commit: TransactionsWithOutput::new_dummy_success(txns), + to_discard: TransactionsWithOutput::new_empty(), + to_retry: TransactionsWithOutput::new_empty(), state_cache: StateCache::new_dummy(), block_end_info: None, next_epoch_state: None, @@ -105,9 +103,9 @@ impl ExecutionOutput { is_block: false, first_version: self.next_version(), statuses_for_input_txns: vec![], - to_commit: TransactionsWithParsedOutput::new_empty(), - to_discard: TransactionsWithParsedOutput::new_empty(), - to_retry: TransactionsWithParsedOutput::new_empty(), + to_commit: TransactionsWithOutput::new_empty(), + to_discard: TransactionsWithOutput::new_empty(), + to_retry: TransactionsWithOutput::new_empty(), state_cache: StateCache::new_dummy(), block_end_info: None, next_epoch_state: self.next_epoch_state.clone(), @@ -143,9 +141,9 @@ pub struct Inner { // but doesn't contain StateCheckpoint/BlockEpilogue, as those get added during execution pub statuses_for_input_txns: Vec, // List of all transactions to be committed, including StateCheckpoint/BlockEpilogue if needed. - pub to_commit: TransactionsWithParsedOutput, - pub to_discard: TransactionsWithParsedOutput, - pub to_retry: TransactionsWithParsedOutput, + pub to_commit: TransactionsWithOutput, + pub to_discard: TransactionsWithOutput, + pub to_retry: TransactionsWithOutput, /// Carries the frozen base state view, so all in-mem nodes involved won't drop before the /// execution result is processed; as well as all the accounts touched during execution, together @@ -170,29 +168,31 @@ impl Inner { let aborts = self .to_commit .iter() - .flat_map(|(txn, output)| match output.status().status() { - Ok(execution_status) => { - if execution_status.is_success() { - None - } else { - Some(format!("{:?}: {:?}", txn, output.status())) - } + .flat_map( + |(txn, output, _is_reconfig)| match output.status().status() { + Ok(execution_status) => { + if execution_status.is_success() { + None + } else { + Some(format!("{:?}: {:?}", txn, output.status())) + } + }, + Err(_) => None, }, - Err(_) => None, - }) + ) .collect::>(); let discards_3 = self .to_discard .iter() .take(3) - .map(|(txn, output)| format!("{:?}: {:?}", txn, output.status())) + .map(|(txn, output, _is_reconfig)| format!("{:?}: {:?}", txn, output.status())) .collect::>(); let retries_3 = self .to_retry .iter() .take(3) - .map(|(txn, output)| format!("{:?}: {:?}", txn, output.status())) + .map(|(txn, output, _is_reconfig)| format!("{:?}: {:?}", txn, output.status())) .collect::>(); if !aborts.is_empty() || !discards_3.is_empty() || !retries_3.is_empty() { diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs index e56cd4e0acefec..ea9918f5a9f416 100644 --- a/execution/executor-types/src/lib.rs +++ b/execution/executor-types/src/lib.rs @@ -23,7 +23,6 @@ use aptos_types::{ }; pub use error::{ExecutorError, ExecutorResult}; pub use ledger_update_output::LedgerUpdateOutput; -pub use parsed_transaction_output::ParsedTransactionOutput; use state_compute_result::StateComputeResult; use std::{ collections::{BTreeSet, HashMap}, @@ -37,9 +36,9 @@ use std::{ mod error; pub mod execution_output; mod ledger_update_output; -pub mod parsed_transaction_output; pub mod state_checkpoint_output; pub mod state_compute_result; +pub mod transactions_with_output; pub trait ChunkExecutorTrait: Send + Sync { /// Verifies the transactions based on the provided proofs and ledger info. If the transactions diff --git a/execution/executor-types/src/parsed_transaction_output.rs b/execution/executor-types/src/parsed_transaction_output.rs deleted file mode 100644 index b5e329f9578219..00000000000000 --- a/execution/executor-types/src/parsed_transaction_output.rs +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use aptos_types::{ - contract_event::ContractEvent, - event::EventKey, - on_chain_config, - transaction::{ - Transaction, TransactionAuxiliaryData, TransactionOutput, TransactionOutputProvider, - TransactionStatus, - }, - write_set::WriteSet, -}; -use itertools::zip_eq; -use once_cell::sync::Lazy; -use std::ops::Deref; - -pub static NEW_EPOCH_EVENT_KEY: Lazy = Lazy::new(on_chain_config::new_epoch_event_key); - -#[derive(Clone, Debug)] -pub struct ParsedTransactionOutput { - output: TransactionOutput, - reconfig_events: Vec, -} - -impl ParsedTransactionOutput { - pub fn parse_reconfig_events(events: &[ContractEvent]) -> impl Iterator { - events - .iter() - .filter(|e| e.event_key().cloned() == Some(*NEW_EPOCH_EVENT_KEY)) - } -} - -impl TransactionOutputProvider for ParsedTransactionOutput { - fn get_transaction_output(&self) -> &TransactionOutput { - &self.output - } -} - -impl From for ParsedTransactionOutput { - fn from(output: TransactionOutput) -> Self { - let reconfig_events = Self::parse_reconfig_events(output.events()) - .cloned() - .collect(); - Self { - output, - reconfig_events, - } - } -} - -impl Deref for ParsedTransactionOutput { - type Target = TransactionOutput; - - fn deref(&self) -> &Self::Target { - &self.output - } -} - -impl ParsedTransactionOutput { - pub fn is_reconfig(&self) -> bool { - !self.reconfig_events.is_empty() - } - - pub fn unpack( - self, - ) -> ( - WriteSet, - Vec, - Vec, - u64, - TransactionStatus, - TransactionAuxiliaryData, - ) { - let Self { - output, - reconfig_events, - } = self; - let (write_set, events, gas_used, status, auxiliary_data) = output.unpack(); - - ( - write_set, - events, - reconfig_events, - gas_used, - status, - auxiliary_data, - ) - } -} - -#[derive(Debug, Default)] -pub struct TransactionsWithParsedOutput { - pub transactions: Vec, - pub parsed_output: Vec, -} - -impl TransactionsWithParsedOutput { - pub fn new(transaction: Vec, parsed_output: Vec) -> Self { - assert_eq!( - transaction.len(), - parsed_output.len(), - "transaction.len(): {}, parsed_output.len(): {}", - transaction.len(), - parsed_output.len() - ); - Self { - transactions: transaction, - parsed_output, - } - } - - pub fn new_empty() -> Self { - Self::default() - } - - pub fn push(&mut self, transaction: Transaction, parsed_output: ParsedTransactionOutput) { - self.transactions.push(transaction); - self.parsed_output.push(parsed_output); - } - - pub fn len(&self) -> usize { - self.transactions.len() - } - - pub fn is_empty(&self) -> bool { - self.transactions.is_empty() - } - - pub fn txns(&self) -> &Vec { - &self.transactions - } - - pub fn make_transaction_outputs(&self) -> Vec { - self.parsed_output - .iter() - .map(|t| &t.output) - .cloned() - .collect() - } - - pub fn parsed_outputs(&self) -> &Vec { - &self.parsed_output - } - - pub fn get_last_checkpoint_index(&self) -> Option { - (0..self.len()) - .rev() - .find(|&i| Self::need_checkpoint(&self.transactions[i], &self.parsed_output[i])) - } - - pub fn need_checkpoint(txn: &Transaction, txn_output: &ParsedTransactionOutput) -> bool { - if txn_output.is_reconfig() { - return true; - } - match txn { - Transaction::BlockMetadata(_) - | Transaction::BlockMetadataExt(_) - | Transaction::UserTransaction(_) - | Transaction::ValidatorTransaction(_) => false, - Transaction::GenesisTransaction(_) - | Transaction::StateCheckpoint(_) - | Transaction::BlockEpilogue(_) => true, - } - } - - pub fn into_txns(self) -> Vec { - self.transactions - } - - pub fn into_inner(self) -> (Vec, Vec) { - ( - self.transactions, - self.parsed_output.into_iter().map(|t| t.output).collect(), - ) - } - - pub fn iter(&self) -> impl Iterator { - zip_eq(self.transactions.iter(), self.parsed_output.iter()) - } -} - -impl From> for TransactionsWithParsedOutput { - fn from(value: Vec<(Transaction, ParsedTransactionOutput)>) -> Self { - let (transaction, parsed_output) = value.into_iter().unzip(); - Self::new(transaction, parsed_output) - } -} diff --git a/execution/executor-types/src/state_checkpoint_output.rs b/execution/executor-types/src/state_checkpoint_output.rs index 5e3c32cc277492..c611d6c2c90a00 100644 --- a/execution/executor-types/src/state_checkpoint_output.rs +++ b/execution/executor-types/src/state_checkpoint_output.rs @@ -3,7 +3,7 @@ #![forbid(unsafe_code)] -use crate::parsed_transaction_output::TransactionsWithParsedOutput; +use crate::transactions_with_output::TransactionsWithOutput; use aptos_crypto::HashValue; use aptos_drop_helper::DropHelper; use aptos_storage_interface::state_delta::StateDelta; @@ -18,17 +18,17 @@ pub struct TransactionsByStatus { // but doesn't contain StateCheckpoint/BlockEpilogue, as those get added during execution statuses_for_input_txns: Vec, // List of all transactions to be committed, including StateCheckpoint/BlockEpilogue if needed. - to_commit: TransactionsWithParsedOutput, - to_discard: TransactionsWithParsedOutput, - to_retry: TransactionsWithParsedOutput, + to_commit: TransactionsWithOutput, + to_discard: TransactionsWithOutput, + to_retry: TransactionsWithOutput, } impl TransactionsByStatus { pub fn new( statuses_for_input_txns: Vec, - to_commit: TransactionsWithParsedOutput, - to_discard: TransactionsWithParsedOutput, - to_retry: TransactionsWithParsedOutput, + to_commit: TransactionsWithOutput, + to_discard: TransactionsWithOutput, + to_retry: TransactionsWithOutput, ) -> Self { Self { statuses_for_input_txns, @@ -46,9 +46,9 @@ impl TransactionsByStatus { self, ) -> ( Vec, - TransactionsWithParsedOutput, - TransactionsWithParsedOutput, - TransactionsWithParsedOutput, + TransactionsWithOutput, + TransactionsWithOutput, + TransactionsWithOutput, ) { ( self.statuses_for_input_txns, diff --git a/execution/executor-types/src/state_compute_result.rs b/execution/executor-types/src/state_compute_result.rs index 66517fd15f97f3..c47ceeb97c4205 100644 --- a/execution/executor-types/src/state_compute_result.rs +++ b/execution/executor-types/src/state_compute_result.rs @@ -155,7 +155,7 @@ impl StateComputeResult { ChunkToCommit { first_version: self.ledger_update_output.first_version(), transactions: self.execution_output.to_commit.txns(), - transaction_outputs: self.execution_output.to_commit.make_transaction_outputs(), + transaction_outputs: self.execution_output.to_commit.transaction_outputs(), transaction_infos: &self.ledger_update_output.transaction_infos, per_version_state_updates: &self.state_checkpoint_output.per_version_state_updates, base_state_version: self.state_checkpoint_output.parent_state.base_version, diff --git a/execution/executor-types/src/transactions_with_output.rs b/execution/executor-types/src/transactions_with_output.rs new file mode 100644 index 00000000000000..54204f6608d96d --- /dev/null +++ b/execution/executor-types/src/transactions_with_output.rs @@ -0,0 +1,94 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::transaction::{Transaction, TransactionOutput}; +use itertools::izip; + +#[derive(Debug, Default)] +pub struct TransactionsWithOutput { + pub transactions: Vec, + pub transaction_outputs: Vec, + pub epoch_ending_flags: Vec, +} + +impl TransactionsWithOutput { + pub fn new( + transactions: Vec, + transaction_outputs: Vec, + epoch_ending_flags: Vec, + ) -> Self { + assert_eq!(transactions.len(), transaction_outputs.len()); + assert_eq!(transactions.len(), epoch_ending_flags.len()); + Self { + transactions, + transaction_outputs, + epoch_ending_flags, + } + } + + pub fn new_empty() -> Self { + Self::default() + } + + pub fn new_dummy_success(txns: Vec) -> Self { + let txn_outputs = vec![TransactionOutput::new_empty_success(); txns.len()]; + let epoch_ending_flags = vec![false; txns.len()]; + Self::new(txns, txn_outputs, epoch_ending_flags) + } + + pub fn push( + &mut self, + transaction: Transaction, + transaction_output: TransactionOutput, + is_reconfig: bool, + ) { + self.transactions.push(transaction); + self.transaction_outputs.push(transaction_output); + self.epoch_ending_flags.push(is_reconfig); + } + + pub fn len(&self) -> usize { + self.transactions.len() + } + + pub fn is_empty(&self) -> bool { + self.transactions.is_empty() + } + + pub fn txns(&self) -> &Vec { + &self.transactions + } + + pub fn transaction_outputs(&self) -> &[TransactionOutput] { + &self.transaction_outputs + } + + pub fn get_last_checkpoint_index(&self) -> Option { + (0..self.len()) + .rev() + .find(|&i| Self::need_checkpoint(&self.transactions[i], self.epoch_ending_flags[i])) + } + + pub fn need_checkpoint(txn: &Transaction, is_reconfig: bool) -> bool { + if is_reconfig { + return true; + } + match txn { + Transaction::BlockMetadata(_) + | Transaction::BlockMetadataExt(_) + | Transaction::UserTransaction(_) + | Transaction::ValidatorTransaction(_) => false, + Transaction::GenesisTransaction(_) + | Transaction::StateCheckpoint(_) + | Transaction::BlockEpilogue(_) => true, + } + } + + pub fn iter(&self) -> impl Iterator { + izip!( + self.transactions.iter(), + self.transaction_outputs.iter(), + self.epoch_ending_flags.iter().cloned() + ) + } +} diff --git a/execution/executor/src/chunk_executor/mod.rs b/execution/executor/src/chunk_executor/mod.rs index 1a541c88247b3a..c489131b44c699 100644 --- a/execution/executor/src/chunk_executor/mod.rs +++ b/execution/executor/src/chunk_executor/mod.rs @@ -17,8 +17,7 @@ use crate::{ }; use anyhow::{anyhow, ensure, Result}; use aptos_executor_types::{ - ChunkCommitNotification, ChunkExecutorTrait, ParsedTransactionOutput, TransactionReplayer, - VerifyExecutionMode, + ChunkCommitNotification, ChunkExecutorTrait, TransactionReplayer, VerifyExecutionMode, }; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_infallible::{Mutex, RwLock}; @@ -457,9 +456,7 @@ impl ChunkExecutorInner { let mut epochs = Vec::new(); let mut epoch_begin = chunk_begin; // epoch begin version for (version, events) in multizip((chunk_begin..chunk_end, event_vecs.iter())) { - let is_epoch_ending = ParsedTransactionOutput::parse_reconfig_events(events) - .next() - .is_some(); + let is_epoch_ending = events.iter().any(ContractEvent::is_new_epoch_event); if is_epoch_ending { epochs.push((epoch_begin, version + 1)); epoch_begin = version + 1; @@ -610,7 +607,7 @@ impl ChunkExecutorInner { // not `zip_eq`, deliberately for (version, txn_out, txn_info, write_set, events) in multizip(( begin_version..end_version, - execution_output.to_commit.parsed_outputs().iter(), + execution_output.to_commit.transaction_outputs(), transaction_infos.iter(), write_sets.iter(), event_vecs.iter(), diff --git a/execution/executor/src/types/in_memory_state_calculator_v2.rs b/execution/executor/src/types/in_memory_state_calculator_v2.rs index 775e6158045965..21987a51c39b3b 100644 --- a/execution/executor/src/types/in_memory_state_calculator_v2.rs +++ b/execution/executor/src/types/in_memory_state_calculator_v2.rs @@ -5,8 +5,8 @@ use crate::metrics::OTHER_TIMERS; use anyhow::{ensure, Result}; use aptos_crypto::{hash::CryptoHash, HashValue}; use aptos_executor_types::{ - execution_output::ExecutionOutput, parsed_transaction_output::TransactionsWithParsedOutput, - state_checkpoint_output::StateCheckpointOutput, ProofReader, + execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput, + transactions_with_output::TransactionsWithOutput, ProofReader, }; use aptos_logger::info; use aptos_metrics_core::TimerHelper; @@ -43,7 +43,7 @@ impl InMemoryStateCalculatorV2 { } let state_updates_vec = Self::get_sharded_state_updates( - execution_output.to_commit.parsed_outputs(), + execution_output.to_commit.transaction_outputs(), |txn_output| txn_output.write_set(), ); @@ -354,7 +354,7 @@ impl InMemoryStateCalculatorV2 { fn validate_input_for_block( base: &StateDelta, - to_commit: &TransactionsWithParsedOutput, + to_commit: &TransactionsWithOutput, ) -> Result<()> { let num_txns = to_commit.len(); ensure!(num_txns != 0, "Empty block is not allowed."); @@ -369,12 +369,12 @@ impl InMemoryStateCalculatorV2 { "Base state is corrupted, updates_since_base is not empty at a checkpoint." ); - for (i, (txn, txn_output)) in to_commit.iter().enumerate() { + for (i, (txn, _txn_out, is_reconfig)) in to_commit.iter().enumerate() { ensure!( - TransactionsWithParsedOutput::need_checkpoint(txn, txn_output) ^ (i != num_txns - 1), + TransactionsWithOutput::need_checkpoint(txn, is_reconfig) ^ (i != num_txns - 1), "Checkpoint is allowed iff it's the last txn in the block. index: {i}, num_txns: {num_txns}, is_last: {}, txn: {txn:?}, is_reconfig: {}", i == num_txns - 1, - txn_output.is_reconfig() + is_reconfig, ); } Ok(()) diff --git a/execution/executor/src/workflow/do_get_execution_output.rs b/execution/executor/src/workflow/do_get_execution_output.rs index 305f347947fec3..f5626182ef7070 100644 --- a/execution/executor/src/workflow/do_get_execution_output.rs +++ b/execution/executor/src/workflow/do_get_execution_output.rs @@ -12,8 +12,8 @@ use aptos_executor_service::{ remote_executor_client::{get_remote_addresses, REMOTE_SHARDED_BLOCK_EXECUTOR}, }; use aptos_executor_types::{ - execution_output::ExecutionOutput, parsed_transaction_output::TransactionsWithParsedOutput, - should_forward_to_subscription_service, ParsedTransactionOutput, + execution_output::ExecutionOutput, should_forward_to_subscription_service, + transactions_with_output::TransactionsWithOutput, }; use aptos_logger::prelude::*; use aptos_metrics_core::TimerHelper; @@ -34,7 +34,7 @@ use aptos_types::{ authenticator::AccountAuthenticator, signature_verified_transaction::{SignatureVerifiedTransaction, TransactionProvider}, BlockEndInfo, BlockOutput, ExecutionStatus, Transaction, TransactionOutput, - TransactionOutputProvider, TransactionStatus, Version, + TransactionStatus, Version, }, write_set::{TransactionWrite, WriteSet}, }; @@ -52,22 +52,31 @@ impl DoGetExecutionOutput { onchain_config: BlockExecutorConfigFromOnchain, append_state_checkpoint_to_block: Option, ) -> Result { - match transactions { + let res = match transactions { ExecutableTransactions::Unsharded(txns) => { Self::by_transaction_execution_unsharded::( txns, state_view, onchain_config, append_state_checkpoint_to_block, - ) + )? }, ExecutableTransactions::Sharded(txns) => Self::by_transaction_execution_sharded::( txns, state_view, onchain_config, append_state_checkpoint_to_block, - ), + )?, + }; + + { + let _timer = OTHER_TIMERS.timer_with(&["update_counters__by_execution"]); + for x in [&res.to_commit, &res.to_discard, &res.to_retry] { + update_counters_for_processed_chunk(x.txns(), x.transaction_outputs(), "execution"); + } } + + Ok(res) } fn by_transaction_execution_unsharded( @@ -219,13 +228,12 @@ impl DoGetExecutionOutput { } } -pub fn update_counters_for_processed_chunk( +pub fn update_counters_for_processed_chunk( transactions: &[T], - transaction_outputs: &[O], + transaction_outputs: &[TransactionOutput], process_type: &str, ) where T: TransactionProvider, - O: TransactionOutputProvider, { let detailed_counters = AptosVM::get_processed_transactions_detailed_counters(); let detailed_counters_label = if detailed_counters { "true" } else { "false" }; @@ -239,14 +247,14 @@ pub fn update_counters_for_processed_chunk( for (txn, output) in transactions.iter().zip(transaction_outputs.iter()) { if detailed_counters { - if let Ok(size) = bcs::serialized_size(output.get_transaction_output()) { + if let Ok(size) = bcs::serialized_size(output) { metrics::PROCESSED_TXNS_OUTPUT_SIZE .with_label_values(&[process_type]) .observe(size as f64); } } - let (state, reason, error_code) = match output.get_transaction_output().status() { + let (state, reason, error_code) = match output.status() { TransactionStatus::Keep(execution_status) => match execution_status { ExecutionStatus::Success => ("keep_success", "", "".to_string()), ExecutionStatus::OutOfGas => ("keep_rejected", "OutOfGas", "error".to_string()), @@ -438,7 +446,7 @@ pub fn update_counters_for_processed_chunk( } } - for event in output.get_transaction_output().events() { + for event in output.events() { let (is_core, creation_number) = match event { ContractEvent::V1(v1) => ( v1.key().get_creator_address() == CORE_CODE_ADDRESS, @@ -468,7 +476,7 @@ impl Parser { fn parse( first_version: Version, mut transactions: Vec, - transaction_outputs: Vec, + mut transaction_outputs: Vec, state_cache: StateCache, block_end_info: Option, append_state_checkpoint_to_block: Option, @@ -477,12 +485,17 @@ impl Parser { let is_block = append_state_checkpoint_to_block.is_some(); // Parse all outputs. - let mut transaction_outputs: Vec = - { transaction_outputs.into_iter().map(Into::into).collect() }; + let mut epoch_ending_flags = transaction_outputs + .iter() + .map(TransactionOutput::has_new_epoch_event) + .collect_vec(); // Isolate retries. - let (to_retry, has_reconfig) = - Self::extract_retries(&mut transactions, &mut transaction_outputs); + let (to_retry, has_reconfig) = Self::extract_retries( + &mut transactions, + &mut transaction_outputs, + &mut epoch_ending_flags, + ); // Collect all statuses. let statuses_for_input_txns = { @@ -493,10 +506,15 @@ impl Parser { }; // Isolate discards. - let to_discard = Self::extract_discards(&mut transactions, &mut transaction_outputs); + let to_discard = Self::extract_discards( + &mut transactions, + &mut transaction_outputs, + &mut epoch_ending_flags, + ); // The rest is to be committed, attach block epilogue as needed and optionally get next EpochState. - let to_commit = TransactionsWithParsedOutput::new(transactions, transaction_outputs); + let to_commit = + TransactionsWithOutput::new(transactions, transaction_outputs, epoch_ending_flags); let to_commit = Self::maybe_add_block_epilogue( to_commit, has_reconfig, @@ -507,7 +525,7 @@ impl Parser { .then(|| Self::ensure_next_epoch_state(&to_commit)) .transpose()?; let subscribable_events = to_commit - .parsed_outputs() + .transaction_outputs() .iter() .flat_map(|o| { o.events() @@ -517,13 +535,6 @@ impl Parser { .cloned() .collect_vec(); - { - let _timer = OTHER_TIMERS.timer_with(&["update_counters__by_execution"]); - for x in [&to_commit, &to_discard, &to_retry] { - update_counters_for_processed_chunk(x.txns(), x.parsed_outputs(), "execution"); - } - } - Ok(ExecutionOutput::new( is_block, first_version, @@ -540,12 +551,13 @@ impl Parser { fn extract_retries( transactions: &mut Vec, - transaction_outputs: &mut Vec, - ) -> (TransactionsWithParsedOutput, bool) { + transaction_outputs: &mut Vec, + epoch_ending_flags: &mut Vec, + ) -> (TransactionsWithOutput, bool) { // N.B. off-by-1 intentionally, for exclusive index - let new_epoch_marker = transaction_outputs + let new_epoch_marker = epoch_ending_flags .iter() - .position(|o| o.is_reconfig()) + .rposition(|f| *f) .map(|idx| idx + 1); let block_gas_limit_marker = transaction_outputs @@ -556,49 +568,58 @@ impl Parser { // Transactions after the txn that exceeded per-block gas limit are also to be retried. if let Some(pos) = new_epoch_marker { ( - TransactionsWithParsedOutput::new( + TransactionsWithOutput::new( transactions.drain(pos..).collect(), transaction_outputs.drain(pos..).collect(), + epoch_ending_flags.drain(pos..).collect(), ), true, ) } else if let Some(pos) = block_gas_limit_marker { ( - TransactionsWithParsedOutput::new( + TransactionsWithOutput::new( transactions.drain(pos..).collect(), transaction_outputs.drain(pos..).collect(), + epoch_ending_flags.drain(pos..).collect(), ), false, ) } else { - (TransactionsWithParsedOutput::new_empty(), false) + (TransactionsWithOutput::new_empty(), false) } } fn extract_discards( transactions: &mut Vec, - transaction_outputs: &mut Vec, - ) -> TransactionsWithParsedOutput { + transaction_outputs: &mut Vec, + epoch_ending_flags: &mut Vec, + ) -> TransactionsWithOutput { let to_discard = { - let mut res = TransactionsWithParsedOutput::new_empty(); + let mut res = TransactionsWithOutput::new_empty(); for idx in 0..transactions.len() { if transaction_outputs[idx].status().is_discarded() { - res.push(transactions[idx].clone(), transaction_outputs[idx].clone()); + res.push( + transactions[idx].clone(), + transaction_outputs[idx].clone(), + epoch_ending_flags[idx], + ); } else if !res.is_empty() { transactions[idx - res.len()] = transactions[idx].clone(); transaction_outputs[idx - res.len()] = transaction_outputs[idx].clone(); + epoch_ending_flags[idx - res.len()] = epoch_ending_flags[idx]; } } if !res.is_empty() { let remaining = transactions.len() - res.len(); transactions.truncate(remaining); transaction_outputs.truncate(remaining); + epoch_ending_flags.truncate(remaining); } res }; // Sanity check transactions with the Discard status: - to_discard.iter().for_each(|(t, o)| { + to_discard.iter().for_each(|(t, o, _flag)| { // In case a new status other than Retry, Keep and Discard is added: if !matches!(o.status(), TransactionStatus::Discard(_)) { error!("Status other than Retry, Keep or Discard; Transaction discarded."); @@ -619,11 +640,11 @@ impl Parser { } fn maybe_add_block_epilogue( - mut to_commit: TransactionsWithParsedOutput, + mut to_commit: TransactionsWithOutput, is_reconfig: bool, block_end_info: Option<&BlockEndInfo>, append_state_checkpoint_to_block: Option, - ) -> TransactionsWithParsedOutput { + ) -> TransactionsWithOutput { if !is_reconfig { // Append the StateCheckpoint transaction to the end if let Some(block_id) = append_state_checkpoint_to_block { @@ -636,7 +657,8 @@ impl Parser { to_commit.push( state_checkpoint_txn, - ParsedTransactionOutput::from(TransactionOutput::new_empty_success()), + TransactionOutput::new_empty_success(), + false, ); } }; // else: not adding block epilogue at epoch ending. @@ -644,9 +666,9 @@ impl Parser { to_commit } - fn ensure_next_epoch_state(to_commit: &TransactionsWithParsedOutput) -> Result { + fn ensure_next_epoch_state(to_commit: &TransactionsWithOutput) -> Result { let last_write_set = to_commit - .parsed_outputs() + .transaction_outputs() .last() .ok_or_else(|| anyhow!("to_commit is empty."))? .write_set(); diff --git a/execution/executor/src/workflow/do_ledger_update.rs b/execution/executor/src/workflow/do_ledger_update.rs index 9a198584029c5c..6f9824ab27e4ea 100644 --- a/execution/executor/src/workflow/do_ledger_update.rs +++ b/execution/executor/src/workflow/do_ledger_update.rs @@ -5,14 +5,14 @@ use crate::metrics::OTHER_TIMERS; use anyhow::Result; use aptos_crypto::{hash::CryptoHash, HashValue}; use aptos_executor_types::{ - execution_output::ExecutionOutput, parsed_transaction_output::TransactionsWithParsedOutput, - state_checkpoint_output::StateCheckpointOutput, LedgerUpdateOutput, ParsedTransactionOutput, + execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput, + transactions_with_output::TransactionsWithOutput, LedgerUpdateOutput, }; use aptos_experimental_runtimes::thread_manager::optimal_min_len; use aptos_metrics_core::TimerHelper; use aptos_types::{ proof::accumulator::{InMemoryEventAccumulator, InMemoryTransactionAccumulator}, - transaction::TransactionInfo, + transaction::{TransactionInfo, TransactionOutput}, }; use itertools::{izip, Itertools}; use rayon::prelude::*; @@ -30,7 +30,7 @@ impl DoLedgerUpdate { // Calculate hashes let to_commit = &execution_output.to_commit; - let txn_outs = to_commit.parsed_outputs(); + let txn_outs = to_commit.transaction_outputs(); let (event_hashes, writeset_hashes) = Self::calculate_events_and_writeset_hashes(txn_outs); @@ -55,7 +55,7 @@ impl DoLedgerUpdate { } fn calculate_events_and_writeset_hashes( - to_commit: &[ParsedTransactionOutput], + to_commit: &[TransactionOutput], ) -> (Vec, Vec) { let _timer = OTHER_TIMERS.timer_with(&["calculate_events_and_writeset_hashes"]); @@ -79,7 +79,7 @@ impl DoLedgerUpdate { } fn assemble_transaction_infos( - to_commit: &TransactionsWithParsedOutput, + to_commit: &TransactionsWithOutput, state_checkpoint_hashes: Vec>, event_hashes: Vec, writeset_hashes: Vec, @@ -93,7 +93,12 @@ impl DoLedgerUpdate { writeset_hashes ) .map( - |((txn, txn_out), state_checkpoint_hash, event_root_hash, write_set_hash)| { + |( + (txn, txn_out, _is_reconfig), + state_checkpoint_hash, + event_root_hash, + write_set_hash, + )| { TransactionInfo::new( txn.hash(), write_set_hash, diff --git a/storage/aptosdb/src/db/include/aptosdb_testonly.rs b/storage/aptosdb/src/db/include/aptosdb_testonly.rs index ed08607fbd2dc3..58a629b61c7207 100644 --- a/storage/aptosdb/src/db/include/aptosdb_testonly.rs +++ b/storage/aptosdb/src/db/include/aptosdb_testonly.rs @@ -173,7 +173,7 @@ impl ChunkToCommitOwned { ChunkToCommit { first_version: self.first_version, transactions: &self.transactions, - transaction_outputs: self.transaction_outputs.clone(), + transaction_outputs: &self.transaction_outputs, transaction_infos: &self.transaction_infos, base_state_version: self.base_state_version, latest_in_memory_state: &self.latest_in_memory_state, diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index 3c9e687fa0ed76..5a7452d458beb9 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -280,7 +280,7 @@ impl AptosDB { // // TODO(grao): Consider propagating the error instead of panic, if necessary. s.spawn(|_| { - self.commit_events(chunk.first_version, &chunk.transaction_outputs, skip_index_and_usage) + self.commit_events(chunk.first_version, chunk.transaction_outputs, skip_index_and_usage) .unwrap() }); s.spawn(|_| { diff --git a/storage/storage-interface/src/chunk_to_commit.rs b/storage/storage-interface/src/chunk_to_commit.rs index 603844115605fd..77a013f26524fb 100644 --- a/storage/storage-interface/src/chunk_to_commit.rs +++ b/storage/storage-interface/src/chunk_to_commit.rs @@ -12,7 +12,7 @@ pub struct ChunkToCommit<'a> { pub first_version: Version, pub transactions: &'a [Transaction], // TODO(aldenhu): make it a ref - pub transaction_outputs: Vec, + pub transaction_outputs: &'a [TransactionOutput], pub transaction_infos: &'a [TransactionInfo], pub base_state_version: Option, pub latest_in_memory_state: &'a StateDelta, diff --git a/types/src/contract_event.rs b/types/src/contract_event.rs index 59d9a27fae4060..ba02652965441e 100644 --- a/types/src/contract_event.rs +++ b/types/src/contract_event.rs @@ -7,6 +7,7 @@ use crate::{ dkg::DKGStartEvent, event::EventKey, jwks::ObservedJWKsUpdated, + on_chain_config, on_chain_config::new_epoch_event_key, transaction::Version, }; @@ -32,6 +33,8 @@ pub static FEE_STATEMENT_EVENT_TYPE: Lazy = Lazy::new(|| { })) }); +pub static NEW_EPOCH_EVENT_KEY: Lazy = Lazy::new(on_chain_config::new_epoch_event_key); + /// This trait is used by block executor to abstractly represent an event, /// and update its data. pub trait TransactionEvent { @@ -157,7 +160,7 @@ impl ContractEvent { pub fn is_new_epoch_event(&self) -> bool { match self { - ContractEvent::V1(event) => *event.key() == new_epoch_event_key(), + ContractEvent::V1(event) => event.key() == &*NEW_EPOCH_EVENT_KEY, ContractEvent::V2(_event) => false, } } diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs index e67fe6a31344d9..02085985d088c8 100644 --- a/types/src/transaction/mod.rs +++ b/types/src/transaction/mod.rs @@ -1344,15 +1344,9 @@ impl TransactionOutput { } Ok(None) } -} - -pub trait TransactionOutputProvider { - fn get_transaction_output(&self) -> &TransactionOutput; -} -impl TransactionOutputProvider for TransactionOutput { - fn get_transaction_output(&self) -> &TransactionOutput { - self + pub fn has_new_epoch_event(&self) -> bool { + self.events.iter().any(ContractEvent::is_new_epoch_event) } }