From a48febfd2d6a941c29a1ce711caacedf7497fc27 Mon Sep 17 00:00:00 2001 From: aldenhu Date: Fri, 18 Oct 2024 22:12:13 +0000 Subject: [PATCH] Revert "avoid TransactionToCommit (#14999)" This reverts commit 79d5257bd6f458293ab4eafef82c8bade3898266. --- consensus/src/state_computer.rs | 5 +- .../src/ledger_update_output.rs | 59 +++-- .../src/parsed_transaction_output.rs | 7 +- .../src/state_compute_result.rs | 20 +- execution/executor/src/block_executor.rs | 7 +- execution/executor/src/chunk_executor.rs | 15 +- .../src/components/apply_chunk_output.rs | 235 +++++++++++++++++- .../src/components/do_ledger_update.rs | 205 --------------- .../executor/src/components/executed_chunk.rs | 8 +- execution/executor/src/components/mod.rs | 1 - execution/executor/src/db_bootstrapper.rs | 2 +- execution/executor/src/fuzzing.rs | 4 +- .../src/db/include/aptosdb_testonly.rs | 153 +++--------- .../aptosdb/src/db/include/aptosdb_writer.rs | 184 ++++++++------ storage/aptosdb/src/db/mod.rs | 1 - storage/aptosdb/src/db/test_helper.rs | 59 +++-- .../aptosdb/src/fast_sync_storage_wrapper.rs | 8 +- .../aptosdb/src/ledger_db/transaction_db.rs | 17 +- .../src/ledger_db/transaction_db_test.rs | 14 +- storage/aptosdb/src/ledger_db/write_set_db.rs | 19 +- .../src/ledger_db/write_set_db_test.rs | 17 +- .../src/pruner/state_merkle_pruner/test.rs | 2 +- storage/aptosdb/src/state_store/mod.rs | 17 +- .../src/state_store/state_store_test.rs | 2 +- .../storage-interface/src/chunk_to_commit.rs | 26 +- storage/storage-interface/src/lib.rs | 15 +- types/src/proptest_types.rs | 2 +- 27 files changed, 566 insertions(+), 538 deletions(-) delete mode 100644 execution/executor/src/components/do_ledger_update.rs diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index 94e5bc3e968ff..fb8230e013c00 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -144,10 +144,7 @@ impl ExecutionProxy { Box::pin(async move { pre_commit_notifier .send(Box::pin(async move { - let txns = state_compute_result - .ledger_update_output - .transactions - .clone(); + let txns = state_compute_result.transactions_to_commit(); let subscribable_events = state_compute_result.subscribable_events().to_vec(); if let Err(e) = monitor!( diff --git a/execution/executor-types/src/ledger_update_output.rs b/execution/executor-types/src/ledger_update_output.rs index 4187f0e80245a..d5f0f101c998c 100644 --- a/execution/executor-types/src/ledger_update_output.rs +++ b/execution/executor-types/src/ledger_update_output.rs @@ -12,8 +12,8 @@ use aptos_types::{ proof::accumulator::InMemoryTransactionAccumulator, state_store::ShardedStateUpdates, transaction::{ - block_epilogue::BlockEndInfo, Transaction, TransactionInfo, TransactionOutput, - TransactionStatus, Version, + block_epilogue::BlockEndInfo, TransactionInfo, TransactionStatus, TransactionToCommit, + Version, }, }; use derive_more::Deref; @@ -32,7 +32,7 @@ impl LedgerUpdateOutput { } #[cfg(any(test, feature = "fuzzing"))] - pub fn new_dummy_with_input_txns(txns: Vec) -> Self { + pub fn new_dummy_with_input_txns(txns: Vec) -> Self { Self::new_impl(Inner::new_dummy_with_input_txns(txns)) } @@ -46,10 +46,7 @@ impl LedgerUpdateOutput { pub fn new( statuses_for_input_txns: Vec, - transactions: Vec, - transaction_outputs: Vec, - transaction_infos: Vec, - per_version_state_updates: Vec, + to_commit: Vec, subscribable_events: Vec, transaction_info_hashes: Vec, state_updates_until_last_checkpoint: Option, @@ -60,10 +57,7 @@ impl LedgerUpdateOutput { ) -> Self { 
Self::new_impl(Inner { statuses_for_input_txns, - transactions, - transaction_outputs, - transaction_infos, - per_version_state_updates, + to_commit, subscribable_events, transaction_info_hashes, state_updates_until_last_checkpoint, @@ -84,10 +78,7 @@ impl LedgerUpdateOutput { #[derive(Default, Debug)] pub struct Inner { pub statuses_for_input_txns: Vec, - pub transactions: Vec, - pub transaction_outputs: Vec, - pub transaction_infos: Vec, - pub per_version_state_updates: Vec, + pub to_commit: Vec, pub subscribable_events: Vec, pub transaction_info_hashes: Vec, pub state_updates_until_last_checkpoint: Option, @@ -109,10 +100,17 @@ impl Inner { } #[cfg(any(test, feature = "fuzzing"))] - pub fn new_dummy_with_input_txns(transactions: Vec) -> Self { - let num_txns = transactions.len(); + pub fn new_dummy_with_input_txns(txns: Vec) -> Self { + let num_txns = txns.len(); + let to_commit = txns + .into_iter() + .chain(std::iter::once( + aptos_types::transaction::Transaction::StateCheckpoint(HashValue::zero()), + )) + .map(TransactionToCommit::dummy_with_transaction) + .collect(); Self { - transactions, + to_commit, statuses_for_input_txns: vec![ TransactionStatus::Keep( aptos_types::transaction::ExecutionStatus::Success @@ -138,15 +136,19 @@ impl Inner { &self.transaction_accumulator } + pub fn transactions_to_commit(&self) -> &Vec { + &self.to_commit + } + /// Ensure that every block committed by consensus ends with a state checkpoint. That can be /// one of the two cases: 1. a reconfiguration (txns in the proposed block after the txn caused /// the reconfiguration will be retried) 2. a Transaction::StateCheckpoint at the end of the /// block. pub fn ensure_ends_with_state_checkpoint(&self) -> Result<()> { ensure!( - self.transactions + self.to_commit .last() - .map_or(true, |t| t.is_non_reconfig_block_ending()), + .map_or(true, |txn| txn.transaction().is_non_reconfig_block_ending()), "Block not ending with a state checkpoint.", ); Ok(()) @@ -156,17 +158,20 @@ impl Inner { &self, transaction_infos: &[TransactionInfo], ) -> Result<()> { + let first_version = + self.transaction_accumulator.version() + 1 - self.to_commit.len() as Version; ensure!( - self.transaction_infos.len() == transaction_infos.len(), + self.transactions_to_commit().len() == transaction_infos.len(), "Lengths don't match. {} vs {}", - self.transaction_infos.len(), + self.transactions_to_commit().len(), transaction_infos.len(), ); - let mut version = self.first_version(); - for (txn_info, expected_txn_info) in - zip_eq(self.transaction_infos.iter(), transaction_infos.iter()) + let mut version = first_version; + for (txn_to_commit, expected_txn_info) in + zip_eq(self.to_commit.iter(), transaction_infos.iter()) { + let txn_info = txn_to_commit.transaction_info(); ensure!( txn_info == expected_txn_info, "Transaction infos don't match. 
version:{version}, txn_info:{txn_info}, expected_txn_info:{expected_txn_info}", @@ -181,10 +186,10 @@ impl Inner { } pub fn first_version(&self) -> Version { - self.parent_accumulator.num_leaves + self.transaction_accumulator.num_leaves() - self.to_commit.len() as Version } pub fn num_txns(&self) -> usize { - self.transactions.len() + self.to_commit.len() } } diff --git a/execution/executor-types/src/parsed_transaction_output.rs b/execution/executor-types/src/parsed_transaction_output.rs index ddf81e5a2873f..bce63c825ed1a 100644 --- a/execution/executor-types/src/parsed_transaction_output.rs +++ b/execution/executor-types/src/parsed_transaction_output.rs @@ -160,11 +160,8 @@ impl TransactionsWithParsedOutput { self.transactions } - pub fn into_inner(self) -> (Vec, Vec) { - ( - self.transactions, - self.parsed_output.into_iter().map(|t| t.output).collect(), - ) + pub fn into_inner(self) -> (Vec, Vec) { + (self.transactions, self.parsed_output) } pub fn iter(&self) -> impl Iterator { diff --git a/execution/executor-types/src/state_compute_result.rs b/execution/executor-types/src/state_compute_result.rs index 0fd5f85ea24d6..8d67930555726 100644 --- a/execution/executor-types/src/state_compute_result.rs +++ b/execution/executor-types/src/state_compute_result.rs @@ -104,7 +104,17 @@ impl StateComputeResult { } pub fn transactions_to_commit_len(&self) -> usize { - self.ledger_update_output.transactions.len() + self.ledger_update_output.to_commit.len() + } + + /// On top of input transactions (which contain BlockMetadata and Validator txns), + /// filter out those that should be committed, and add StateCheckpoint/BlockEpilogue if needed. + pub fn transactions_to_commit(&self) -> Vec { + self.ledger_update_output + .to_commit + .iter() + .map(|t| t.transaction.clone()) + .collect() } pub fn epoch_state(&self) -> &Option { @@ -145,7 +155,7 @@ impl StateComputeResult { pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification { ChunkCommitNotification { subscribable_events: self.ledger_update_output.subscribable_events.clone(), - committed_transactions: self.ledger_update_output.transactions.clone(), + committed_transactions: self.transactions_to_commit(), reconfiguration_occurred: self.has_reconfiguration(), } } @@ -153,18 +163,14 @@ impl StateComputeResult { pub fn as_chunk_to_commit(&self) -> ChunkToCommit { ChunkToCommit { first_version: self.ledger_update_output.first_version(), - transactions: &self.ledger_update_output.transactions, - transaction_outputs: &self.ledger_update_output.transaction_outputs, - transaction_infos: &self.ledger_update_output.transaction_infos, - per_version_state_updates: &self.ledger_update_output.per_version_state_updates, base_state_version: self.parent_state.base_version, + txns_to_commit: &self.ledger_update_output.to_commit, latest_in_memory_state: &self.result_state, state_updates_until_last_checkpoint: self .ledger_update_output .state_updates_until_last_checkpoint .as_ref(), sharded_state_cache: Some(&self.ledger_update_output.sharded_state_cache), - is_reconfig: self.ledger_update_output.block_end_info.is_some(), } } } diff --git a/execution/executor/src/block_executor.rs b/execution/executor/src/block_executor.rs index 73ed2811a64d7..72426a15dd951 100644 --- a/execution/executor/src/block_executor.rs +++ b/execution/executor/src/block_executor.rs @@ -6,7 +6,7 @@ use crate::{ components::{ - block_tree::BlockTree, chunk_output::ChunkOutput, do_ledger_update::DoLedgerUpdate, + apply_chunk_output::ApplyChunkOutput, block_tree::BlockTree, 
chunk_output::ChunkOutput, partial_state_compute_result::PartialStateComputeResult, }, logging::{LogEntry, LogSchema}, @@ -318,7 +318,10 @@ where parent_output.reconfig_suffix() } else { let (output, _, _) = THREAD_MANAGER.get_non_exe_cpu_pool().install(|| { - DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone()) + ApplyChunkOutput::calculate_ledger_update( + state_checkpoint_output, + parent_accumulator.clone(), + ) })?; output }; diff --git a/execution/executor/src/chunk_executor.rs b/execution/executor/src/chunk_executor.rs index c5854affe53e0..4feb6a4bcb076 100644 --- a/execution/executor/src/chunk_executor.rs +++ b/execution/executor/src/chunk_executor.rs @@ -10,7 +10,6 @@ use crate::{ chunk_commit_queue::{ChunkCommitQueue, ChunkToUpdateLedger}, chunk_output::ChunkOutput, chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier}, - do_ledger_update::DoLedgerUpdate, executed_chunk::ExecutedChunk, partial_state_compute_result::PartialStateComputeResult, transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk}, @@ -350,7 +349,10 @@ impl ChunkExecutorInner { let first_version = parent_accumulator.num_leaves(); let (ledger_update_output, to_discard, to_retry) = { let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__calculate"]); - DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone())? + ApplyChunkOutput::calculate_ledger_update( + state_checkpoint_output, + parent_accumulator.clone(), + )? }; ensure!(to_discard.is_empty(), "Unexpected discard."); @@ -492,16 +494,19 @@ impl ChunkExecutorInner { let started = Instant::now(); let chunk = self.commit_chunk_impl()?; - let output = chunk.output.expect_complete_result(); - let num_committed = output.transactions_to_commit_len(); + let num_committed = chunk.transactions_to_commit().len(); info!( num_committed = num_committed, tps = num_committed as f64 / started.elapsed().as_secs_f64(), "TransactionReplayer::commit() OK" ); - Ok(output.version()) + Ok(chunk + .output + .result_state + .current_version + .expect("Version must exist after commit.")) } /// Remove `end_version - begin_version` transactions from the mutable input arguments and replay. 
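Both reverted call sites above derive versions from the transaction accumulator rather than from dedicated fields: `parent_accumulator.num_leaves()` gives the version of a chunk's first transaction, and the reverted `LedgerUpdateOutput::first_version()` recovers the same value from the accumulator after the chunk is appended. A minimal stand-alone sketch of that arithmetic (illustrative only, not part of the patch; the function names are ours):

    type Version = u64;

    // Version of a chunk's first txn, read off the accumulator *before* appending.
    fn first_version_from_parent(parent_num_leaves: u64) -> Version {
        parent_num_leaves
    }

    // The same value recovered *after* appending, the way the reverted
    // LedgerUpdateOutput::first_version() computes it.
    fn first_version_from_result(result_num_leaves: u64, chunk_len: usize) -> Version {
        result_num_leaves - chunk_len as Version
    }

    fn main() {
        // 95 leaves before a 5-txn chunk, 100 after: both forms agree on version 95.
        assert_eq!(first_version_from_parent(95), 95);
        assert_eq!(first_version_from_result(100, 5), 95);
    }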
diff --git a/execution/executor/src/components/apply_chunk_output.rs b/execution/executor/src/components/apply_chunk_output.rs index 42f091b5564fd..321579363ba02 100644 --- a/execution/executor/src/components/apply_chunk_output.rs +++ b/execution/executor/src/components/apply_chunk_output.rs @@ -6,31 +6,38 @@ use crate::{ components::{ - chunk_output::ChunkOutput, do_ledger_update::DoLedgerUpdate, executed_chunk::ExecutedChunk, + chunk_output::{update_counters_for_processed_chunk, ChunkOutput}, + executed_chunk::ExecutedChunk, in_memory_state_calculator_v2::InMemoryStateCalculatorV2, partial_state_compute_result::PartialStateComputeResult, }, metrics::{EXECUTOR_ERRORS, OTHER_TIMERS}, }; -use anyhow::Result; -use aptos_crypto::HashValue; +use anyhow::{ensure, Result}; +use aptos_crypto::{hash::CryptoHash, HashValue}; use aptos_executor_types::{ parsed_transaction_output::TransactionsWithParsedOutput, + should_forward_to_subscription_service, state_checkpoint_output::{StateCheckpointOutput, TransactionsByStatus}, - ParsedTransactionOutput, + LedgerUpdateOutput, ParsedTransactionOutput, }; +use aptos_experimental_runtimes::thread_manager::optimal_min_len; use aptos_logger::error; use aptos_metrics_core::TimerHelper; use aptos_storage_interface::{state_delta::StateDelta, ExecutedTrees}; use aptos_types::{ + contract_event::ContractEvent, epoch_state::EpochState, + proof::accumulator::{InMemoryEventAccumulator, InMemoryTransactionAccumulator}, + state_store::ShardedStateUpdates, transaction::{ block_epilogue::{BlockEndInfo, BlockEpiloguePayload}, - ExecutionStatus, Transaction, TransactionAuxiliaryData, TransactionOutput, - TransactionStatus, + ExecutionStatus, Transaction, TransactionAuxiliaryData, TransactionInfo, TransactionOutput, + TransactionStatus, TransactionToCommit, }, write_set::WriteSet, }; +use rayon::prelude::*; use std::{iter::repeat, sync::Arc}; pub struct ApplyChunkOutput; @@ -108,6 +115,61 @@ impl ApplyChunkOutput { )) } + pub fn calculate_ledger_update( + state_checkpoint_output: StateCheckpointOutput, + base_txn_accumulator: Arc, + ) -> Result<(LedgerUpdateOutput, Vec, Vec)> { + let ( + txns, + state_updates_vec, + state_checkpoint_hashes, + state_updates_before_last_checkpoint, + sharded_state_cache, + block_end_info, + ) = state_checkpoint_output.into_inner(); + + let (statuses_for_input_txns, to_commit, to_discard, to_retry) = txns.into_inner(); + + update_counters_for_processed_chunk( + to_commit.txns(), + to_commit.parsed_outputs(), + "execution", + ); + update_counters_for_processed_chunk( + to_discard.txns(), + to_discard.parsed_outputs(), + "execution", + ); + update_counters_for_processed_chunk( + to_retry.txns(), + to_retry.parsed_outputs(), + "execution", + ); + + // Calculate TransactionData and TransactionInfo, i.e. the ledger history diff. 
+ let _timer = OTHER_TIMERS.timer_with(&["assemble_ledger_diff_for_block"]); + + let (txns_to_commit, transaction_info_hashes, subscribable_events) = + Self::assemble_ledger_diff(to_commit, state_updates_vec, state_checkpoint_hashes); + let transaction_accumulator = + Arc::new(base_txn_accumulator.append(&transaction_info_hashes)); + Ok(( + LedgerUpdateOutput::new( + statuses_for_input_txns, + txns_to_commit, + subscribable_events, + transaction_info_hashes, + state_updates_before_last_checkpoint, + sharded_state_cache, + transaction_accumulator, + base_txn_accumulator, + block_end_info, + ), + to_discard.into_txns(), + to_retry.into_txns(), + )) + } + pub fn apply_chunk( chunk_output: ChunkOutput, base_view: &ExecutedTrees, @@ -121,8 +183,10 @@ impl ApplyChunkOutput { known_state_checkpoint_hashes, /*is_block=*/ false, )?; - let (ledger_update_output, to_discard, to_retry) = - DoLedgerUpdate::run(state_checkpoint_output, base_view.txn_accumulator().clone())?; + let (ledger_update_output, to_discard, to_retry) = Self::calculate_ledger_update( + state_checkpoint_output, + base_view.txn_accumulator().clone(), + )?; let output = PartialStateComputeResult::new( base_view.state().clone(), result_state, @@ -258,4 +322,159 @@ impl ApplyChunkOutput { to_retry, )) } + + fn assemble_ledger_diff( + to_commit_from_execution: TransactionsWithParsedOutput, + state_updates_vec: Vec, + state_checkpoint_hashes: Vec>, + ) -> (Vec, Vec, Vec) { + // these are guaranteed by caller side logic + assert_eq!(to_commit_from_execution.len(), state_updates_vec.len()); + assert_eq!( + to_commit_from_execution.len(), + state_checkpoint_hashes.len() + ); + + let num_txns = to_commit_from_execution.len(); + let mut to_commit = Vec::with_capacity(num_txns); + let mut txn_info_hashes = Vec::with_capacity(num_txns); + let hashes_vec = + Self::calculate_events_and_writeset_hashes(to_commit_from_execution.parsed_outputs()); + + let _timer = OTHER_TIMERS.timer_with(&["process_events_and_writeset_hashes"]); + let hashes_vec: Vec<(HashValue, HashValue)> = hashes_vec + .into_par_iter() + .map(|(event_hashes, write_set_hash)| { + ( + InMemoryEventAccumulator::from_leaves(&event_hashes).root_hash(), + write_set_hash, + ) + }) + .collect(); + + let mut all_subscribable_events = Vec::new(); + let (to_commit_txns, to_commit_outputs) = to_commit_from_execution.into_inner(); + for ( + txn, + txn_output, + state_checkpoint_hash, + state_updates, + (event_root_hash, write_set_hash), + ) in itertools::izip!( + to_commit_txns, + to_commit_outputs, + state_checkpoint_hashes, + state_updates_vec, + hashes_vec + ) { + let (write_set, events, per_txn_reconfig_events, gas_used, status, auxiliary_data) = + txn_output.unpack(); + + let subscribable_events: Vec = events + .iter() + .filter(|evt| should_forward_to_subscription_service(evt)) + .cloned() + .collect(); + let txn_info = match &status { + TransactionStatus::Keep(status) => TransactionInfo::new( + txn.hash(), + write_set_hash, + event_root_hash, + state_checkpoint_hash, + gas_used, + status.clone(), + ), + _ => unreachable!("Transaction sorted by status already."), + }; + let txn_info_hash = txn_info.hash(); + txn_info_hashes.push(txn_info_hash); + let txn_to_commit = TransactionToCommit::new( + txn, + txn_info, + state_updates, + write_set, + events, + !per_txn_reconfig_events.is_empty(), + auxiliary_data, + ); + all_subscribable_events.extend(subscribable_events); + to_commit.push(txn_to_commit); + } + (to_commit, txn_info_hashes, all_subscribable_events) + } + + fn 
calculate_events_and_writeset_hashes(
+        to_commit_from_execution: &[ParsedTransactionOutput],
+    ) -> Vec<(Vec<HashValue>, HashValue)> {
+        let _timer = OTHER_TIMERS.timer_with(&["calculate_events_and_writeset_hashes"]);
+
+        let num_txns = to_commit_from_execution.len();
+        to_commit_from_execution
+            .par_iter()
+            .with_min_len(optimal_min_len(num_txns, 64))
+            .map(|txn_output| {
+                (
+                    txn_output
+                        .events()
+                        .iter()
+                        .map(CryptoHash::hash)
+                        .collect::<Vec<_>>(),
+                    CryptoHash::hash(txn_output.write_set()),
+                )
+            })
+            .collect::<Vec<_>>()
+    }
+}
+
+pub fn ensure_no_discard(to_discard: Vec<Transaction>) -> Result<()> {
+    ensure!(to_discard.is_empty(), "Syncing discarded transactions");
+    Ok(())
+}
+
+pub fn ensure_no_retry(to_retry: Vec<Transaction>) -> Result<()> {
+    ensure!(
+        to_retry.is_empty(),
+        "Seeing retries when syncing, did it cross an epoch boundary?",
+    );
+    Ok(())
+}
+
+#[test]
+fn assemble_ledger_diff_should_filter_subscribable_events() {
+    let event_0 =
+        ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_1".to_vec());
+    let event_1 = ContractEvent::new_v2_with_type_tag_str(
+        "0x2345::random_module::RandomEvent",
+        b"random_x".to_vec(),
+    );
+    let event_2 =
+        ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_2".to_vec());
+    let txns_n_outputs =
+        TransactionsWithParsedOutput::new(vec![Transaction::dummy(), Transaction::dummy()], vec![
+            ParsedTransactionOutput::from(TransactionOutput::new(
+                WriteSet::default(),
+                vec![event_0.clone()],
+                0,
+                TransactionStatus::Keep(ExecutionStatus::Success),
+                TransactionAuxiliaryData::default(),
+            )),
+            ParsedTransactionOutput::from(TransactionOutput::new(
+                WriteSet::default(),
+                vec![event_1.clone(), event_2.clone()],
+                0,
+                TransactionStatus::Keep(ExecutionStatus::Success),
+                TransactionAuxiliaryData::default(),
+            )),
+        ]);
+    let state_updates_vec = vec![
+        ShardedStateUpdates::default(),
+        ShardedStateUpdates::default(),
+    ];
+    let state_checkpoint_hashes = vec![Some(HashValue::zero()), Some(HashValue::zero())];
+    let (_, _, subscribable_events) = ApplyChunkOutput::assemble_ledger_diff(
+        txns_n_outputs,
+        state_updates_vec,
+        state_checkpoint_hashes,
+    );
+    assert_eq!(vec![event_0, event_2], subscribable_events);
 }
diff --git a/execution/executor/src/components/do_ledger_update.rs b/execution/executor/src/components/do_ledger_update.rs
deleted file mode 100644
index 2fe989faea05b..0000000000000
--- a/execution/executor/src/components/do_ledger_update.rs
+++ /dev/null
@@ -1,205 +0,0 @@
-// Copyright (c) Aptos Foundation
-// SPDX-License-Identifier: Apache-2.0
-
-use crate::{components::chunk_output::update_counters_for_processed_chunk, metrics::OTHER_TIMERS};
-use anyhow::Result;
-use aptos_crypto::{hash::CryptoHash, HashValue};
-use aptos_executor_types::{
-    parsed_transaction_output::TransactionsWithParsedOutput,
-    should_forward_to_subscription_service, state_checkpoint_output::StateCheckpointOutput,
-    LedgerUpdateOutput, ParsedTransactionOutput,
-};
-use aptos_experimental_runtimes::thread_manager::optimal_min_len;
-use aptos_metrics_core::TimerHelper;
-use aptos_types::{
-    contract_event::ContractEvent,
-    proof::accumulator::{InMemoryEventAccumulator, InMemoryTransactionAccumulator},
-    transaction::{Transaction, TransactionInfo},
-};
-use itertools::{izip, Itertools};
-use rayon::prelude::*;
-use std::sync::Arc;
-
-pub struct DoLedgerUpdate;
-
-impl DoLedgerUpdate {
-    pub fn run(
-        state_checkpoint_output: StateCheckpointOutput,
-        base_txn_accumulator: Arc<InMemoryTransactionAccumulator>,
-    ) -> Result<(LedgerUpdateOutput, Vec<Transaction>, Vec<Transaction>)> {
-        let (
-            txns,
-
state_updates_vec, - state_checkpoint_hashes, - state_updates_before_last_checkpoint, - sharded_state_cache, - block_end_info, - ) = state_checkpoint_output.into_inner(); - - let (statuses_for_input_txns, to_commit, to_discard, to_retry) = txns.into_inner(); - for group in [&to_commit, &to_discard, &to_retry] { - update_counters_for_processed_chunk(group.txns(), group.parsed_outputs(), "execution"); - } - - // these are guaranteed by caller side logic - assert_eq!(to_commit.len(), state_updates_vec.len()); - assert_eq!(to_commit.len(), state_checkpoint_hashes.len()); - - let (event_hashes, write_set_hashes) = - Self::calculate_events_and_writeset_hashes(to_commit.parsed_outputs()); - - let (transaction_infos, subscribible_events) = Self::assemble_transaction_infos( - &to_commit, - state_checkpoint_hashes, - event_hashes, - write_set_hashes, - ); - let transaction_info_hashes = transaction_infos.iter().map(CryptoHash::hash).collect_vec(); - let transaction_accumulator = - Arc::new(base_txn_accumulator.append(&transaction_info_hashes)); - - let (transactions, transaction_outputs) = to_commit.into_inner(); - - let ledger_update_output = LedgerUpdateOutput::new( - statuses_for_input_txns, - transactions, - transaction_outputs, - transaction_infos, - state_updates_vec, - subscribible_events, - transaction_info_hashes, - state_updates_before_last_checkpoint, - sharded_state_cache, - transaction_accumulator, - base_txn_accumulator, - block_end_info, - ); - - Ok(( - ledger_update_output, - to_discard.into_txns(), - to_retry.into_txns(), - )) - } - - fn calculate_events_and_writeset_hashes( - to_commit: &[ParsedTransactionOutput], - ) -> (Vec, Vec) { - let _timer = OTHER_TIMERS.timer_with(&["calculate_events_and_writeset_hashes"]); - - let num_txns = to_commit.len(); - to_commit - .par_iter() - .with_min_len(optimal_min_len(num_txns, 64)) - .map(|txn_output| { - let event_hashes = txn_output - .events() - .iter() - .map(CryptoHash::hash) - .collect::>(); - - ( - InMemoryEventAccumulator::from_leaves(&event_hashes).root_hash(), - CryptoHash::hash(txn_output.write_set()), - ) - }) - .unzip() - } - - fn assemble_transaction_infos( - to_commit: &TransactionsWithParsedOutput, - state_checkpoint_hashes: Vec>, - event_hashes: Vec, - writeset_hashes: Vec, - ) -> (Vec, Vec) { - let _timer = OTHER_TIMERS.timer_with(&["process_events_and_writeset_hashes"]); - - let mut txn_infos = Vec::with_capacity(to_commit.len()); - let mut subscribable_events = Vec::new(); - izip!( - to_commit.iter(), - state_checkpoint_hashes, - event_hashes, - writeset_hashes - ) - .for_each( - |((txn, txn_out), state_checkpoint_hash, event_root_hash, write_set_hash)| { - subscribable_events.extend( - txn_out - .events() - .iter() - .filter(|evt| should_forward_to_subscription_service(evt)) - .cloned(), - ); - txn_infos.push(TransactionInfo::new( - txn.hash(), - write_set_hash, - event_root_hash, - state_checkpoint_hash, - txn_out.gas_used(), - txn_out.status().as_kept_status().expect("Already sorted."), - )); - }, - ); - - (txn_infos, subscribable_events) - } -} - -#[cfg(test)] -mod tests { - use super::DoLedgerUpdate; - use aptos_crypto::hash::HashValue; - use aptos_executor_types::parsed_transaction_output::{ - ParsedTransactionOutput, TransactionsWithParsedOutput, - }; - use aptos_types::{ - contract_event::ContractEvent, - transaction::{ - ExecutionStatus, Transaction, TransactionAuxiliaryData, TransactionOutput, - TransactionStatus, - }, - write_set::WriteSet, - }; - - #[test] - fn 
assemble_ledger_diff_should_filter_subscribable_events() { - let event_0 = - ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_1".to_vec()); - let event_1 = ContractEvent::new_v2_with_type_tag_str( - "0x2345::random_module::RandomEvent", - b"random_x".to_vec(), - ); - let event_2 = - ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_2".to_vec()); - let txns_n_outputs = TransactionsWithParsedOutput::new( - vec![Transaction::dummy(), Transaction::dummy()], - vec![ - ParsedTransactionOutput::from(TransactionOutput::new( - WriteSet::default(), - vec![event_0.clone()], - 0, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - )), - ParsedTransactionOutput::from(TransactionOutput::new( - WriteSet::default(), - vec![event_1.clone(), event_2.clone()], - 0, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - )), - ], - ); - let state_checkpoint_hashes = vec![Some(HashValue::zero()); 2]; - let event_hashes = vec![HashValue::zero(); 2]; - let write_set_hashes = vec![HashValue::zero(); 2]; - let (_transaction_infos, subscribable_events) = DoLedgerUpdate::assemble_transaction_infos( - &txns_n_outputs, - state_checkpoint_hashes, - event_hashes, - write_set_hashes, - ); - assert_eq!(vec![event_0, event_2], subscribable_events); - } -} diff --git a/execution/executor/src/components/executed_chunk.rs b/execution/executor/src/components/executed_chunk.rs index 50e8f0b185290..f1d8654729d6e 100644 --- a/execution/executor/src/components/executed_chunk.rs +++ b/execution/executor/src/components/executed_chunk.rs @@ -5,10 +5,16 @@ #![forbid(unsafe_code)] use crate::components::partial_state_compute_result::PartialStateComputeResult; -use aptos_types::ledger_info::LedgerInfoWithSignatures; +use aptos_types::{ledger_info::LedgerInfoWithSignatures, transaction::TransactionToCommit}; #[derive(Debug)] pub struct ExecutedChunk { pub output: PartialStateComputeResult, pub ledger_info_opt: Option, } + +impl ExecutedChunk { + pub fn transactions_to_commit(&self) -> &[TransactionToCommit] { + &self.output.expect_ledger_update_output().to_commit + } +} diff --git a/execution/executor/src/components/mod.rs b/execution/executor/src/components/mod.rs index da78a4b5dff16..27c9b6ca74ec1 100644 --- a/execution/executor/src/components/mod.rs +++ b/execution/executor/src/components/mod.rs @@ -11,7 +11,6 @@ pub mod chunk_output; pub mod in_memory_state_calculator_v2; pub mod chunk_result_verifier; -pub mod do_ledger_update; pub mod executed_chunk; pub mod partial_state_compute_result; pub mod transaction_chunk; diff --git a/execution/executor/src/db_bootstrapper.rs b/execution/executor/src/db_bootstrapper.rs index d4bfeae878975..02f74c12118c6 100644 --- a/execution/executor/src/db_bootstrapper.rs +++ b/execution/executor/src/db_bootstrapper.rs @@ -136,7 +136,7 @@ pub fn calculate_genesis( .apply_to_ledger(&executed_trees, None)?; let output = &chunk.output; ensure!( - output.expect_ledger_update_output().num_txns() != 0, + !chunk.transactions_to_commit().is_empty(), "Genesis txn execution failed." 
); diff --git a/execution/executor/src/fuzzing.rs b/execution/executor/src/fuzzing.rs index 88500a7db1d00..2621e4b3fcd7e 100644 --- a/execution/executor/src/fuzzing.rs +++ b/execution/executor/src/fuzzing.rs @@ -25,7 +25,7 @@ use aptos_types::{ signature_verified_transaction::{ into_signature_verified_block, SignatureVerifiedTransaction, }, - BlockOutput, Transaction, TransactionOutput, Version, + BlockOutput, Transaction, TransactionOutput, TransactionToCommit, Version, }, vm_status::VMStatus, }; @@ -124,7 +124,7 @@ impl DbWriter for FakeDb { &self, _version: Version, _ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, - _chunk: Option, + _txns_to_commit: Option<&[TransactionToCommit]>, ) -> aptos_storage_interface::Result<()> { Ok(()) } diff --git a/storage/aptosdb/src/db/include/aptosdb_testonly.rs b/storage/aptosdb/src/db/include/aptosdb_testonly.rs index e6d4edc08ccea..73d5e10d13f5b 100644 --- a/storage/aptosdb/src/db/include/aptosdb_testonly.rs +++ b/storage/aptosdb/src/db/include/aptosdb_testonly.rs @@ -6,7 +6,6 @@ use aptos_config::config::{ BUFFERED_STATE_TARGET_ITEMS_FOR_TEST, DEFAULT_MAX_NU use aptos_infallible::Mutex; use aptos_types::state_store::create_empty_sharded_state_updates; use std::default::Default; -use aptos_types::transaction::TransactionStatus; impl AptosDB { /// This opens db in non-readonly mode, without the pruner. @@ -99,6 +98,35 @@ impl AptosDB { } } +pub fn gather_state_updates_until_last_checkpoint( + first_version: Version, + latest_in_memory_state: &StateDelta, + txns_to_commit: &[TransactionToCommit], +) -> Option { + if let Some(latest_checkpoint_version) = latest_in_memory_state.base_version { + if latest_checkpoint_version >= first_version { + let idx = (latest_checkpoint_version - first_version) as usize; + assert!( + txns_to_commit[idx].has_state_checkpoint_hash(), + "The new latest snapshot version passed in {:?} does not match with the last checkpoint version in txns_to_commit {:?}", + latest_checkpoint_version, + first_version + idx as u64 + ); + let mut sharded_state_updates = create_empty_sharded_state_updates(); + sharded_state_updates.par_iter_mut().enumerate().for_each( + |(shard_id, state_updates_shard)| { + txns_to_commit[..=idx].iter().for_each(|txn_to_commit| { + state_updates_shard.extend(txn_to_commit.state_updates()[shard_id].clone()); + }) + }, + ); + return Some(sharded_state_updates); + } + } + + None +} + /// Test only methods for the DB impl AptosDB { pub fn save_transactions_for_test( @@ -110,126 +138,23 @@ impl AptosDB { sync_commit: bool, latest_in_memory_state: &StateDelta, ) -> Result<()> { - let chunk = ChunkToCommitOwned::from_test_txns_to_commit( + let state_updates_until_last_checkpoint = gather_state_updates_until_last_checkpoint( + first_version, + latest_in_memory_state, txns_to_commit, + ); + let chunk = ChunkToCommit { first_version, base_state_version, + txns_to_commit, latest_in_memory_state, - ); + state_updates_until_last_checkpoint: state_updates_until_last_checkpoint.as_ref(), + sharded_state_cache: None, + }; self.save_transactions( - chunk.as_ref(), + chunk, ledger_info_with_sigs, sync_commit, ) } } - -pub struct ChunkToCommitOwned { - first_version: Version, - transactions: Vec, - transaction_outputs: Vec, - transaction_infos: Vec, - base_state_version: Option, - latest_in_memory_state: Arc, - per_version_state_updates: Vec, - state_updates_until_last_checkpoint: Option, - sharded_state_cache: Option, - is_reconfig: bool, -} - -impl ChunkToCommitOwned { - pub fn from_test_txns_to_commit( - 
txns_to_commit: &[TransactionToCommit], - first_version: Version, - base_state_version: Option, - latest_in_memory_state: &StateDelta, - ) -> Self { - let (transactions, transaction_outputs, transaction_infos, per_version_state_updates) = Self::disassemble_txns_to_commit(txns_to_commit); - - let state_updates_until_last_checkpoint = Self::gather_state_updates_until_last_checkpoint( - first_version, - latest_in_memory_state, - &per_version_state_updates, - &transaction_infos, - ); - - Self { - first_version, - transactions, - transaction_outputs, - transaction_infos, - base_state_version, - latest_in_memory_state: Arc::new(latest_in_memory_state.clone()), - per_version_state_updates, - state_updates_until_last_checkpoint, - sharded_state_cache: None, - is_reconfig: false, - } - } - - pub fn as_ref(&self) -> ChunkToCommit { - ChunkToCommit { - first_version: self.first_version, - transactions: &self.transactions, - transaction_outputs: &self.transaction_outputs, - transaction_infos: &self.transaction_infos, - base_state_version: self.base_state_version, - latest_in_memory_state: &self.latest_in_memory_state, - per_version_state_updates: &self.per_version_state_updates, - state_updates_until_last_checkpoint: self.state_updates_until_last_checkpoint.as_ref(), - sharded_state_cache: self.sharded_state_cache.as_ref(), - is_reconfig: self.is_reconfig, - } - } - - fn disassemble_txns_to_commit(txns_to_commit: &[TransactionToCommit]) -> ( - Vec, Vec, Vec, Vec, - ) { - txns_to_commit.iter().map(|txn_to_commit| { - let TransactionToCommit { - transaction, transaction_info, state_updates, write_set, events, is_reconfig: _, transaction_auxiliary_data - } = txn_to_commit; - - let transaction_output = TransactionOutput::new( - write_set.clone(), - events.clone(), - transaction_info.gas_used(), - TransactionStatus::Keep(transaction_info.status().clone()), - transaction_auxiliary_data.clone(), - ); - - (transaction.clone(), transaction_output, transaction_info.clone(), state_updates.clone()) - }).multiunzip() - } - - pub fn gather_state_updates_until_last_checkpoint( - first_version: Version, - latest_in_memory_state: &StateDelta, - per_version_state_updates: &[ShardedStateUpdates], - transaction_infos: &[TransactionInfo], - ) -> Option { - if let Some(latest_checkpoint_version) = latest_in_memory_state.base_version { - if latest_checkpoint_version >= first_version { - let idx = (latest_checkpoint_version - first_version) as usize; - assert!( - transaction_infos[idx].state_checkpoint_hash().is_some(), - "The new latest snapshot version passed in {:?} does not match with the last checkpoint version in txns_to_commit {:?}", - latest_checkpoint_version, - first_version + idx as u64 - ); - let mut sharded_state_updates = create_empty_sharded_state_updates(); - sharded_state_updates.par_iter_mut().enumerate().for_each( - |(shard_id, state_updates_shard)| { - per_version_state_updates[..=idx].iter().for_each(|updates| { - state_updates_shard.extend(updates[shard_id].clone()); - }) - }, - ); - return Some(sharded_state_updates); - } - } - - None - } - -} diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index 56b8fb7373f82..dadffda470ce0 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -21,11 +21,30 @@ impl DbWriter for AptosDB { .expect("Concurrent committing detected."); let _timer = OTHER_TIMERS_SECONDS.timer_with(&["pre_commit_ledger"]); - 
chunk.latest_in_memory_state.current.log_generation("db_save"); + let ChunkToCommit { + txns_to_commit, + first_version, + base_state_version, + state_updates_until_last_checkpoint, + latest_in_memory_state, + sharded_state_cache, + } = chunk; + + latest_in_memory_state.current.log_generation("db_save"); + + self.pre_commit_validation( + txns_to_commit, + first_version, + base_state_version, + latest_in_memory_state, + )?; + let last_version = first_version + txns_to_commit.len() as u64 - 1; - self.pre_commit_validation(&chunk)?; let _new_root_hash = self.calculate_and_commit_ledger_and_state_kv( - &chunk, + txns_to_commit, + first_version, + latest_in_memory_state.current.usage(), + sharded_state_cache, self.skip_index_and_usage, )?; @@ -35,12 +54,12 @@ impl DbWriter for AptosDB { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["buffered_state___update"]); buffered_state.update( - chunk.state_updates_until_last_checkpoint, - chunk.latest_in_memory_state, - sync_commit || chunk.is_reconfig, + state_updates_until_last_checkpoint, + latest_in_memory_state, + sync_commit || txns_to_commit.last().unwrap().is_reconfig(), )?; } - self.ledger_db.metadata_db().set_pre_committed_version(chunk.expect_last_version()); + self.ledger_db.metadata_db().set_pre_committed_version(last_version); Ok(()) }) } @@ -49,7 +68,7 @@ impl DbWriter for AptosDB { &self, version: Version, ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, - chunk_opt: Option, + txns_to_commit: Option<&[TransactionToCommit]>, ) -> Result<()> { gauged_api("commit_ledger", || { // Pre-committing and committing in concurrency is allowed but not pre-committing at the @@ -81,7 +100,7 @@ impl DbWriter for AptosDB { old_committed_ver, version, ledger_info_with_sigs, - chunk_opt, + txns_to_commit, ) }) } @@ -219,30 +238,35 @@ impl DbWriter for AptosDB { impl AptosDB { fn pre_commit_validation( &self, - chunk: &ChunkToCommit, + txns_to_commit: &[TransactionToCommit], + first_version: Version, + base_state_version: Option, + latest_in_memory_state: &StateDelta, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["save_transactions_validation"]) .start_timer(); + let num_txns = txns_to_commit.len() as u64; ensure!( - !chunk.is_empty(), - "chunk is empty, nothing to save.", + num_txns > 0, + "txns_to_commit is empty, nothing to save.", ); + let last_version = first_version + num_txns - 1; ensure!( - Some(chunk.expect_last_version()) == chunk.latest_in_memory_state.current_version, + Some(last_version) == latest_in_memory_state.current_version, "the last_version {:?} to commit doesn't match the current_version {:?} in latest_in_memory_state", - chunk.expect_last_version(), - chunk.latest_in_memory_state.current_version.expect("Must exist"), + last_version, + latest_in_memory_state.current_version.expect("Must exist"), ); let num_transactions_in_db = self.get_pre_committed_version()?.map_or(0, |v| v + 1); { let buffered_state = self.state_store.buffered_state().lock(); ensure!( - chunk.base_state_version == buffered_state.current_state().base_version, + base_state_version == buffered_state.current_state().base_version, "base_state_version {:?} does not equal to the base_version {:?} in buffered state with current version {:?}", - chunk.base_state_version, + base_state_version, buffered_state.current_state().base_version, buffered_state.current_state().current_version, ); @@ -254,9 +278,9 @@ impl AptosDB { .current_version .map(|version| version + 1) .unwrap_or(0); - ensure!(num_transactions_in_db == chunk.first_version && 
num_transactions_in_db == next_version_in_buffered_state, + ensure!(num_transactions_in_db == first_version && num_transactions_in_db == next_version_in_buffered_state, "The first version passed in ({}), the next version in buffered state ({}) and the next version in db ({}) are inconsistent.", - chunk.first_version, + first_version, next_version_in_buffered_state, num_transactions_in_db, ); @@ -267,7 +291,10 @@ impl AptosDB { fn calculate_and_commit_ledger_and_state_kv( &self, - chunk: &ChunkToCommit, + txns_to_commit: &[TransactionToCommit], + first_version: Version, + expected_state_db_usage: StateStorageUsage, + sharded_state_cache: Option<&ShardedStateCache>, skip_index_and_usage: bool, ) -> Result { let _timer = OTHER_TIMERS_SECONDS @@ -280,44 +307,42 @@ impl AptosDB { // // TODO(grao): Consider propagating the error instead of panic, if necessary. s.spawn(|_| { - self.commit_events(chunk.first_version, chunk.transaction_outputs, skip_index_and_usage) + self.commit_events(txns_to_commit, first_version, skip_index_and_usage) .unwrap() }); s.spawn(|_| { self.ledger_db .write_set_db() - .commit_write_sets( - chunk.first_version, - chunk.transaction_outputs.par_iter().map(TransactionOutput::write_set) - ) + .commit_write_sets(txns_to_commit, first_version) .unwrap() }); s.spawn(|_| { self.ledger_db .transaction_db() - .commit_transactions(chunk.first_version, chunk.transactions, skip_index_and_usage) + .commit_transactions(txns_to_commit, first_version, skip_index_and_usage) .unwrap() }); s.spawn(|_| { self.commit_state_kv_and_ledger_metadata( - chunk, + txns_to_commit, + first_version, + expected_state_db_usage, + sharded_state_cache, skip_index_and_usage, ) .unwrap() }); s.spawn(|_| { - self.commit_transaction_infos(chunk.first_version, chunk.transaction_infos) + self.commit_transaction_infos(txns_to_commit, first_version) .unwrap() }); s.spawn(|_| { new_root_hash = self - .commit_transaction_accumulator(chunk.first_version, chunk.transaction_infos) + .commit_transaction_accumulator(txns_to_commit, first_version) .unwrap() }); s.spawn(|_| { - self.commit_transaction_auxiliary_data( - chunk.first_version, - chunk.transaction_outputs.iter().map(TransactionOutput::auxiliary_data)) + self.commit_transaction_auxiliary_data(txns_to_commit, first_version) .unwrap() }); }); @@ -327,15 +352,22 @@ impl AptosDB { fn commit_state_kv_and_ledger_metadata( &self, - chunk: &ChunkToCommit, + txns_to_commit: &[TransactionToCommit], + first_version: Version, + expected_state_db_usage: StateStorageUsage, + sharded_state_cache: Option<&ShardedStateCache>, skip_index_and_usage: bool, ) -> Result<()> { - if chunk.is_empty() { + if txns_to_commit.is_empty() { return Ok(()); } let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_state_kv_and_ledger_metadata"]) .start_timer(); + let state_updates_vec = txns_to_commit + .iter() + .map(|txn_to_commit| txn_to_commit.state_updates()) + .collect::>(); let ledger_metadata_batch = SchemaBatch::new(); let sharded_state_kv_batches = new_sharded_kv_schema_batch(); @@ -343,28 +375,28 @@ impl AptosDB { // TODO(grao): Make state_store take sharded state updates. self.state_store.put_value_sets( - chunk.per_version_state_updates, - chunk.first_version, - chunk.latest_in_memory_state.current.usage(), - chunk.sharded_state_cache, + state_updates_vec, + first_version, + expected_state_db_usage, + sharded_state_cache, &ledger_metadata_batch, &sharded_state_kv_batches, // Always put in state value index for now. 
// TODO(grao): remove after APIs migrated off the DB to the indexer. self.state_store.state_kv_db.enabled_sharding(), skip_index_and_usage, - chunk.transaction_infos + txns_to_commit .iter() - .rposition(|t| t.state_checkpoint_hash().is_some()), + .rposition(|txn| txn.has_state_checkpoint_hash()), )?; // Write block index if event index is skipped. if skip_index_and_usage { - for (i, txn_out) in chunk.transaction_outputs.iter().enumerate() { - for event in txn_out.events() { + for (i, txn) in txns_to_commit.iter().enumerate() { + for event in txn.events() { if let Some(event_key) = event.event_key() { if *event_key == new_block_event_key() { - let version = chunk.first_version + i as Version; + let version = first_version + i as Version; LedgerMetadataDb::put_block_info( version, event, @@ -376,10 +408,11 @@ impl AptosDB { } } + let last_version = first_version + txns_to_commit.len() as u64 - 1; ledger_metadata_batch .put::( &DbMetadataKey::LedgerCommitProgress, - &DbMetadataValue::Version(chunk.expect_last_version()), + &DbMetadataValue::Version(last_version), ) .unwrap(); @@ -396,7 +429,7 @@ impl AptosDB { s.spawn(|_| { self.state_kv_db .commit( - chunk.expect_last_version(), + last_version, state_kv_metadata_batch, sharded_state_kv_batches, ) @@ -409,22 +442,23 @@ impl AptosDB { fn commit_events( &self, + txns_to_commit: &[TransactionToCommit], first_version: Version, - transaction_outputs: &[TransactionOutput], skip_index: bool, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_events"]) .start_timer(); let batch = SchemaBatch::new(); - transaction_outputs + let num_txns = txns_to_commit.len(); + txns_to_commit .par_iter() - .with_min_len(optimal_min_len(transaction_outputs.len(), 128)) + .with_min_len(optimal_min_len(num_txns, 128)) .enumerate() - .try_for_each(|(i, txn_out)| -> Result<()> { + .try_for_each(|(i, txn_to_commit)| -> Result<()> { self.ledger_db.event_db().put_events( - first_version + i as Version, - txn_out.events(), + first_version + i as u64, + txn_to_commit.events(), skip_index, &batch, )?; @@ -439,22 +473,23 @@ impl AptosDB { fn commit_transaction_accumulator( &self, - first_version: Version, - transaction_infos: &[TransactionInfo], + txns_to_commit: &[TransactionToCommit], + first_version: u64, ) -> Result { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_transaction_accumulator"]) .start_timer(); - let num_txns = transaction_infos.len() as Version; - let batch = SchemaBatch::new(); let root_hash = self .ledger_db .transaction_accumulator_db() .put_transaction_accumulator( first_version, - transaction_infos, + &txns_to_commit + .iter() + .map(|txn_to_commit| txn_to_commit.transaction_info()) + .collect::>(), &batch, )?; @@ -467,7 +502,7 @@ impl AptosDB { let batch = SchemaBatch::new(); let all_versions: Vec<_> = - (first_version..first_version + num_txns).collect(); + (first_version..first_version + txns_to_commit.len() as u64).collect(); THREAD_MANAGER .get_non_exe_cpu_pool() .install(|| -> Result<()> { @@ -495,23 +530,23 @@ impl AptosDB { Ok(root_hash) } - fn commit_transaction_auxiliary_data<'a>( + fn commit_transaction_auxiliary_data( &self, - first_version: Version, - auxiliary_data: impl IntoIterator, + txns_to_commit: &[TransactionToCommit], + first_version: u64, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_transaction_auxiliary_data"]) .start_timer(); let batch = SchemaBatch::new(); - auxiliary_data - .into_iter() + txns_to_commit + .iter() .enumerate() - .try_for_each(|(i, 
aux_data)| -> Result<()> { + .try_for_each(|(i, txn_to_commit)| -> Result<()> { TransactionAuxiliaryDataDb::put_transaction_auxiliary_data( - first_version + i as Version, - aux_data, + first_version + i as u64, + txn_to_commit.transaction_auxiliary_data(), &batch, )?; @@ -528,22 +563,23 @@ impl AptosDB { fn commit_transaction_infos( &self, - first_version: Version, - txn_infos: &[TransactionInfo], + txns_to_commit: &[TransactionToCommit], + first_version: u64, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_transaction_infos"]) .start_timer(); let batch = SchemaBatch::new(); - txn_infos + let num_txns = txns_to_commit.len(); + txns_to_commit .par_iter() - .with_min_len(optimal_min_len(txn_infos.len(), 128)) + .with_min_len(optimal_min_len(num_txns, 128)) .enumerate() - .try_for_each(|(i, txn_info)| -> Result<()> { + .try_for_each(|(i, txn_to_commit)| -> Result<()> { let version = first_version + i as u64; TransactionInfoDb::put_transaction_info( version, - txn_info, + txn_to_commit.transaction_info(), &batch, )?; @@ -628,7 +664,7 @@ impl AptosDB { old_committed_version: Option, version: Version, ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, - chunk_opt: Option, + txns_to_commit: Option<&[TransactionToCommit]>, ) -> Result<()> { // If commit succeeds and there are at least one transaction written to the storage, we // will inform the pruner thread to work. @@ -660,8 +696,8 @@ impl AptosDB { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["indexer_index"]); // n.b. txns_to_commit can be partial, when the control was handed over from consensus to state sync // where state sync won't send the pre-committed part to the DB again. - if chunk_opt.is_some() && chunk_opt.as_ref().unwrap().len() == num_txns as usize { - let write_sets = chunk_opt.unwrap().transaction_outputs.iter().map(|t| t.write_set()).collect_vec(); + if txns_to_commit.is_some() && txns_to_commit.unwrap().len() == num_txns as usize { + let write_sets = txns_to_commit.unwrap().iter().map(|txn| txn.write_set()).collect_vec(); indexer.index(self.state_store.clone(), first_version, &write_sets)?; } else { let write_sets: Vec<_> = self.ledger_db.write_set_db().get_write_set_iter(first_version, num_txns as usize)?.try_collect()?; diff --git a/storage/aptosdb/src/db/mod.rs b/storage/aptosdb/src/db/mod.rs index 2a11b754f5c8f..2608e7ed52707 100644 --- a/storage/aptosdb/src/db/mod.rs +++ b/storage/aptosdb/src/db/mod.rs @@ -82,7 +82,6 @@ use std::{ time::Instant, }; use tokio::sync::watch::Sender; - #[cfg(test)] mod aptosdb_test; #[cfg(any(test, feature = "fuzzing"))] diff --git a/storage/aptosdb/src/db/test_helper.rs b/storage/aptosdb/src/db/test_helper.rs index 353b19bbf6f00..058447ad5209c 100644 --- a/storage/aptosdb/src/db/test_helper.rs +++ b/storage/aptosdb/src/db/test_helper.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 //! This module provides reusable helpers in tests. 
+use super::gather_state_updates_until_last_checkpoint; #[cfg(test)] use crate::state_store::StateStore; #[cfg(test)] @@ -18,7 +19,9 @@ use aptos_executor_types::ProofReader; use aptos_jellyfish_merkle::node_type::{Node, NodeKey}; #[cfg(test)] use aptos_schemadb::SchemaBatch; -use aptos_storage_interface::{state_delta::StateDelta, DbReader, Order, Result}; +use aptos_storage_interface::{ + chunk_to_commit::ChunkToCommit, state_delta::StateDelta, DbReader, DbWriter, Order, Result, +}; use aptos_temppath::TempPath; #[cfg(test)] use aptos_types::state_store::state_storage_usage::StateStorageUsage; @@ -88,7 +91,7 @@ pub(crate) fn update_store( let schema_batch = SchemaBatch::new(); store .put_value_sets( - &[sharded_value_state_set], + vec![&sharded_value_state_set], version, StateStorageUsage::new_untracked(), None, @@ -915,8 +918,14 @@ pub(crate) fn put_transaction_infos( version: Version, txn_infos: &[TransactionInfo], ) -> HashValue { - db.commit_transaction_infos(version, txn_infos).unwrap(); - db.commit_transaction_accumulator(version, txn_infos) + let txns_to_commit: Vec<_> = txn_infos + .iter() + .cloned() + .map(TransactionToCommit::dummy_with_transaction_info) + .collect(); + db.commit_transaction_infos(&txns_to_commit, version) + .unwrap(); + db.commit_transaction_accumulator(&txns_to_commit, version) .unwrap() } @@ -926,7 +935,12 @@ pub(crate) fn put_transaction_auxiliary_data( version: Version, auxiliary_data: &[TransactionAuxiliaryData], ) { - db.commit_transaction_auxiliary_data(version, auxiliary_data) + let txns_to_commit: Vec<_> = auxiliary_data + .iter() + .cloned() + .map(TransactionToCommit::dummy_with_transaction_auxiliary_data) + .collect(); + db.commit_transaction_auxiliary_data(&txns_to_commit, version) .unwrap(); } @@ -987,26 +1001,39 @@ pub fn test_sync_transactions_impl( if batch1_len > 0 { let txns_to_commit_batch = &txns_to_commit[..batch1_len]; update_in_memory_state(&mut in_memory_state, txns_to_commit_batch); - db.save_transactions_for_test( + let state_updates = gather_state_updates_until_last_checkpoint( + cur_ver, + &in_memory_state, txns_to_commit_batch, - cur_ver, /* first_version */ + ); + let chunk = ChunkToCommit { + first_version: cur_ver, base_state_version, - None, /* ledger_info_with_sigs */ - false, /* sync_commit */ - &in_memory_state, - ) - .unwrap(); + txns_to_commit: txns_to_commit_batch, + latest_in_memory_state: &in_memory_state, + state_updates_until_last_checkpoint: state_updates.as_ref(), + sharded_state_cache: None, + }; + db.save_transactions(chunk, None, false /* sync_commit */) + .unwrap(); } let ver = cur_ver + batch1_len as Version; let txns_to_commit_batch = &txns_to_commit[batch1_len..]; update_in_memory_state(&mut in_memory_state, txns_to_commit_batch); - db.save_transactions_for_test( - txns_to_commit_batch, - ver, + let state_updates = + gather_state_updates_until_last_checkpoint(ver, &in_memory_state, txns_to_commit_batch); + let chunk = ChunkToCommit { + first_version: ver, base_state_version, + txns_to_commit: txns_to_commit_batch, + latest_in_memory_state: &in_memory_state, + state_updates_until_last_checkpoint: state_updates.as_ref(), + sharded_state_cache: None, + }; + db.save_transactions( + chunk, Some(ledger_info_with_sigs), false, /* sync_commit */ - &in_memory_state, ) .unwrap(); diff --git a/storage/aptosdb/src/fast_sync_storage_wrapper.rs b/storage/aptosdb/src/fast_sync_storage_wrapper.rs index e58099c0bb31d..8530b34586a0b 100644 --- a/storage/aptosdb/src/fast_sync_storage_wrapper.rs +++ 
b/storage/aptosdb/src/fast_sync_storage_wrapper.rs
@@ -13,7 +13,7 @@ use aptos_storage_interface::{
 use aptos_types::{
     ledger_info::LedgerInfoWithSignatures,
     state_store::{state_key::StateKey, state_value::StateValue},
-    transaction::{TransactionOutputListWithProof, Version},
+    transaction::{TransactionOutputListWithProof, TransactionToCommit, Version},
 };
 use either::Either;
 use std::sync::Arc;
@@ -40,7 +40,7 @@ pub struct FastSyncStorageWrapper {
 impl FastSyncStorageWrapper {
     /// If the db is empty and configured to do fast sync, we return a FastSyncStorageWrapper
-    /// Otherwise, we return AptosDB directly and the FastSyncStorageWrapper is None
+    /// Otherwise, we return AptosDB directly and the FastSyncStorageWrapper is None
     pub fn initialize_dbs(
         config: &NodeConfig,
         internal_indexer_db: Option<InternalIndexerDB>,
@@ -177,10 +177,10 @@ impl DbWriter for FastSyncStorageWrapper {
         &self,
         version: Version,
         ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
-        chunk_opt: Option<ChunkToCommit>,
+        txns_to_commit: Option<&[TransactionToCommit]>,
     ) -> Result<()> {
         self.get_aptos_db_write_ref()
-            .commit_ledger(version, ledger_info_with_sigs, chunk_opt)
+            .commit_ledger(version, ledger_info_with_sigs, txns_to_commit)
     }
 }
diff --git a/storage/aptosdb/src/ledger_db/transaction_db.rs b/storage/aptosdb/src/ledger_db/transaction_db.rs
index beaefa935740d..f2f3b48b26641 100644
--- a/storage/aptosdb/src/ledger_db/transaction_db.rs
+++ b/storage/aptosdb/src/ledger_db/transaction_db.rs
@@ -14,7 +14,7 @@ use aptos_crypto::hash::{CryptoHash, HashValue};
 use aptos_db_indexer_schemas::schema::transaction_by_account::TransactionByAccountSchema;
 use aptos_schemadb::{SchemaBatch, DB};
 use aptos_storage_interface::{AptosDbError, Result};
-use aptos_types::transaction::{Transaction, Version};
+use aptos_types::transaction::{Transaction, TransactionToCommit, Version};
 use rayon::prelude::*;
 use std::{path::Path, sync::Arc};
@@ -79,33 +79,32 @@ impl TransactionDb {
     pub(crate) fn commit_transactions(
         &self,
+        txns_to_commit: &[TransactionToCommit],
         first_version: Version,
-        transactions: &[Transaction],
         skip_index: bool,
     ) -> Result<()> {
         let _timer = OTHER_TIMERS_SECONDS
             .with_label_values(&["commit_transactions"])
             .start_timer();
         let chunk_size = 512;
-        let batches = transactions
+        let batches = txns_to_commit
             .par_chunks(chunk_size)
             .enumerate()
             .map(|(chunk_index, txns_in_chunk)| -> Result<SchemaBatch> {
                 let batch = SchemaBatch::new();
                 let chunk_first_version = first_version + (chunk_size * chunk_index) as u64;
-                txns_in_chunk
-                    .iter()
-                    .enumerate()
-                    .try_for_each(|(i, txn)| -> Result<()> {
+                txns_in_chunk.iter().enumerate().try_for_each(
+                    |(i, txn_to_commit)| -> Result<()> {
                         self.put_transaction(
                             chunk_first_version + i as u64,
-                            txn,
+                            txn_to_commit.transaction(),
                             skip_index,
                             &batch,
                         )?;
 
                         Ok(())
-                    })?;
+                    },
+                )?;
                 Ok(batch)
             })
             .collect::<Result<Vec<_>>>()?;
diff --git a/storage/aptosdb/src/ledger_db/transaction_db_test.rs b/storage/aptosdb/src/ledger_db/transaction_db_test.rs
index 8370856b2bace..3f628925a469d 100644
--- a/storage/aptosdb/src/ledger_db/transaction_db_test.rs
+++ b/storage/aptosdb/src/ledger_db/transaction_db_test.rs
@@ -9,7 +9,7 @@ use aptos_storage_interface::Result;
 use aptos_temppath::TempPath;
 use aptos_types::{
     proptest_types::{AccountInfoUniverse, SignatureCheckedTransactionGen},
-    transaction::{Transaction, Version},
+    transaction::{Transaction, TransactionToCommit, Version},
 };
 use proptest::{collection::vec, prelude::*};
@@ -145,7 +145,17 @@ pub(crate) fn init_db(
     assert!(transaction_db.get_transaction(0).is_err());
 
     transaction_db
-
.commit_transactions(0, &txns, /*skip_index=*/ false) + .commit_transactions( + &txns + .iter() + .map(|transaction| TransactionToCommit { + transaction: transaction.clone(), + ..TransactionToCommit::dummy() + }) + .collect::>(), + 0, + /*skip_index=*/ false, + ) .unwrap(); txns diff --git a/storage/aptosdb/src/ledger_db/write_set_db.rs b/storage/aptosdb/src/ledger_db/write_set_db.rs index a37b099d2bac7..9320e6fb5b19a 100644 --- a/storage/aptosdb/src/ledger_db/write_set_db.rs +++ b/storage/aptosdb/src/ledger_db/write_set_db.rs @@ -12,7 +12,10 @@ use crate::{ use aptos_experimental_runtimes::thread_manager::optimal_min_len; use aptos_schemadb::{SchemaBatch, DB}; use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Result}; -use aptos_types::{transaction::Version, write_set::WriteSet}; +use aptos_types::{ + transaction::{TransactionToCommit, Version}, + write_set::WriteSet, +}; use rayon::prelude::*; use std::{path::Path, sync::Arc}; @@ -104,22 +107,22 @@ impl WriteSetDb { } /// Commits write sets starting from `first_version` to the database. - pub(crate) fn commit_write_sets<'a>( + pub(crate) fn commit_write_sets( &self, + txns_to_commit: &[TransactionToCommit], first_version: Version, - write_sets: impl IndexedParallelIterator, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["commit_write_sets"]) .start_timer(); let batch = SchemaBatch::new(); - let num_txns = write_sets.len(); - - write_sets + let num_txns = txns_to_commit.len(); + txns_to_commit + .par_iter() .with_min_len(optimal_min_len(num_txns, 128)) .enumerate() - .try_for_each(|(i, write_set)| -> Result<()> { - Self::put_write_set(first_version + i as Version, write_set, &batch)?; + .try_for_each(|(i, txn_to_commit)| -> Result<()> { + Self::put_write_set(first_version + i as u64, txn_to_commit.write_set(), &batch)?; Ok(()) })?; diff --git a/storage/aptosdb/src/ledger_db/write_set_db_test.rs b/storage/aptosdb/src/ledger_db/write_set_db_test.rs index e0140b246f847..44c4e9d38870a 100644 --- a/storage/aptosdb/src/ledger_db/write_set_db_test.rs +++ b/storage/aptosdb/src/ledger_db/write_set_db_test.rs @@ -5,9 +5,11 @@ use crate::{ledger_db::WriteSetDb, AptosDB}; use aptos_schemadb::SchemaBatch; use aptos_storage_interface::Result; use aptos_temppath::TempPath; -use aptos_types::{transaction::Version, write_set::WriteSet}; +use aptos_types::{ + transaction::{TransactionToCommit, Version}, + write_set::WriteSet, +}; use proptest::{collection::vec, prelude::*}; -use rayon::prelude::*; proptest! 
{ #![proptest_config(ProptestConfig::with_cases(10))] @@ -111,6 +113,15 @@ fn init_db(write_sets: &[WriteSet], write_set_db: &WriteSetDb) { assert!(write_set_db.get_write_set(0).is_err()); write_set_db - .commit_write_sets(0, write_sets.par_iter()) + .commit_write_sets( + &write_sets + .iter() + .map(|write_set| TransactionToCommit { + write_set: write_set.clone(), + ..TransactionToCommit::dummy() + }) + .collect::>(), + 0, + ) .unwrap(); } diff --git a/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs b/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs index 67e9cca52c185..edf59efde854f 100644 --- a/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs +++ b/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs @@ -63,7 +63,7 @@ fn put_value_set( let enable_sharding = state_store.state_kv_db.enabled_sharding(); state_store .put_value_sets( - &[sharded_value_set], + vec![&sharded_value_set], version, StateStorageUsage::new_untracked(), None, diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index a62226d3e6328..6196ad2e73e5d 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -614,7 +614,8 @@ impl StateStore { .with_label_values(&["put_writesets"]) .start_timer(); - let value_state_sets: Vec = write_sets + // convert value state sets to hash map reference + let value_state_sets_raw: Vec = write_sets .iter() .map(|ws| { let mut sharded_state_updates = create_empty_sharded_state_updates(); @@ -626,8 +627,10 @@ impl StateStore { }) .collect::>(); + let value_state_sets = value_state_sets_raw.iter().collect::>(); + self.put_stats_and_indices( - &value_state_sets, + value_state_sets.as_slice(), first_version, StateStorageUsage::new_untracked(), None, @@ -639,7 +642,7 @@ impl StateStore { )?; self.put_state_values( - &value_state_sets, + value_state_sets.to_vec(), first_version, sharded_state_kv_batches, enable_sharding, @@ -651,7 +654,7 @@ impl StateStore { /// Put the `value_state_sets` into its own CF. pub fn put_value_sets( &self, - value_state_sets: &[ShardedStateUpdates], + value_state_sets: Vec<&ShardedStateUpdates>, first_version: Version, expected_usage: StateStorageUsage, sharded_state_cache: Option<&ShardedStateCache>, @@ -666,7 +669,7 @@ impl StateStore { .start_timer(); self.put_stats_and_indices( - value_state_sets, + &value_state_sets, first_version, expected_usage, sharded_state_cache, @@ -691,7 +694,7 @@ impl StateStore { pub fn put_state_values( &self, - value_state_sets: &[ShardedStateUpdates], + value_state_sets: Vec<&ShardedStateUpdates>, first_version: Version, sharded_state_kv_batches: &ShardedStateKvSchemaBatch, enable_sharding: bool, @@ -740,7 +743,7 @@ impl StateStore { /// extra stale index as 1 cover the latter case. pub fn put_stats_and_indices( &self, - value_state_sets: &[ShardedStateUpdates], + value_state_sets: &[&ShardedStateUpdates], first_version: Version, expected_usage: StateStorageUsage, // If not None, it must contains all keys in the value_state_sets. 
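The `put_value_sets` and `put_stats_and_indices` signatures above take borrowed views (`Vec<&ShardedStateUpdates>`, `&[&ShardedStateUpdates]`), so callers that own their update sets and callers that merely hold references can share one code path without cloning. A minimal sketch of the idiom, with a stand-in `Updates` type rather than the real `ShardedStateUpdates` (the real methods take many more parameters):

    // Sketch only: `Updates` stands in for `ShardedStateUpdates`.
    #[derive(Debug)]
    struct Updates(u64);

    // Mirrors the shape of `put_stats_and_indices`: a slice of references,
    // so it never needs to clone or take ownership of the update sets.
    fn put_stats_and_indices(value_state_sets: &[&Updates]) {
        for (i, updates) in value_state_sets.iter().enumerate() {
            println!("stats for version offset {i}: {updates:?}");
        }
    }

    fn main() {
        // A caller that owns its data builds the borrowed view the same
        // way `put_writesets` does above.
        let owned: Vec<Updates> = vec![Updates(1), Updates(2)];
        let view: Vec<&Updates> = owned.iter().collect();
        put_stats_and_indices(view.as_slice());

        // A caller holding a single value just wraps a reference, as the
        // tests do with `vec![&sharded_value_set]`.
        let single = Updates(3);
        put_stats_and_indices(&[&single]);
    }

The `owned.iter().collect::<Vec<_>>()` step is the same conversion `put_writesets` performs with `value_state_sets_raw.iter().collect::<Vec<_>>()`.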
diff --git a/storage/aptosdb/src/state_store/state_store_test.rs b/storage/aptosdb/src/state_store/state_store_test.rs index dfb2abb842c5b..97f5727a8b431 100644 --- a/storage/aptosdb/src/state_store/state_store_test.rs +++ b/storage/aptosdb/src/state_store/state_store_test.rs @@ -53,7 +53,7 @@ fn put_value_set( let state_kv_metadata_batch = SchemaBatch::new(); state_store .put_value_sets( - &[sharded_value_set], + vec![&sharded_value_set], version, StateStorageUsage::new_untracked(), None, diff --git a/storage/storage-interface/src/chunk_to_commit.rs b/storage/storage-interface/src/chunk_to_commit.rs index 67c4e778741e4..ef1bd38a81771 100644 --- a/storage/storage-interface/src/chunk_to_commit.rs +++ b/storage/storage-interface/src/chunk_to_commit.rs @@ -4,37 +4,15 @@ use crate::{cached_state_view::ShardedStateCache, state_delta::StateDelta}; use aptos_types::{ state_store::ShardedStateUpdates, - transaction::{Transaction, TransactionInfo, TransactionOutput, Version}, + transaction::{TransactionToCommit, Version}, }; #[derive(Copy, Clone)] pub struct ChunkToCommit<'a> { pub first_version: Version, - pub transactions: &'a [Transaction], - pub transaction_outputs: &'a [TransactionOutput], - pub transaction_infos: &'a [TransactionInfo], pub base_state_version: Option, + pub txns_to_commit: &'a [TransactionToCommit], pub latest_in_memory_state: &'a StateDelta, - pub per_version_state_updates: &'a [ShardedStateUpdates], pub state_updates_until_last_checkpoint: Option<&'a ShardedStateUpdates>, pub sharded_state_cache: Option<&'a ShardedStateCache>, - pub is_reconfig: bool, -} - -impl<'a> ChunkToCommit<'a> { - pub fn len(&self) -> usize { - self.transactions.len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn next_version(&self) -> Version { - self.first_version + self.len() as Version - } - - pub fn expect_last_version(&self) -> Version { - self.next_version() - 1 - } } diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index e99b1e017f0dd..e2fd5d8abab02 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -546,19 +546,24 @@ pub trait DbWriter: Send + Sync { sync_commit: bool, ) -> Result<()> { // For reconfig suffix. - if ledger_info_with_sigs.is_none() && chunk.is_empty() { + if ledger_info_with_sigs.is_none() && chunk.txns_to_commit.is_empty() { return Ok(()); } - if !chunk.is_empty() { + if !chunk.txns_to_commit.is_empty() { self.pre_commit_ledger(chunk, sync_commit)?; } let version_to_commit = if let Some(ledger_info_with_sigs) = ledger_info_with_sigs { ledger_info_with_sigs.ledger_info().version() } else { - chunk.expect_last_version() + // here txns_to_commit is known to be non-empty + chunk.first_version + chunk.txns_to_commit.len() as u64 - 1 }; - self.commit_ledger(version_to_commit, ledger_info_with_sigs, Some(chunk)) + self.commit_ledger( + version_to_commit, + ledger_info_with_sigs, + Some(chunk.txns_to_commit), + ) } /// Optimistically persist transactions to the ledger. 
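`ChunkToCommit` above keeps `#[derive(Copy, Clone)]` because every field is a shared reference, an `Option` of one, or a small integer; passing it by value copies a handful of pointers, not ledger data. A rough illustration of such a borrowed view struct, with hypothetical names (`ChunkView`, `items`) in place of the real fields:

    // Sketch: a borrowed "view" struct in the style of `ChunkToCommit`.
    // All fields are references or scalars, so `Copy` is free.
    #[derive(Copy, Clone)]
    struct ChunkView<'a> {
        first_version: u64,
        items: &'a [String], // stands in for `&'a [TransactionToCommit]`
    }

    fn last_version(chunk: ChunkView<'_>) -> Option<u64> {
        // first_version + len - 1, defined only for non-empty chunks.
        (!chunk.items.is_empty()).then(|| chunk.first_version + chunk.items.len() as u64 - 1)
    }

    fn main() {
        let items = vec!["txn-a".to_string(), "txn-b".to_string()];
        let chunk = ChunkView { first_version: 100, items: &items };
        let copy = chunk; // `chunk` stays usable: Copy, not a move
        assert_eq!(last_version(copy), Some(101));
        assert_eq!(chunk.items.len(), 2);
    }

`last_version` mirrors the `chunk.first_version + chunk.txns_to_commit.len() as u64 - 1` arithmetic in `save_transactions` above, which is likewise only reached when the chunk is non-empty.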
@@ -582,7 +587,7 @@
         &self,
         version: Version,
         ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
-        chunk_opt: Option<ChunkToCommit>,
+        txns_to_commit: Option<&[TransactionToCommit]>,
     ) -> Result<()> {
         unimplemented!()
     }
diff --git a/types/src/proptest_types.rs b/types/src/proptest_types.rs
index fb26fdf7ce988..d668580ab5aac 100644
--- a/types/src/proptest_types.rs
+++ b/types/src/proptest_types.rs
@@ -1177,7 +1177,7 @@ impl BlockGen {
                 Some(HashValue::random()),
                 ExecutionStatus::Success,
             ),
-            arr![HashMap::new(); 16],
+            arr_macro::arr![HashMap::new(); 16],
             WriteSet::default(),
             Vec::new(),
             false,
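Both restored test helpers (`init_db` in `transaction_db_test.rs` and in `write_set_db_test.rs`) build a `TransactionToCommit` by setting the one field under test and filling the rest from a template with struct-update syntax (`..TransactionToCommit::dummy()`). A self-contained sketch of that idiom, using a hypothetical `Record` type since `TransactionToCommit`'s full field list does not appear in this patch:

    // Sketch of the functional-update idiom used in the restored tests:
    // populate the interesting field, take the rest from a `dummy()`
    // template via `..` struct-update syntax.
    #[derive(Debug)]
    struct Record {
        payload: String,
        version: u64,
        verified: bool,
    }

    impl Record {
        fn dummy() -> Self {
            Record {
                payload: String::new(),
                version: 0,
                verified: false,
            }
        }
    }

    fn main() {
        let payloads = ["write-set-1", "write-set-2"];
        // Same shape as `txns.iter().map(|t| TransactionToCommit {
        //     transaction: t.clone(), ..TransactionToCommit::dummy() })`.
        let records: Vec<Record> = payloads
            .iter()
            .map(|p| Record {
                payload: p.to_string(),
                ..Record::dummy()
            })
            .collect();
        assert_eq!(records[0].payload, "write-set-1");
        assert_eq!(records[1].version, 0);
    }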