diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index fb8230e013c00..94e5bc3e968ff 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -144,7 +144,10 @@ impl ExecutionProxy { Box::pin(async move { pre_commit_notifier .send(Box::pin(async move { - let txns = state_compute_result.transactions_to_commit(); + let txns = state_compute_result + .ledger_update_output + .transactions + .clone(); let subscribable_events = state_compute_result.subscribable_events().to_vec(); if let Err(e) = monitor!( diff --git a/execution/executor-types/src/ledger_update_output.rs b/execution/executor-types/src/ledger_update_output.rs index d5f0f101c998c..4187f0e80245a 100644 --- a/execution/executor-types/src/ledger_update_output.rs +++ b/execution/executor-types/src/ledger_update_output.rs @@ -12,8 +12,8 @@ use aptos_types::{ proof::accumulator::InMemoryTransactionAccumulator, state_store::ShardedStateUpdates, transaction::{ - block_epilogue::BlockEndInfo, TransactionInfo, TransactionStatus, TransactionToCommit, - Version, + block_epilogue::BlockEndInfo, Transaction, TransactionInfo, TransactionOutput, + TransactionStatus, Version, }, }; use derive_more::Deref; @@ -32,7 +32,7 @@ impl LedgerUpdateOutput { } #[cfg(any(test, feature = "fuzzing"))] - pub fn new_dummy_with_input_txns(txns: Vec) -> Self { + pub fn new_dummy_with_input_txns(txns: Vec) -> Self { Self::new_impl(Inner::new_dummy_with_input_txns(txns)) } @@ -46,7 +46,10 @@ impl LedgerUpdateOutput { pub fn new( statuses_for_input_txns: Vec, - to_commit: Vec, + transactions: Vec, + transaction_outputs: Vec, + transaction_infos: Vec, + per_version_state_updates: Vec, subscribable_events: Vec, transaction_info_hashes: Vec, state_updates_until_last_checkpoint: Option, @@ -57,7 +60,10 @@ impl LedgerUpdateOutput { ) -> Self { Self::new_impl(Inner { statuses_for_input_txns, - to_commit, + transactions, + transaction_outputs, + transaction_infos, + per_version_state_updates, subscribable_events, transaction_info_hashes, state_updates_until_last_checkpoint, @@ -78,7 +84,10 @@ impl LedgerUpdateOutput { #[derive(Default, Debug)] pub struct Inner { pub statuses_for_input_txns: Vec, - pub to_commit: Vec, + pub transactions: Vec, + pub transaction_outputs: Vec, + pub transaction_infos: Vec, + pub per_version_state_updates: Vec, pub subscribable_events: Vec, pub transaction_info_hashes: Vec, pub state_updates_until_last_checkpoint: Option, @@ -100,17 +109,10 @@ impl Inner { } #[cfg(any(test, feature = "fuzzing"))] - pub fn new_dummy_with_input_txns(txns: Vec) -> Self { - let num_txns = txns.len(); - let to_commit = txns - .into_iter() - .chain(std::iter::once( - aptos_types::transaction::Transaction::StateCheckpoint(HashValue::zero()), - )) - .map(TransactionToCommit::dummy_with_transaction) - .collect(); + pub fn new_dummy_with_input_txns(transactions: Vec) -> Self { + let num_txns = transactions.len(); Self { - to_commit, + transactions, statuses_for_input_txns: vec![ TransactionStatus::Keep( aptos_types::transaction::ExecutionStatus::Success @@ -136,19 +138,15 @@ impl Inner { &self.transaction_accumulator } - pub fn transactions_to_commit(&self) -> &Vec { - &self.to_commit - } - /// Ensure that every block committed by consensus ends with a state checkpoint. That can be /// one of the two cases: 1. a reconfiguration (txns in the proposed block after the txn caused /// the reconfiguration will be retried) 2. a Transaction::StateCheckpoint at the end of the /// block. pub fn ensure_ends_with_state_checkpoint(&self) -> Result<()> { ensure!( - self.to_commit + self.transactions .last() - .map_or(true, |txn| txn.transaction().is_non_reconfig_block_ending()), + .map_or(true, |t| t.is_non_reconfig_block_ending()), "Block not ending with a state checkpoint.", ); Ok(()) @@ -158,20 +156,17 @@ impl Inner { &self, transaction_infos: &[TransactionInfo], ) -> Result<()> { - let first_version = - self.transaction_accumulator.version() + 1 - self.to_commit.len() as Version; ensure!( - self.transactions_to_commit().len() == transaction_infos.len(), + self.transaction_infos.len() == transaction_infos.len(), "Lengths don't match. {} vs {}", - self.transactions_to_commit().len(), + self.transaction_infos.len(), transaction_infos.len(), ); - let mut version = first_version; - for (txn_to_commit, expected_txn_info) in - zip_eq(self.to_commit.iter(), transaction_infos.iter()) + let mut version = self.first_version(); + for (txn_info, expected_txn_info) in + zip_eq(self.transaction_infos.iter(), transaction_infos.iter()) { - let txn_info = txn_to_commit.transaction_info(); ensure!( txn_info == expected_txn_info, "Transaction infos don't match. version:{version}, txn_info:{txn_info}, expected_txn_info:{expected_txn_info}", @@ -186,10 +181,10 @@ impl Inner { } pub fn first_version(&self) -> Version { - self.transaction_accumulator.num_leaves() - self.to_commit.len() as Version + self.parent_accumulator.num_leaves } pub fn num_txns(&self) -> usize { - self.to_commit.len() + self.transactions.len() } } diff --git a/execution/executor-types/src/parsed_transaction_output.rs b/execution/executor-types/src/parsed_transaction_output.rs index bce63c825ed1a..ddf81e5a2873f 100644 --- a/execution/executor-types/src/parsed_transaction_output.rs +++ b/execution/executor-types/src/parsed_transaction_output.rs @@ -160,8 +160,11 @@ impl TransactionsWithParsedOutput { self.transactions } - pub fn into_inner(self) -> (Vec, Vec) { - (self.transactions, self.parsed_output) + pub fn into_inner(self) -> (Vec, Vec) { + ( + self.transactions, + self.parsed_output.into_iter().map(|t| t.output).collect(), + ) } pub fn iter(&self) -> impl Iterator { diff --git a/execution/executor-types/src/state_compute_result.rs b/execution/executor-types/src/state_compute_result.rs index 8d67930555726..0fd5f85ea24d6 100644 --- a/execution/executor-types/src/state_compute_result.rs +++ b/execution/executor-types/src/state_compute_result.rs @@ -104,17 +104,7 @@ impl StateComputeResult { } pub fn transactions_to_commit_len(&self) -> usize { - self.ledger_update_output.to_commit.len() - } - - /// On top of input transactions (which contain BlockMetadata and Validator txns), - /// filter out those that should be committed, and add StateCheckpoint/BlockEpilogue if needed. - pub fn transactions_to_commit(&self) -> Vec { - self.ledger_update_output - .to_commit - .iter() - .map(|t| t.transaction.clone()) - .collect() + self.ledger_update_output.transactions.len() } pub fn epoch_state(&self) -> &Option { @@ -155,7 +145,7 @@ impl StateComputeResult { pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification { ChunkCommitNotification { subscribable_events: self.ledger_update_output.subscribable_events.clone(), - committed_transactions: self.transactions_to_commit(), + committed_transactions: self.ledger_update_output.transactions.clone(), reconfiguration_occurred: self.has_reconfiguration(), } } @@ -163,14 +153,18 @@ impl StateComputeResult { pub fn as_chunk_to_commit(&self) -> ChunkToCommit { ChunkToCommit { first_version: self.ledger_update_output.first_version(), + transactions: &self.ledger_update_output.transactions, + transaction_outputs: &self.ledger_update_output.transaction_outputs, + transaction_infos: &self.ledger_update_output.transaction_infos, + per_version_state_updates: &self.ledger_update_output.per_version_state_updates, base_state_version: self.parent_state.base_version, - txns_to_commit: &self.ledger_update_output.to_commit, latest_in_memory_state: &self.result_state, state_updates_until_last_checkpoint: self .ledger_update_output .state_updates_until_last_checkpoint .as_ref(), sharded_state_cache: Some(&self.ledger_update_output.sharded_state_cache), + is_reconfig: self.ledger_update_output.block_end_info.is_some(), } } } diff --git a/execution/executor/src/block_executor.rs b/execution/executor/src/block_executor.rs index 72426a15dd951..73ed2811a64d7 100644 --- a/execution/executor/src/block_executor.rs +++ b/execution/executor/src/block_executor.rs @@ -6,7 +6,7 @@ use crate::{ components::{ - apply_chunk_output::ApplyChunkOutput, block_tree::BlockTree, chunk_output::ChunkOutput, + block_tree::BlockTree, chunk_output::ChunkOutput, do_ledger_update::DoLedgerUpdate, partial_state_compute_result::PartialStateComputeResult, }, logging::{LogEntry, LogSchema}, @@ -318,10 +318,7 @@ where parent_output.reconfig_suffix() } else { let (output, _, _) = THREAD_MANAGER.get_non_exe_cpu_pool().install(|| { - ApplyChunkOutput::calculate_ledger_update( - state_checkpoint_output, - parent_accumulator.clone(), - ) + DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone()) })?; output }; diff --git a/execution/executor/src/chunk_executor.rs b/execution/executor/src/chunk_executor.rs index 4feb6a4bcb076..c5854affe53e0 100644 --- a/execution/executor/src/chunk_executor.rs +++ b/execution/executor/src/chunk_executor.rs @@ -10,6 +10,7 @@ use crate::{ chunk_commit_queue::{ChunkCommitQueue, ChunkToUpdateLedger}, chunk_output::ChunkOutput, chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier}, + do_ledger_update::DoLedgerUpdate, executed_chunk::ExecutedChunk, partial_state_compute_result::PartialStateComputeResult, transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk}, @@ -349,10 +350,7 @@ impl ChunkExecutorInner { let first_version = parent_accumulator.num_leaves(); let (ledger_update_output, to_discard, to_retry) = { let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__calculate"]); - ApplyChunkOutput::calculate_ledger_update( - state_checkpoint_output, - parent_accumulator.clone(), - )? + DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone())? }; ensure!(to_discard.is_empty(), "Unexpected discard."); @@ -494,19 +492,16 @@ impl ChunkExecutorInner { let started = Instant::now(); let chunk = self.commit_chunk_impl()?; + let output = chunk.output.expect_complete_result(); - let num_committed = chunk.transactions_to_commit().len(); + let num_committed = output.transactions_to_commit_len(); info!( num_committed = num_committed, tps = num_committed as f64 / started.elapsed().as_secs_f64(), "TransactionReplayer::commit() OK" ); - Ok(chunk - .output - .result_state - .current_version - .expect("Version must exist after commit.")) + Ok(output.version()) } /// Remove `end_version - begin_version` transactions from the mutable input arguments and replay. diff --git a/execution/executor/src/components/apply_chunk_output.rs b/execution/executor/src/components/apply_chunk_output.rs index 321579363ba02..42f091b5564fd 100644 --- a/execution/executor/src/components/apply_chunk_output.rs +++ b/execution/executor/src/components/apply_chunk_output.rs @@ -6,38 +6,31 @@ use crate::{ components::{ - chunk_output::{update_counters_for_processed_chunk, ChunkOutput}, - executed_chunk::ExecutedChunk, + chunk_output::ChunkOutput, do_ledger_update::DoLedgerUpdate, executed_chunk::ExecutedChunk, in_memory_state_calculator_v2::InMemoryStateCalculatorV2, partial_state_compute_result::PartialStateComputeResult, }, metrics::{EXECUTOR_ERRORS, OTHER_TIMERS}, }; -use anyhow::{ensure, Result}; -use aptos_crypto::{hash::CryptoHash, HashValue}; +use anyhow::Result; +use aptos_crypto::HashValue; use aptos_executor_types::{ parsed_transaction_output::TransactionsWithParsedOutput, - should_forward_to_subscription_service, state_checkpoint_output::{StateCheckpointOutput, TransactionsByStatus}, - LedgerUpdateOutput, ParsedTransactionOutput, + ParsedTransactionOutput, }; -use aptos_experimental_runtimes::thread_manager::optimal_min_len; use aptos_logger::error; use aptos_metrics_core::TimerHelper; use aptos_storage_interface::{state_delta::StateDelta, ExecutedTrees}; use aptos_types::{ - contract_event::ContractEvent, epoch_state::EpochState, - proof::accumulator::{InMemoryEventAccumulator, InMemoryTransactionAccumulator}, - state_store::ShardedStateUpdates, transaction::{ block_epilogue::{BlockEndInfo, BlockEpiloguePayload}, - ExecutionStatus, Transaction, TransactionAuxiliaryData, TransactionInfo, TransactionOutput, - TransactionStatus, TransactionToCommit, + ExecutionStatus, Transaction, TransactionAuxiliaryData, TransactionOutput, + TransactionStatus, }, write_set::WriteSet, }; -use rayon::prelude::*; use std::{iter::repeat, sync::Arc}; pub struct ApplyChunkOutput; @@ -115,61 +108,6 @@ impl ApplyChunkOutput { )) } - pub fn calculate_ledger_update( - state_checkpoint_output: StateCheckpointOutput, - base_txn_accumulator: Arc, - ) -> Result<(LedgerUpdateOutput, Vec, Vec)> { - let ( - txns, - state_updates_vec, - state_checkpoint_hashes, - state_updates_before_last_checkpoint, - sharded_state_cache, - block_end_info, - ) = state_checkpoint_output.into_inner(); - - let (statuses_for_input_txns, to_commit, to_discard, to_retry) = txns.into_inner(); - - update_counters_for_processed_chunk( - to_commit.txns(), - to_commit.parsed_outputs(), - "execution", - ); - update_counters_for_processed_chunk( - to_discard.txns(), - to_discard.parsed_outputs(), - "execution", - ); - update_counters_for_processed_chunk( - to_retry.txns(), - to_retry.parsed_outputs(), - "execution", - ); - - // Calculate TransactionData and TransactionInfo, i.e. the ledger history diff. - let _timer = OTHER_TIMERS.timer_with(&["assemble_ledger_diff_for_block"]); - - let (txns_to_commit, transaction_info_hashes, subscribable_events) = - Self::assemble_ledger_diff(to_commit, state_updates_vec, state_checkpoint_hashes); - let transaction_accumulator = - Arc::new(base_txn_accumulator.append(&transaction_info_hashes)); - Ok(( - LedgerUpdateOutput::new( - statuses_for_input_txns, - txns_to_commit, - subscribable_events, - transaction_info_hashes, - state_updates_before_last_checkpoint, - sharded_state_cache, - transaction_accumulator, - base_txn_accumulator, - block_end_info, - ), - to_discard.into_txns(), - to_retry.into_txns(), - )) - } - pub fn apply_chunk( chunk_output: ChunkOutput, base_view: &ExecutedTrees, @@ -183,10 +121,8 @@ impl ApplyChunkOutput { known_state_checkpoint_hashes, /*is_block=*/ false, )?; - let (ledger_update_output, to_discard, to_retry) = Self::calculate_ledger_update( - state_checkpoint_output, - base_view.txn_accumulator().clone(), - )?; + let (ledger_update_output, to_discard, to_retry) = + DoLedgerUpdate::run(state_checkpoint_output, base_view.txn_accumulator().clone())?; let output = PartialStateComputeResult::new( base_view.state().clone(), result_state, @@ -322,159 +258,4 @@ impl ApplyChunkOutput { to_retry, )) } - - fn assemble_ledger_diff( - to_commit_from_execution: TransactionsWithParsedOutput, - state_updates_vec: Vec, - state_checkpoint_hashes: Vec>, - ) -> (Vec, Vec, Vec) { - // these are guaranteed by caller side logic - assert_eq!(to_commit_from_execution.len(), state_updates_vec.len()); - assert_eq!( - to_commit_from_execution.len(), - state_checkpoint_hashes.len() - ); - - let num_txns = to_commit_from_execution.len(); - let mut to_commit = Vec::with_capacity(num_txns); - let mut txn_info_hashes = Vec::with_capacity(num_txns); - let hashes_vec = - Self::calculate_events_and_writeset_hashes(to_commit_from_execution.parsed_outputs()); - - let _timer = OTHER_TIMERS.timer_with(&["process_events_and_writeset_hashes"]); - let hashes_vec: Vec<(HashValue, HashValue)> = hashes_vec - .into_par_iter() - .map(|(event_hashes, write_set_hash)| { - ( - InMemoryEventAccumulator::from_leaves(&event_hashes).root_hash(), - write_set_hash, - ) - }) - .collect(); - - let mut all_subscribable_events = Vec::new(); - let (to_commit_txns, to_commit_outputs) = to_commit_from_execution.into_inner(); - for ( - txn, - txn_output, - state_checkpoint_hash, - state_updates, - (event_root_hash, write_set_hash), - ) in itertools::izip!( - to_commit_txns, - to_commit_outputs, - state_checkpoint_hashes, - state_updates_vec, - hashes_vec - ) { - let (write_set, events, per_txn_reconfig_events, gas_used, status, auxiliary_data) = - txn_output.unpack(); - - let subscribable_events: Vec = events - .iter() - .filter(|evt| should_forward_to_subscription_service(evt)) - .cloned() - .collect(); - let txn_info = match &status { - TransactionStatus::Keep(status) => TransactionInfo::new( - txn.hash(), - write_set_hash, - event_root_hash, - state_checkpoint_hash, - gas_used, - status.clone(), - ), - _ => unreachable!("Transaction sorted by status already."), - }; - let txn_info_hash = txn_info.hash(); - txn_info_hashes.push(txn_info_hash); - let txn_to_commit = TransactionToCommit::new( - txn, - txn_info, - state_updates, - write_set, - events, - !per_txn_reconfig_events.is_empty(), - auxiliary_data, - ); - all_subscribable_events.extend(subscribable_events); - to_commit.push(txn_to_commit); - } - (to_commit, txn_info_hashes, all_subscribable_events) - } - - fn calculate_events_and_writeset_hashes( - to_commit_from_execution: &[ParsedTransactionOutput], - ) -> Vec<(Vec, HashValue)> { - let _timer = OTHER_TIMERS.timer_with(&["calculate_events_and_writeset_hashes"]); - - let num_txns = to_commit_from_execution.len(); - to_commit_from_execution - .par_iter() - .with_min_len(optimal_min_len(num_txns, 64)) - .map(|txn_output| { - ( - txn_output - .events() - .iter() - .map(CryptoHash::hash) - .collect::>(), - CryptoHash::hash(txn_output.write_set()), - ) - }) - .collect::>() - } -} - -pub fn ensure_no_discard(to_discard: Vec) -> Result<()> { - ensure!(to_discard.is_empty(), "Syncing discarded transactions"); - Ok(()) -} - -pub fn ensure_no_retry(to_retry: Vec) -> Result<()> { - ensure!( - to_retry.is_empty(), - "Seeing retries when syncing, did it crosses epoch boundary?", - ); - Ok(()) -} - -#[test] -fn assemble_ledger_diff_should_filter_subscribable_events() { - let event_0 = - ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_1".to_vec()); - let event_1 = ContractEvent::new_v2_with_type_tag_str( - "0x2345::random_module::RandomEvent", - b"random_x".to_vec(), - ); - let event_2 = - ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_2".to_vec()); - let txns_n_outputs = - TransactionsWithParsedOutput::new(vec![Transaction::dummy(), Transaction::dummy()], vec![ - ParsedTransactionOutput::from(TransactionOutput::new( - WriteSet::default(), - vec![event_0.clone()], - 0, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - )), - ParsedTransactionOutput::from(TransactionOutput::new( - WriteSet::default(), - vec![event_1.clone(), event_2.clone()], - 0, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - )), - ]); - let state_updates_vec = vec![ - ShardedStateUpdates::default(), - ShardedStateUpdates::default(), - ]; - let state_checkpoint_hashes = vec![Some(HashValue::zero()), Some(HashValue::zero())]; - let (_, _, subscribable_events) = ApplyChunkOutput::assemble_ledger_diff( - txns_n_outputs, - state_updates_vec, - state_checkpoint_hashes, - ); - assert_eq!(vec![event_0, event_2], subscribable_events); } diff --git a/execution/executor/src/components/do_ledger_update.rs b/execution/executor/src/components/do_ledger_update.rs new file mode 100644 index 0000000000000..2fe989faea05b --- /dev/null +++ b/execution/executor/src/components/do_ledger_update.rs @@ -0,0 +1,205 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{components::chunk_output::update_counters_for_processed_chunk, metrics::OTHER_TIMERS}; +use anyhow::Result; +use aptos_crypto::{hash::CryptoHash, HashValue}; +use aptos_executor_types::{ + parsed_transaction_output::TransactionsWithParsedOutput, + should_forward_to_subscription_service, state_checkpoint_output::StateCheckpointOutput, + LedgerUpdateOutput, ParsedTransactionOutput, +}; +use aptos_experimental_runtimes::thread_manager::optimal_min_len; +use aptos_metrics_core::TimerHelper; +use aptos_types::{ + contract_event::ContractEvent, + proof::accumulator::{InMemoryEventAccumulator, InMemoryTransactionAccumulator}, + transaction::{Transaction, TransactionInfo}, +}; +use itertools::{izip, Itertools}; +use rayon::prelude::*; +use std::sync::Arc; + +pub struct DoLedgerUpdate; + +impl DoLedgerUpdate { + pub fn run( + state_checkpoint_output: StateCheckpointOutput, + base_txn_accumulator: Arc, + ) -> Result<(LedgerUpdateOutput, Vec, Vec)> { + let ( + txns, + state_updates_vec, + state_checkpoint_hashes, + state_updates_before_last_checkpoint, + sharded_state_cache, + block_end_info, + ) = state_checkpoint_output.into_inner(); + + let (statuses_for_input_txns, to_commit, to_discard, to_retry) = txns.into_inner(); + for group in [&to_commit, &to_discard, &to_retry] { + update_counters_for_processed_chunk(group.txns(), group.parsed_outputs(), "execution"); + } + + // these are guaranteed by caller side logic + assert_eq!(to_commit.len(), state_updates_vec.len()); + assert_eq!(to_commit.len(), state_checkpoint_hashes.len()); + + let (event_hashes, write_set_hashes) = + Self::calculate_events_and_writeset_hashes(to_commit.parsed_outputs()); + + let (transaction_infos, subscribible_events) = Self::assemble_transaction_infos( + &to_commit, + state_checkpoint_hashes, + event_hashes, + write_set_hashes, + ); + let transaction_info_hashes = transaction_infos.iter().map(CryptoHash::hash).collect_vec(); + let transaction_accumulator = + Arc::new(base_txn_accumulator.append(&transaction_info_hashes)); + + let (transactions, transaction_outputs) = to_commit.into_inner(); + + let ledger_update_output = LedgerUpdateOutput::new( + statuses_for_input_txns, + transactions, + transaction_outputs, + transaction_infos, + state_updates_vec, + subscribible_events, + transaction_info_hashes, + state_updates_before_last_checkpoint, + sharded_state_cache, + transaction_accumulator, + base_txn_accumulator, + block_end_info, + ); + + Ok(( + ledger_update_output, + to_discard.into_txns(), + to_retry.into_txns(), + )) + } + + fn calculate_events_and_writeset_hashes( + to_commit: &[ParsedTransactionOutput], + ) -> (Vec, Vec) { + let _timer = OTHER_TIMERS.timer_with(&["calculate_events_and_writeset_hashes"]); + + let num_txns = to_commit.len(); + to_commit + .par_iter() + .with_min_len(optimal_min_len(num_txns, 64)) + .map(|txn_output| { + let event_hashes = txn_output + .events() + .iter() + .map(CryptoHash::hash) + .collect::>(); + + ( + InMemoryEventAccumulator::from_leaves(&event_hashes).root_hash(), + CryptoHash::hash(txn_output.write_set()), + ) + }) + .unzip() + } + + fn assemble_transaction_infos( + to_commit: &TransactionsWithParsedOutput, + state_checkpoint_hashes: Vec>, + event_hashes: Vec, + writeset_hashes: Vec, + ) -> (Vec, Vec) { + let _timer = OTHER_TIMERS.timer_with(&["process_events_and_writeset_hashes"]); + + let mut txn_infos = Vec::with_capacity(to_commit.len()); + let mut subscribable_events = Vec::new(); + izip!( + to_commit.iter(), + state_checkpoint_hashes, + event_hashes, + writeset_hashes + ) + .for_each( + |((txn, txn_out), state_checkpoint_hash, event_root_hash, write_set_hash)| { + subscribable_events.extend( + txn_out + .events() + .iter() + .filter(|evt| should_forward_to_subscription_service(evt)) + .cloned(), + ); + txn_infos.push(TransactionInfo::new( + txn.hash(), + write_set_hash, + event_root_hash, + state_checkpoint_hash, + txn_out.gas_used(), + txn_out.status().as_kept_status().expect("Already sorted."), + )); + }, + ); + + (txn_infos, subscribable_events) + } +} + +#[cfg(test)] +mod tests { + use super::DoLedgerUpdate; + use aptos_crypto::hash::HashValue; + use aptos_executor_types::parsed_transaction_output::{ + ParsedTransactionOutput, TransactionsWithParsedOutput, + }; + use aptos_types::{ + contract_event::ContractEvent, + transaction::{ + ExecutionStatus, Transaction, TransactionAuxiliaryData, TransactionOutput, + TransactionStatus, + }, + write_set::WriteSet, + }; + + #[test] + fn assemble_ledger_diff_should_filter_subscribable_events() { + let event_0 = + ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_1".to_vec()); + let event_1 = ContractEvent::new_v2_with_type_tag_str( + "0x2345::random_module::RandomEvent", + b"random_x".to_vec(), + ); + let event_2 = + ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_2".to_vec()); + let txns_n_outputs = TransactionsWithParsedOutput::new( + vec![Transaction::dummy(), Transaction::dummy()], + vec![ + ParsedTransactionOutput::from(TransactionOutput::new( + WriteSet::default(), + vec![event_0.clone()], + 0, + TransactionStatus::Keep(ExecutionStatus::Success), + TransactionAuxiliaryData::default(), + )), + ParsedTransactionOutput::from(TransactionOutput::new( + WriteSet::default(), + vec![event_1.clone(), event_2.clone()], + 0, + TransactionStatus::Keep(ExecutionStatus::Success), + TransactionAuxiliaryData::default(), + )), + ], + ); + let state_checkpoint_hashes = vec![Some(HashValue::zero()); 2]; + let event_hashes = vec![HashValue::zero(); 2]; + let write_set_hashes = vec![HashValue::zero(); 2]; + let (_transaction_infos, subscribable_events) = DoLedgerUpdate::assemble_transaction_infos( + &txns_n_outputs, + state_checkpoint_hashes, + event_hashes, + write_set_hashes, + ); + assert_eq!(vec![event_0, event_2], subscribable_events); + } +} diff --git a/execution/executor/src/components/executed_chunk.rs b/execution/executor/src/components/executed_chunk.rs index f1d8654729d6e..50e8f0b185290 100644 --- a/execution/executor/src/components/executed_chunk.rs +++ b/execution/executor/src/components/executed_chunk.rs @@ -5,16 +5,10 @@ #![forbid(unsafe_code)] use crate::components::partial_state_compute_result::PartialStateComputeResult; -use aptos_types::{ledger_info::LedgerInfoWithSignatures, transaction::TransactionToCommit}; +use aptos_types::ledger_info::LedgerInfoWithSignatures; #[derive(Debug)] pub struct ExecutedChunk { pub output: PartialStateComputeResult, pub ledger_info_opt: Option, } - -impl ExecutedChunk { - pub fn transactions_to_commit(&self) -> &[TransactionToCommit] { - &self.output.expect_ledger_update_output().to_commit - } -} diff --git a/execution/executor/src/components/mod.rs b/execution/executor/src/components/mod.rs index 27c9b6ca74ec1..da78a4b5dff16 100644 --- a/execution/executor/src/components/mod.rs +++ b/execution/executor/src/components/mod.rs @@ -11,6 +11,7 @@ pub mod chunk_output; pub mod in_memory_state_calculator_v2; pub mod chunk_result_verifier; +pub mod do_ledger_update; pub mod executed_chunk; pub mod partial_state_compute_result; pub mod transaction_chunk; diff --git a/execution/executor/src/db_bootstrapper.rs b/execution/executor/src/db_bootstrapper.rs index 02f74c12118c6..d4bfeae878975 100644 --- a/execution/executor/src/db_bootstrapper.rs +++ b/execution/executor/src/db_bootstrapper.rs @@ -136,7 +136,7 @@ pub fn calculate_genesis( .apply_to_ledger(&executed_trees, None)?; let output = &chunk.output; ensure!( - !chunk.transactions_to_commit().is_empty(), + output.expect_ledger_update_output().num_txns() != 0, "Genesis txn execution failed." ); diff --git a/execution/executor/src/fuzzing.rs b/execution/executor/src/fuzzing.rs index 2621e4b3fcd7e..88500a7db1d00 100644 --- a/execution/executor/src/fuzzing.rs +++ b/execution/executor/src/fuzzing.rs @@ -25,7 +25,7 @@ use aptos_types::{ signature_verified_transaction::{ into_signature_verified_block, SignatureVerifiedTransaction, }, - BlockOutput, Transaction, TransactionOutput, TransactionToCommit, Version, + BlockOutput, Transaction, TransactionOutput, Version, }, vm_status::VMStatus, }; @@ -124,7 +124,7 @@ impl DbWriter for FakeDb { &self, _version: Version, _ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, - _txns_to_commit: Option<&[TransactionToCommit]>, + _chunk: Option, ) -> aptos_storage_interface::Result<()> { Ok(()) } diff --git a/storage/aptosdb/src/db/include/aptosdb_testonly.rs b/storage/aptosdb/src/db/include/aptosdb_testonly.rs index 73d5e10d13f5b..e6d4edc08ccea 100644 --- a/storage/aptosdb/src/db/include/aptosdb_testonly.rs +++ b/storage/aptosdb/src/db/include/aptosdb_testonly.rs @@ -6,6 +6,7 @@ use aptos_config::config::{ BUFFERED_STATE_TARGET_ITEMS_FOR_TEST, DEFAULT_MAX_NU use aptos_infallible::Mutex; use aptos_types::state_store::create_empty_sharded_state_updates; use std::default::Default; +use aptos_types::transaction::TransactionStatus; impl AptosDB { /// This opens db in non-readonly mode, without the pruner. @@ -98,35 +99,6 @@ impl AptosDB { } } -pub fn gather_state_updates_until_last_checkpoint( - first_version: Version, - latest_in_memory_state: &StateDelta, - txns_to_commit: &[TransactionToCommit], -) -> Option { - if let Some(latest_checkpoint_version) = latest_in_memory_state.base_version { - if latest_checkpoint_version >= first_version { - let idx = (latest_checkpoint_version - first_version) as usize; - assert!( - txns_to_commit[idx].has_state_checkpoint_hash(), - "The new latest snapshot version passed in {:?} does not match with the last checkpoint version in txns_to_commit {:?}", - latest_checkpoint_version, - first_version + idx as u64 - ); - let mut sharded_state_updates = create_empty_sharded_state_updates(); - sharded_state_updates.par_iter_mut().enumerate().for_each( - |(shard_id, state_updates_shard)| { - txns_to_commit[..=idx].iter().for_each(|txn_to_commit| { - state_updates_shard.extend(txn_to_commit.state_updates()[shard_id].clone()); - }) - }, - ); - return Some(sharded_state_updates); - } - } - - None -} - /// Test only methods for the DB impl AptosDB { pub fn save_transactions_for_test( @@ -138,23 +110,126 @@ impl AptosDB { sync_commit: bool, latest_in_memory_state: &StateDelta, ) -> Result<()> { - let state_updates_until_last_checkpoint = gather_state_updates_until_last_checkpoint( - first_version, - latest_in_memory_state, + let chunk = ChunkToCommitOwned::from_test_txns_to_commit( txns_to_commit, - ); - let chunk = ChunkToCommit { first_version, base_state_version, - txns_to_commit, latest_in_memory_state, - state_updates_until_last_checkpoint: state_updates_until_last_checkpoint.as_ref(), - sharded_state_cache: None, - }; + ); self.save_transactions( - chunk, + chunk.as_ref(), ledger_info_with_sigs, sync_commit, ) } } + +pub struct ChunkToCommitOwned { + first_version: Version, + transactions: Vec, + transaction_outputs: Vec, + transaction_infos: Vec, + base_state_version: Option, + latest_in_memory_state: Arc, + per_version_state_updates: Vec, + state_updates_until_last_checkpoint: Option, + sharded_state_cache: Option, + is_reconfig: bool, +} + +impl ChunkToCommitOwned { + pub fn from_test_txns_to_commit( + txns_to_commit: &[TransactionToCommit], + first_version: Version, + base_state_version: Option, + latest_in_memory_state: &StateDelta, + ) -> Self { + let (transactions, transaction_outputs, transaction_infos, per_version_state_updates) = Self::disassemble_txns_to_commit(txns_to_commit); + + let state_updates_until_last_checkpoint = Self::gather_state_updates_until_last_checkpoint( + first_version, + latest_in_memory_state, + &per_version_state_updates, + &transaction_infos, + ); + + Self { + first_version, + transactions, + transaction_outputs, + transaction_infos, + base_state_version, + latest_in_memory_state: Arc::new(latest_in_memory_state.clone()), + per_version_state_updates, + state_updates_until_last_checkpoint, + sharded_state_cache: None, + is_reconfig: false, + } + } + + pub fn as_ref(&self) -> ChunkToCommit { + ChunkToCommit { + first_version: self.first_version, + transactions: &self.transactions, + transaction_outputs: &self.transaction_outputs, + transaction_infos: &self.transaction_infos, + base_state_version: self.base_state_version, + latest_in_memory_state: &self.latest_in_memory_state, + per_version_state_updates: &self.per_version_state_updates, + state_updates_until_last_checkpoint: self.state_updates_until_last_checkpoint.as_ref(), + sharded_state_cache: self.sharded_state_cache.as_ref(), + is_reconfig: self.is_reconfig, + } + } + + fn disassemble_txns_to_commit(txns_to_commit: &[TransactionToCommit]) -> ( + Vec, Vec, Vec, Vec, + ) { + txns_to_commit.iter().map(|txn_to_commit| { + let TransactionToCommit { + transaction, transaction_info, state_updates, write_set, events, is_reconfig: _, transaction_auxiliary_data + } = txn_to_commit; + + let transaction_output = TransactionOutput::new( + write_set.clone(), + events.clone(), + transaction_info.gas_used(), + TransactionStatus::Keep(transaction_info.status().clone()), + transaction_auxiliary_data.clone(), + ); + + (transaction.clone(), transaction_output, transaction_info.clone(), state_updates.clone()) + }).multiunzip() + } + + pub fn gather_state_updates_until_last_checkpoint( + first_version: Version, + latest_in_memory_state: &StateDelta, + per_version_state_updates: &[ShardedStateUpdates], + transaction_infos: &[TransactionInfo], + ) -> Option { + if let Some(latest_checkpoint_version) = latest_in_memory_state.base_version { + if latest_checkpoint_version >= first_version { + let idx = (latest_checkpoint_version - first_version) as usize; + assert!( + transaction_infos[idx].state_checkpoint_hash().is_some(), + "The new latest snapshot version passed in {:?} does not match with the last checkpoint version in txns_to_commit {:?}", + latest_checkpoint_version, + first_version + idx as u64 + ); + let mut sharded_state_updates = create_empty_sharded_state_updates(); + sharded_state_updates.par_iter_mut().enumerate().for_each( + |(shard_id, state_updates_shard)| { + per_version_state_updates[..=idx].iter().for_each(|updates| { + state_updates_shard.extend(updates[shard_id].clone()); + }) + }, + ); + return Some(sharded_state_updates); + } + } + + None + } + +} diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index dadffda470ce0..56b8fb7373f82 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -21,30 +21,11 @@ impl DbWriter for AptosDB { .expect("Concurrent committing detected."); let _timer = OTHER_TIMERS_SECONDS.timer_with(&["pre_commit_ledger"]); - let ChunkToCommit { - txns_to_commit, - first_version, - base_state_version, - state_updates_until_last_checkpoint, - latest_in_memory_state, - sharded_state_cache, - } = chunk; - - latest_in_memory_state.current.log_generation("db_save"); - - self.pre_commit_validation( - txns_to_commit, - first_version, - base_state_version, - latest_in_memory_state, - )?; - let last_version = first_version + txns_to_commit.len() as u64 - 1; + chunk.latest_in_memory_state.current.log_generation("db_save"); + self.pre_commit_validation(&chunk)?; let _new_root_hash = self.calculate_and_commit_ledger_and_state_kv( - txns_to_commit, - first_version, - latest_in_memory_state.current.usage(), - sharded_state_cache, + &chunk, self.skip_index_and_usage, )?; @@ -54,12 +35,12 @@ impl DbWriter for AptosDB { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["buffered_state___update"]); buffered_state.update( - state_updates_until_last_checkpoint, - latest_in_memory_state, - sync_commit || txns_to_commit.last().unwrap().is_reconfig(), + chunk.state_updates_until_last_checkpoint, + chunk.latest_in_memory_state, + sync_commit || chunk.is_reconfig, )?; } - self.ledger_db.metadata_db().set_pre_committed_version(last_version); + self.ledger_db.metadata_db().set_pre_committed_version(chunk.expect_last_version()); Ok(()) }) } @@ -68,7 +49,7 @@ impl DbWriter for AptosDB { &self, version: Version, ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, - txns_to_commit: Option<&[TransactionToCommit]>, + chunk_opt: Option, ) -> Result<()> { gauged_api("commit_ledger", || { // Pre-committing and committing in concurrency is allowed but not pre-committing at the @@ -100,7 +81,7 @@ impl DbWriter for AptosDB { old_committed_ver, version, ledger_info_with_sigs, - txns_to_commit, + chunk_opt, ) }) } @@ -238,35 +219,30 @@ impl DbWriter for AptosDB { impl AptosDB { fn pre_commit_validation( &self, - txns_to_commit: &[TransactionToCommit], - first_version: Version, - base_state_version: Option, - latest_in_memory_state: &StateDelta, + chunk: &ChunkToCommit, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["save_transactions_validation"]) .start_timer(); - let num_txns = txns_to_commit.len() as u64; ensure!( - num_txns > 0, - "txns_to_commit is empty, nothing to save.", + !chunk.is_empty(), + "chunk is empty, nothing to save.", ); - let last_version = first_version + num_txns - 1; ensure!( - Some(last_version) == latest_in_memory_state.current_version, + Some(chunk.expect_last_version()) == chunk.latest_in_memory_state.current_version, "the last_version {:?} to commit doesn't match the current_version {:?} in latest_in_memory_state", - last_version, - latest_in_memory_state.current_version.expect("Must exist"), + chunk.expect_last_version(), + chunk.latest_in_memory_state.current_version.expect("Must exist"), ); let num_transactions_in_db = self.get_pre_committed_version()?.map_or(0, |v| v + 1); { let buffered_state = self.state_store.buffered_state().lock(); ensure!( - base_state_version == buffered_state.current_state().base_version, + chunk.base_state_version == buffered_state.current_state().base_version, "base_state_version {:?} does not equal to the base_version {:?} in buffered state with current version {:?}", - base_state_version, + chunk.base_state_version, buffered_state.current_state().base_version, buffered_state.current_state().current_version, ); @@ -278,9 +254,9 @@ impl AptosDB { .current_version .map(|version| version + 1) .unwrap_or(0); - ensure!(num_transactions_in_db == first_version && num_transactions_in_db == next_version_in_buffered_state, + ensure!(num_transactions_in_db == chunk.first_version && num_transactions_in_db == next_version_in_buffered_state, "The first version passed in ({}), the next version in buffered state ({}) and the next version in db ({}) are inconsistent.", - first_version, + chunk.first_version, next_version_in_buffered_state, num_transactions_in_db, ); @@ -291,10 +267,7 @@ impl AptosDB { fn calculate_and_commit_ledger_and_state_kv( &self, - txns_to_commit: &[TransactionToCommit], - first_version: Version, - expected_state_db_usage: StateStorageUsage, - sharded_state_cache: Option<&ShardedStateCache>, + chunk: &ChunkToCommit, skip_index_and_usage: bool, ) -> Result { let _timer = OTHER_TIMERS_SECONDS @@ -307,42 +280,44 @@ impl AptosDB { // // TODO(grao): Consider propagating the error instead of panic, if necessary. s.spawn(|_| { - self.commit_events(txns_to_commit, first_version, skip_index_and_usage) + self.commit_events(chunk.first_version, chunk.transaction_outputs, skip_index_and_usage) .unwrap() }); s.spawn(|_| { self.ledger_db .write_set_db() - .commit_write_sets(txns_to_commit, first_version) + .commit_write_sets( + chunk.first_version, + chunk.transaction_outputs.par_iter().map(TransactionOutput::write_set) + ) .unwrap() }); s.spawn(|_| { self.ledger_db .transaction_db() - .commit_transactions(txns_to_commit, first_version, skip_index_and_usage) + .commit_transactions(chunk.first_version, chunk.transactions, skip_index_and_usage) .unwrap() }); s.spawn(|_| { self.commit_state_kv_and_ledger_metadata( - txns_to_commit, - first_version, - expected_state_db_usage, - sharded_state_cache, + chunk, skip_index_and_usage, ) .unwrap() }); s.spawn(|_| { - self.commit_transaction_infos(txns_to_commit, first_version) + self.commit_transaction_infos(chunk.first_version, chunk.transaction_infos) .unwrap() }); s.spawn(|_| { new_root_hash = self - .commit_transaction_accumulator(txns_to_commit, first_version) + .commit_transaction_accumulator(chunk.first_version, chunk.transaction_infos) .unwrap() }); s.spawn(|_| { - self.commit_transaction_auxiliary_data(txns_to_commit, first_version) + self.commit_transaction_auxiliary_data( + chunk.first_version, + chunk.transaction_outputs.iter().map(TransactionOutput::auxiliary_data)) .unwrap() }); }); @@ -352,22 +327,15 @@ impl AptosDB { fn commit_state_kv_and_ledger_metadata( &self, - txns_to_commit: &[TransactionToCommit], - first_version: Version, - expected_state_db_usage: StateStorageUsage, - sharded_state_cache: Option<&ShardedStateCache>, + chunk: &ChunkToCommit, skip_index_and_usage: bool, ) -> Result<()> { - if txns_to_commit.is_empty() { + if chunk.is_empty() { return Ok(()); } let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_state_kv_and_ledger_metadata"]) .start_timer(); - let state_updates_vec = txns_to_commit - .iter() - .map(|txn_to_commit| txn_to_commit.state_updates()) - .collect::>(); let ledger_metadata_batch = SchemaBatch::new(); let sharded_state_kv_batches = new_sharded_kv_schema_batch(); @@ -375,28 +343,28 @@ impl AptosDB { // TODO(grao): Make state_store take sharded state updates. self.state_store.put_value_sets( - state_updates_vec, - first_version, - expected_state_db_usage, - sharded_state_cache, + chunk.per_version_state_updates, + chunk.first_version, + chunk.latest_in_memory_state.current.usage(), + chunk.sharded_state_cache, &ledger_metadata_batch, &sharded_state_kv_batches, // Always put in state value index for now. // TODO(grao): remove after APIs migrated off the DB to the indexer. self.state_store.state_kv_db.enabled_sharding(), skip_index_and_usage, - txns_to_commit + chunk.transaction_infos .iter() - .rposition(|txn| txn.has_state_checkpoint_hash()), + .rposition(|t| t.state_checkpoint_hash().is_some()), )?; // Write block index if event index is skipped. if skip_index_and_usage { - for (i, txn) in txns_to_commit.iter().enumerate() { - for event in txn.events() { + for (i, txn_out) in chunk.transaction_outputs.iter().enumerate() { + for event in txn_out.events() { if let Some(event_key) = event.event_key() { if *event_key == new_block_event_key() { - let version = first_version + i as Version; + let version = chunk.first_version + i as Version; LedgerMetadataDb::put_block_info( version, event, @@ -408,11 +376,10 @@ impl AptosDB { } } - let last_version = first_version + txns_to_commit.len() as u64 - 1; ledger_metadata_batch .put::( &DbMetadataKey::LedgerCommitProgress, - &DbMetadataValue::Version(last_version), + &DbMetadataValue::Version(chunk.expect_last_version()), ) .unwrap(); @@ -429,7 +396,7 @@ impl AptosDB { s.spawn(|_| { self.state_kv_db .commit( - last_version, + chunk.expect_last_version(), state_kv_metadata_batch, sharded_state_kv_batches, ) @@ -442,23 +409,22 @@ impl AptosDB { fn commit_events( &self, - txns_to_commit: &[TransactionToCommit], first_version: Version, + transaction_outputs: &[TransactionOutput], skip_index: bool, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_events"]) .start_timer(); let batch = SchemaBatch::new(); - let num_txns = txns_to_commit.len(); - txns_to_commit + transaction_outputs .par_iter() - .with_min_len(optimal_min_len(num_txns, 128)) + .with_min_len(optimal_min_len(transaction_outputs.len(), 128)) .enumerate() - .try_for_each(|(i, txn_to_commit)| -> Result<()> { + .try_for_each(|(i, txn_out)| -> Result<()> { self.ledger_db.event_db().put_events( - first_version + i as u64, - txn_to_commit.events(), + first_version + i as Version, + txn_out.events(), skip_index, &batch, )?; @@ -473,23 +439,22 @@ impl AptosDB { fn commit_transaction_accumulator( &self, - txns_to_commit: &[TransactionToCommit], - first_version: u64, + first_version: Version, + transaction_infos: &[TransactionInfo], ) -> Result { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_transaction_accumulator"]) .start_timer(); + let num_txns = transaction_infos.len() as Version; + let batch = SchemaBatch::new(); let root_hash = self .ledger_db .transaction_accumulator_db() .put_transaction_accumulator( first_version, - &txns_to_commit - .iter() - .map(|txn_to_commit| txn_to_commit.transaction_info()) - .collect::>(), + transaction_infos, &batch, )?; @@ -502,7 +467,7 @@ impl AptosDB { let batch = SchemaBatch::new(); let all_versions: Vec<_> = - (first_version..first_version + txns_to_commit.len() as u64).collect(); + (first_version..first_version + num_txns).collect(); THREAD_MANAGER .get_non_exe_cpu_pool() .install(|| -> Result<()> { @@ -530,23 +495,23 @@ impl AptosDB { Ok(root_hash) } - fn commit_transaction_auxiliary_data( + fn commit_transaction_auxiliary_data<'a>( &self, - txns_to_commit: &[TransactionToCommit], - first_version: u64, + first_version: Version, + auxiliary_data: impl IntoIterator, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_transaction_auxiliary_data"]) .start_timer(); let batch = SchemaBatch::new(); - txns_to_commit - .iter() + auxiliary_data + .into_iter() .enumerate() - .try_for_each(|(i, txn_to_commit)| -> Result<()> { + .try_for_each(|(i, aux_data)| -> Result<()> { TransactionAuxiliaryDataDb::put_transaction_auxiliary_data( - first_version + i as u64, - txn_to_commit.transaction_auxiliary_data(), + first_version + i as Version, + aux_data, &batch, )?; @@ -563,23 +528,22 @@ impl AptosDB { fn commit_transaction_infos( &self, - txns_to_commit: &[TransactionToCommit], - first_version: u64, + first_version: Version, + txn_infos: &[TransactionInfo], ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_transaction_infos"]) .start_timer(); let batch = SchemaBatch::new(); - let num_txns = txns_to_commit.len(); - txns_to_commit + txn_infos .par_iter() - .with_min_len(optimal_min_len(num_txns, 128)) + .with_min_len(optimal_min_len(txn_infos.len(), 128)) .enumerate() - .try_for_each(|(i, txn_to_commit)| -> Result<()> { + .try_for_each(|(i, txn_info)| -> Result<()> { let version = first_version + i as u64; TransactionInfoDb::put_transaction_info( version, - txn_to_commit.transaction_info(), + txn_info, &batch, )?; @@ -664,7 +628,7 @@ impl AptosDB { old_committed_version: Option, version: Version, ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, - txns_to_commit: Option<&[TransactionToCommit]>, + chunk_opt: Option, ) -> Result<()> { // If commit succeeds and there are at least one transaction written to the storage, we // will inform the pruner thread to work. @@ -696,8 +660,8 @@ impl AptosDB { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["indexer_index"]); // n.b. txns_to_commit can be partial, when the control was handed over from consensus to state sync // where state sync won't send the pre-committed part to the DB again. - if txns_to_commit.is_some() && txns_to_commit.unwrap().len() == num_txns as usize { - let write_sets = txns_to_commit.unwrap().iter().map(|txn| txn.write_set()).collect_vec(); + if chunk_opt.is_some() && chunk_opt.as_ref().unwrap().len() == num_txns as usize { + let write_sets = chunk_opt.unwrap().transaction_outputs.iter().map(|t| t.write_set()).collect_vec(); indexer.index(self.state_store.clone(), first_version, &write_sets)?; } else { let write_sets: Vec<_> = self.ledger_db.write_set_db().get_write_set_iter(first_version, num_txns as usize)?.try_collect()?; diff --git a/storage/aptosdb/src/db/mod.rs b/storage/aptosdb/src/db/mod.rs index 2608e7ed52707..2a11b754f5c8f 100644 --- a/storage/aptosdb/src/db/mod.rs +++ b/storage/aptosdb/src/db/mod.rs @@ -82,6 +82,7 @@ use std::{ time::Instant, }; use tokio::sync::watch::Sender; + #[cfg(test)] mod aptosdb_test; #[cfg(any(test, feature = "fuzzing"))] diff --git a/storage/aptosdb/src/db/test_helper.rs b/storage/aptosdb/src/db/test_helper.rs index 058447ad5209c..353b19bbf6f00 100644 --- a/storage/aptosdb/src/db/test_helper.rs +++ b/storage/aptosdb/src/db/test_helper.rs @@ -3,7 +3,6 @@ // SPDX-License-Identifier: Apache-2.0 //! This module provides reusable helpers in tests. -use super::gather_state_updates_until_last_checkpoint; #[cfg(test)] use crate::state_store::StateStore; #[cfg(test)] @@ -19,9 +18,7 @@ use aptos_executor_types::ProofReader; use aptos_jellyfish_merkle::node_type::{Node, NodeKey}; #[cfg(test)] use aptos_schemadb::SchemaBatch; -use aptos_storage_interface::{ - chunk_to_commit::ChunkToCommit, state_delta::StateDelta, DbReader, DbWriter, Order, Result, -}; +use aptos_storage_interface::{state_delta::StateDelta, DbReader, Order, Result}; use aptos_temppath::TempPath; #[cfg(test)] use aptos_types::state_store::state_storage_usage::StateStorageUsage; @@ -91,7 +88,7 @@ pub(crate) fn update_store( let schema_batch = SchemaBatch::new(); store .put_value_sets( - vec![&sharded_value_state_set], + &[sharded_value_state_set], version, StateStorageUsage::new_untracked(), None, @@ -918,14 +915,8 @@ pub(crate) fn put_transaction_infos( version: Version, txn_infos: &[TransactionInfo], ) -> HashValue { - let txns_to_commit: Vec<_> = txn_infos - .iter() - .cloned() - .map(TransactionToCommit::dummy_with_transaction_info) - .collect(); - db.commit_transaction_infos(&txns_to_commit, version) - .unwrap(); - db.commit_transaction_accumulator(&txns_to_commit, version) + db.commit_transaction_infos(version, txn_infos).unwrap(); + db.commit_transaction_accumulator(version, txn_infos) .unwrap() } @@ -935,12 +926,7 @@ pub(crate) fn put_transaction_auxiliary_data( version: Version, auxiliary_data: &[TransactionAuxiliaryData], ) { - let txns_to_commit: Vec<_> = auxiliary_data - .iter() - .cloned() - .map(TransactionToCommit::dummy_with_transaction_auxiliary_data) - .collect(); - db.commit_transaction_auxiliary_data(&txns_to_commit, version) + db.commit_transaction_auxiliary_data(version, auxiliary_data) .unwrap(); } @@ -1001,39 +987,26 @@ pub fn test_sync_transactions_impl( if batch1_len > 0 { let txns_to_commit_batch = &txns_to_commit[..batch1_len]; update_in_memory_state(&mut in_memory_state, txns_to_commit_batch); - let state_updates = gather_state_updates_until_last_checkpoint( - cur_ver, - &in_memory_state, + db.save_transactions_for_test( txns_to_commit_batch, - ); - let chunk = ChunkToCommit { - first_version: cur_ver, + cur_ver, /* first_version */ base_state_version, - txns_to_commit: txns_to_commit_batch, - latest_in_memory_state: &in_memory_state, - state_updates_until_last_checkpoint: state_updates.as_ref(), - sharded_state_cache: None, - }; - db.save_transactions(chunk, None, false /* sync_commit */) - .unwrap(); + None, /* ledger_info_with_sigs */ + false, /* sync_commit */ + &in_memory_state, + ) + .unwrap(); } let ver = cur_ver + batch1_len as Version; let txns_to_commit_batch = &txns_to_commit[batch1_len..]; update_in_memory_state(&mut in_memory_state, txns_to_commit_batch); - let state_updates = - gather_state_updates_until_last_checkpoint(ver, &in_memory_state, txns_to_commit_batch); - let chunk = ChunkToCommit { - first_version: ver, + db.save_transactions_for_test( + txns_to_commit_batch, + ver, base_state_version, - txns_to_commit: txns_to_commit_batch, - latest_in_memory_state: &in_memory_state, - state_updates_until_last_checkpoint: state_updates.as_ref(), - sharded_state_cache: None, - }; - db.save_transactions( - chunk, Some(ledger_info_with_sigs), false, /* sync_commit */ + &in_memory_state, ) .unwrap(); diff --git a/storage/aptosdb/src/fast_sync_storage_wrapper.rs b/storage/aptosdb/src/fast_sync_storage_wrapper.rs index 8530b34586a0b..e58099c0bb31d 100644 --- a/storage/aptosdb/src/fast_sync_storage_wrapper.rs +++ b/storage/aptosdb/src/fast_sync_storage_wrapper.rs @@ -13,7 +13,7 @@ use aptos_storage_interface::{ use aptos_types::{ ledger_info::LedgerInfoWithSignatures, state_store::{state_key::StateKey, state_value::StateValue}, - transaction::{TransactionOutputListWithProof, TransactionToCommit, Version}, + transaction::{TransactionOutputListWithProof, Version}, }; use either::Either; use std::sync::Arc; @@ -40,7 +40,7 @@ pub struct FastSyncStorageWrapper { impl FastSyncStorageWrapper { /// If the db is empty and configured to do fast sync, we return a FastSyncStorageWrapper - /// Otherwise, we returns AptosDB directly and the FastSyncStorageWrapper is None + /// Otherwise, we return AptosDB directly and the FastSyncStorageWrapper is None pub fn initialize_dbs( config: &NodeConfig, internal_indexer_db: Option, @@ -177,10 +177,10 @@ impl DbWriter for FastSyncStorageWrapper { &self, version: Version, ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, - txns_to_commit: Option<&[TransactionToCommit]>, + chunk_opt: Option, ) -> Result<()> { self.get_aptos_db_write_ref() - .commit_ledger(version, ledger_info_with_sigs, txns_to_commit) + .commit_ledger(version, ledger_info_with_sigs, chunk_opt) } } diff --git a/storage/aptosdb/src/ledger_db/transaction_db.rs b/storage/aptosdb/src/ledger_db/transaction_db.rs index f2f3b48b26641..beaefa935740d 100644 --- a/storage/aptosdb/src/ledger_db/transaction_db.rs +++ b/storage/aptosdb/src/ledger_db/transaction_db.rs @@ -14,7 +14,7 @@ use aptos_crypto::hash::{CryptoHash, HashValue}; use aptos_db_indexer_schemas::schema::transaction_by_account::TransactionByAccountSchema; use aptos_schemadb::{SchemaBatch, DB}; use aptos_storage_interface::{AptosDbError, Result}; -use aptos_types::transaction::{Transaction, TransactionToCommit, Version}; +use aptos_types::transaction::{Transaction, Version}; use rayon::prelude::*; use std::{path::Path, sync::Arc}; @@ -79,32 +79,33 @@ impl TransactionDb { pub(crate) fn commit_transactions( &self, - txns_to_commit: &[TransactionToCommit], first_version: Version, + transactions: &[Transaction], skip_index: bool, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_transactions"]) .start_timer(); let chunk_size = 512; - let batches = txns_to_commit + let batches = transactions .par_chunks(chunk_size) .enumerate() .map(|(chunk_index, txns_in_chunk)| -> Result { let batch = SchemaBatch::new(); let chunk_first_version = first_version + (chunk_size * chunk_index) as u64; - txns_in_chunk.iter().enumerate().try_for_each( - |(i, txn_to_commit)| -> Result<()> { + txns_in_chunk + .iter() + .enumerate() + .try_for_each(|(i, txn)| -> Result<()> { self.put_transaction( chunk_first_version + i as u64, - txn_to_commit.transaction(), + txn, skip_index, &batch, )?; Ok(()) - }, - )?; + })?; Ok(batch) }) .collect::>>()?; diff --git a/storage/aptosdb/src/ledger_db/transaction_db_test.rs b/storage/aptosdb/src/ledger_db/transaction_db_test.rs index 3f628925a469d..8370856b2bace 100644 --- a/storage/aptosdb/src/ledger_db/transaction_db_test.rs +++ b/storage/aptosdb/src/ledger_db/transaction_db_test.rs @@ -9,7 +9,7 @@ use aptos_storage_interface::Result; use aptos_temppath::TempPath; use aptos_types::{ proptest_types::{AccountInfoUniverse, SignatureCheckedTransactionGen}, - transaction::{Transaction, TransactionToCommit, Version}, + transaction::{Transaction, Version}, }; use proptest::{collection::vec, prelude::*}; @@ -145,17 +145,7 @@ pub(crate) fn init_db( assert!(transaction_db.get_transaction(0).is_err()); transaction_db - .commit_transactions( - &txns - .iter() - .map(|transaction| TransactionToCommit { - transaction: transaction.clone(), - ..TransactionToCommit::dummy() - }) - .collect::>(), - 0, - /*skip_index=*/ false, - ) + .commit_transactions(0, &txns, /*skip_index=*/ false) .unwrap(); txns diff --git a/storage/aptosdb/src/ledger_db/write_set_db.rs b/storage/aptosdb/src/ledger_db/write_set_db.rs index 9320e6fb5b19a..a37b099d2bac7 100644 --- a/storage/aptosdb/src/ledger_db/write_set_db.rs +++ b/storage/aptosdb/src/ledger_db/write_set_db.rs @@ -12,10 +12,7 @@ use crate::{ use aptos_experimental_runtimes::thread_manager::optimal_min_len; use aptos_schemadb::{SchemaBatch, DB}; use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Result}; -use aptos_types::{ - transaction::{TransactionToCommit, Version}, - write_set::WriteSet, -}; +use aptos_types::{transaction::Version, write_set::WriteSet}; use rayon::prelude::*; use std::{path::Path, sync::Arc}; @@ -107,22 +104,22 @@ impl WriteSetDb { } /// Commits write sets starting from `first_version` to the database. - pub(crate) fn commit_write_sets( + pub(crate) fn commit_write_sets<'a>( &self, - txns_to_commit: &[TransactionToCommit], first_version: Version, + write_sets: impl IndexedParallelIterator, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_write_sets"]) .start_timer(); let batch = SchemaBatch::new(); - let num_txns = txns_to_commit.len(); - txns_to_commit - .par_iter() + let num_txns = write_sets.len(); + + write_sets .with_min_len(optimal_min_len(num_txns, 128)) .enumerate() - .try_for_each(|(i, txn_to_commit)| -> Result<()> { - Self::put_write_set(first_version + i as u64, txn_to_commit.write_set(), &batch)?; + .try_for_each(|(i, write_set)| -> Result<()> { + Self::put_write_set(first_version + i as Version, write_set, &batch)?; Ok(()) })?; diff --git a/storage/aptosdb/src/ledger_db/write_set_db_test.rs b/storage/aptosdb/src/ledger_db/write_set_db_test.rs index 44c4e9d38870a..e0140b246f847 100644 --- a/storage/aptosdb/src/ledger_db/write_set_db_test.rs +++ b/storage/aptosdb/src/ledger_db/write_set_db_test.rs @@ -5,11 +5,9 @@ use crate::{ledger_db::WriteSetDb, AptosDB}; use aptos_schemadb::SchemaBatch; use aptos_storage_interface::Result; use aptos_temppath::TempPath; -use aptos_types::{ - transaction::{TransactionToCommit, Version}, - write_set::WriteSet, -}; +use aptos_types::{transaction::Version, write_set::WriteSet}; use proptest::{collection::vec, prelude::*}; +use rayon::prelude::*; proptest! { #![proptest_config(ProptestConfig::with_cases(10))] @@ -113,15 +111,6 @@ fn init_db(write_sets: &[WriteSet], write_set_db: &WriteSetDb) { assert!(write_set_db.get_write_set(0).is_err()); write_set_db - .commit_write_sets( - &write_sets - .iter() - .map(|write_set| TransactionToCommit { - write_set: write_set.clone(), - ..TransactionToCommit::dummy() - }) - .collect::>(), - 0, - ) + .commit_write_sets(0, write_sets.par_iter()) .unwrap(); } diff --git a/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs b/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs index edf59efde854f..67e9cca52c185 100644 --- a/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs +++ b/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs @@ -63,7 +63,7 @@ fn put_value_set( let enable_sharding = state_store.state_kv_db.enabled_sharding(); state_store .put_value_sets( - vec![&sharded_value_set], + &[sharded_value_set], version, StateStorageUsage::new_untracked(), None, diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index 6196ad2e73e5d..a62226d3e6328 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -614,8 +614,7 @@ impl StateStore { .with_label_values(&["put_writesets"]) .start_timer(); - // convert value state sets to hash map reference - let value_state_sets_raw: Vec = write_sets + let value_state_sets: Vec = write_sets .iter() .map(|ws| { let mut sharded_state_updates = create_empty_sharded_state_updates(); @@ -627,10 +626,8 @@ impl StateStore { }) .collect::>(); - let value_state_sets = value_state_sets_raw.iter().collect::>(); - self.put_stats_and_indices( - value_state_sets.as_slice(), + &value_state_sets, first_version, StateStorageUsage::new_untracked(), None, @@ -642,7 +639,7 @@ impl StateStore { )?; self.put_state_values( - value_state_sets.to_vec(), + &value_state_sets, first_version, sharded_state_kv_batches, enable_sharding, @@ -654,7 +651,7 @@ impl StateStore { /// Put the `value_state_sets` into its own CF. pub fn put_value_sets( &self, - value_state_sets: Vec<&ShardedStateUpdates>, + value_state_sets: &[ShardedStateUpdates], first_version: Version, expected_usage: StateStorageUsage, sharded_state_cache: Option<&ShardedStateCache>, @@ -669,7 +666,7 @@ impl StateStore { .start_timer(); self.put_stats_and_indices( - &value_state_sets, + value_state_sets, first_version, expected_usage, sharded_state_cache, @@ -694,7 +691,7 @@ impl StateStore { pub fn put_state_values( &self, - value_state_sets: Vec<&ShardedStateUpdates>, + value_state_sets: &[ShardedStateUpdates], first_version: Version, sharded_state_kv_batches: &ShardedStateKvSchemaBatch, enable_sharding: bool, @@ -743,7 +740,7 @@ impl StateStore { /// extra stale index as 1 cover the latter case. pub fn put_stats_and_indices( &self, - value_state_sets: &[&ShardedStateUpdates], + value_state_sets: &[ShardedStateUpdates], first_version: Version, expected_usage: StateStorageUsage, // If not None, it must contains all keys in the value_state_sets. diff --git a/storage/aptosdb/src/state_store/state_store_test.rs b/storage/aptosdb/src/state_store/state_store_test.rs index 97f5727a8b431..dfb2abb842c5b 100644 --- a/storage/aptosdb/src/state_store/state_store_test.rs +++ b/storage/aptosdb/src/state_store/state_store_test.rs @@ -53,7 +53,7 @@ fn put_value_set( let state_kv_metadata_batch = SchemaBatch::new(); state_store .put_value_sets( - vec![&sharded_value_set], + &[sharded_value_set], version, StateStorageUsage::new_untracked(), None, diff --git a/storage/storage-interface/src/chunk_to_commit.rs b/storage/storage-interface/src/chunk_to_commit.rs index ef1bd38a81771..67c4e778741e4 100644 --- a/storage/storage-interface/src/chunk_to_commit.rs +++ b/storage/storage-interface/src/chunk_to_commit.rs @@ -4,15 +4,37 @@ use crate::{cached_state_view::ShardedStateCache, state_delta::StateDelta}; use aptos_types::{ state_store::ShardedStateUpdates, - transaction::{TransactionToCommit, Version}, + transaction::{Transaction, TransactionInfo, TransactionOutput, Version}, }; #[derive(Copy, Clone)] pub struct ChunkToCommit<'a> { pub first_version: Version, + pub transactions: &'a [Transaction], + pub transaction_outputs: &'a [TransactionOutput], + pub transaction_infos: &'a [TransactionInfo], pub base_state_version: Option, - pub txns_to_commit: &'a [TransactionToCommit], pub latest_in_memory_state: &'a StateDelta, + pub per_version_state_updates: &'a [ShardedStateUpdates], pub state_updates_until_last_checkpoint: Option<&'a ShardedStateUpdates>, pub sharded_state_cache: Option<&'a ShardedStateCache>, + pub is_reconfig: bool, +} + +impl<'a> ChunkToCommit<'a> { + pub fn len(&self) -> usize { + self.transactions.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn next_version(&self) -> Version { + self.first_version + self.len() as Version + } + + pub fn expect_last_version(&self) -> Version { + self.next_version() - 1 + } } diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index e2fd5d8abab02..e99b1e017f0dd 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -546,24 +546,19 @@ pub trait DbWriter: Send + Sync { sync_commit: bool, ) -> Result<()> { // For reconfig suffix. - if ledger_info_with_sigs.is_none() && chunk.txns_to_commit.is_empty() { + if ledger_info_with_sigs.is_none() && chunk.is_empty() { return Ok(()); } - if !chunk.txns_to_commit.is_empty() { + if !chunk.is_empty() { self.pre_commit_ledger(chunk, sync_commit)?; } let version_to_commit = if let Some(ledger_info_with_sigs) = ledger_info_with_sigs { ledger_info_with_sigs.ledger_info().version() } else { - // here txns_to_commit is known to be non-empty - chunk.first_version + chunk.txns_to_commit.len() as u64 - 1 + chunk.expect_last_version() }; - self.commit_ledger( - version_to_commit, - ledger_info_with_sigs, - Some(chunk.txns_to_commit), - ) + self.commit_ledger(version_to_commit, ledger_info_with_sigs, Some(chunk)) } /// Optimistically persist transactions to the ledger. @@ -587,7 +582,7 @@ pub trait DbWriter: Send + Sync { &self, version: Version, ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, - txns_to_commit: Option<&[TransactionToCommit]>, + chunk_opt: Option, ) -> Result<()> { unimplemented!() } diff --git a/types/src/proptest_types.rs b/types/src/proptest_types.rs index d668580ab5aac..fb26fdf7ce988 100644 --- a/types/src/proptest_types.rs +++ b/types/src/proptest_types.rs @@ -1177,7 +1177,7 @@ impl BlockGen { Some(HashValue::random()), ExecutionStatus::Success, ), - arr_macro::arr![HashMap::new(); 16], + arr![HashMap::new(); 16], WriteSet::default(), Vec::new(), false,