diff --git a/execution/executor-types/src/ledger_update_output.rs b/execution/executor-types/src/ledger_update_output.rs
index 9bb7f58685447..beb86a5dd2959 100644
--- a/execution/executor-types/src/ledger_update_output.rs
+++ b/execution/executor-types/src/ledger_update_output.rs
@@ -11,7 +11,7 @@ use aptos_types::{
     contract_event::ContractEvent,
     epoch_state::EpochState,
     proof::accumulator::InMemoryTransactionAccumulator,
-    state_store::{combine_or_add_sharded_state_updates, ShardedStateUpdates},
+    state_store::ShardedStateUpdates,
     transaction::{
         block_epilogue::BlockEndInfo, TransactionInfo, TransactionStatus, TransactionToCommit,
         Version,
@@ -119,35 +119,6 @@ impl LedgerUpdateOutput {
         )
     }
 
-    pub fn combine(&mut self, rhs: Self) {
-        assert!(self.block_end_info.is_none());
-        assert!(rhs.block_end_info.is_none());
-        let Self {
-            statuses_for_input_txns,
-            to_commit,
-            subscribable_events,
-            transaction_info_hashes,
-            state_updates_until_last_checkpoint: state_updates_before_last_checkpoint,
-            sharded_state_cache,
-            transaction_accumulator,
-            block_end_info: _block_end_info,
-        } = rhs;
-
-        if let Some(updates) = state_updates_before_last_checkpoint {
-            combine_or_add_sharded_state_updates(
-                &mut self.state_updates_until_last_checkpoint,
-                updates,
-            );
-        }
-
-        self.statuses_for_input_txns.extend(statuses_for_input_txns);
-        self.to_commit.extend(to_commit);
-        self.subscribable_events.extend(subscribable_events);
-        self.transaction_info_hashes.extend(transaction_info_hashes);
-        self.sharded_state_cache.combine(sharded_state_cache);
-        self.transaction_accumulator = transaction_accumulator;
-    }
-
     pub fn next_version(&self) -> Version {
         self.transaction_accumulator.num_leaves() as Version
     }
diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs
index 8a3b9ee9a9fb3..8ae1701243ca2 100644
--- a/execution/executor-types/src/lib.rs
+++ b/execution/executor-types/src/lib.rs
@@ -259,14 +259,14 @@ impl VerifyExecutionMode {
 }
 
 pub trait TransactionReplayer: Send {
-    fn replay(
+    fn enqueue_chunks(
         &self,
         transactions: Vec<Transaction>,
         transaction_infos: Vec<TransactionInfo>,
         write_sets: Vec<WriteSet>,
         event_vecs: Vec<Vec<ContractEvent>>,
         verify_execution_mode: &VerifyExecutionMode,
-    ) -> Result<()>;
+    ) -> Result<usize>;
 
     fn commit(&self) -> Result<Version>;
 }
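Note: this is the core interface change. `replay()` used to execute a whole batch, fold everything into one combined chunk, and stage it for a single commit; `enqueue_chunks()` instead only stages work, splitting the batch at epoch boundaries and known-broken versions, and reports how many chunks it enqueued. A minimal sketch of the new calling convention, mirroring what the updated test and restore code below do (the `replayer` value and the input vectors are assumed to be set up already):

    use aptos_executor_types::{ChunkExecutorTrait, TransactionReplayer, VerifyExecutionMode};

    // Stage 1: split the batch into chunks and enqueue them.
    let chunks_enqueued = replayer.enqueue_chunks(
        transactions,      // Vec<Transaction>
        transaction_infos, // Vec<TransactionInfo>
        write_sets,        // Vec<WriteSet>
        event_vecs,        // Vec<Vec<ContractEvent>>
        &VerifyExecutionMode::verify_all(),
    )?;
    // Stages 2 and 3: each enqueued chunk takes exactly one ledger-update
    // pass and one commit pass downstream.
    for _ in 0..chunks_enqueued {
        replayer.update_ledger()?; // ChunkExecutorTrait
    }
    for _ in 0..chunks_enqueued {
        let _committed_version = replayer.commit()?; // TransactionReplayer
    }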
diff --git a/execution/executor/src/chunk_executor.rs b/execution/executor/src/chunk_executor.rs
index fbe4d1f8d1b32..d54497c000ca9 100644
--- a/execution/executor/src/chunk_executor.rs
+++ b/execution/executor/src/chunk_executor.rs
@@ -6,10 +6,10 @@
 use crate::{
     components::{
-        apply_chunk_output::{ensure_no_discard, ensure_no_retry, ApplyChunkOutput},
+        apply_chunk_output::ApplyChunkOutput,
         chunk_commit_queue::{ChunkCommitQueue, ChunkToUpdateLedger},
         chunk_output::ChunkOutput,
-        chunk_result_verifier::{ChunkResultVerifier, StateSyncChunkVerifier},
+        chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier},
         executed_chunk::ExecutedChunk,
         transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk},
     },
@@ -28,7 +28,7 @@ use aptos_logger::prelude::*;
 use aptos_metrics_core::{IntGaugeHelper, TimerHelper};
 use aptos_storage_interface::{
     async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView,
-    state_delta::StateDelta, DbReaderWriter, ExecutedTrees,
+    state_delta::StateDelta, DbReaderWriter,
 };
 use aptos_types::{
     block_executor::config::BlockExecutorConfigFromOnchain,
@@ -93,6 +93,10 @@ impl<V: VMExecutor> ChunkExecutor<V> {
             error
         })
     }
+
+    pub fn is_empty(&self) -> bool {
+        self.with_inner(|inner| Ok(inner.is_empty())).unwrap()
+    }
 }
 
 impl<V: VMExecutor> ChunkExecutorTrait for ChunkExecutor<V> {
@@ -279,6 +283,10 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
         Ok(chunk)
     }
 
+    fn is_empty(&self) -> bool {
+        self.commit_queue.lock().is_empty()
+    }
+
     // ************************* Chunk Executor Implementation *************************
     fn enqueue_chunk(
         &self,
@@ -401,24 +409,28 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
 }
 
 impl<V: VMExecutor> TransactionReplayer for ChunkExecutor<V> {
-    fn replay(
+    fn enqueue_chunks(
         &self,
         transactions: Vec<Transaction>,
         transaction_infos: Vec<TransactionInfo>,
         write_sets: Vec<WriteSet>,
         event_vecs: Vec<Vec<ContractEvent>>,
         verify_execution_mode: &VerifyExecutionMode,
-    ) -> Result<()> {
+    ) -> Result<usize> {
         let _guard = CONCURRENCY_GAUGE.concurrency_with(&["replayer", "replay"]);
 
         self.maybe_initialize()?;
-        self.inner.read().as_ref().expect("not reset").replay(
-            transactions,
-            transaction_infos,
-            write_sets,
-            event_vecs,
-            verify_execution_mode,
-        )
+        self.inner
+            .read()
+            .as_ref()
+            .expect("not reset")
+            .enqueue_chunks(
+                transactions,
+                transaction_infos,
+                write_sets,
+                event_vecs,
+                verify_execution_mode,
+            )
     }
 
     fn commit(&self) -> Result<Version> {
@@ -428,19 +440,18 @@
     }
 }
 
-impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {
-    fn replay(
+impl<V: VMExecutor> ChunkExecutorInner<V> {
+    fn enqueue_chunks(
         &self,
         mut transactions: Vec<Transaction>,
         mut transaction_infos: Vec<TransactionInfo>,
         mut write_sets: Vec<WriteSet>,
         mut event_vecs: Vec<Vec<ContractEvent>>,
         verify_execution_mode: &VerifyExecutionMode,
-    ) -> Result<()> {
+    ) -> Result<usize> {
         let started = Instant::now();
         let num_txns = transactions.len();
-        let mut latest_view = self.commit_queue.lock().expect_latest_view()?;
-        let chunk_begin = latest_view.num_transactions() as Version;
+        let chunk_begin = self.commit_queue.lock().expecting_version();
         let chunk_end = chunk_begin + num_txns as Version; // right-exclusive
 
         // Find epoch boundaries.
@@ -459,12 +470,10 @@
             epochs.push((epoch_begin, chunk_end));
         }
 
-        let mut executed_chunk = None;
+        let mut chunks_enqueued = 0;
         // Replay epoch by epoch.
         for (begin, end) in epochs {
-            self.remove_and_replay_epoch(
-                &mut executed_chunk,
-                &mut latest_view,
+            chunks_enqueued += self.remove_and_replay_epoch(
                 &mut transactions,
                 &mut transaction_infos,
                 &mut write_sets,
@@ -475,16 +484,13 @@
             )?;
         }
 
-        self.commit_queue
-            .lock()
-            .enqueue_chunk_to_commit_directly(executed_chunk.expect("Nothing to commit."))?;
         info!(
             num_txns = num_txns,
             tps = (num_txns as f64 / started.elapsed().as_secs_f64()),
             "TransactionReplayer::replay() OK"
         );
 
-        Ok(())
+        Ok(chunks_enqueued)
     }
 
     fn commit(&self) -> Result<Version> {
@@ -504,16 +510,12 @@
             .current_version
             .expect("Version must exist after commit."))
     }
-}
 
-impl<V: VMExecutor> ChunkExecutorInner<V> {
     /// Remove `end_version - begin_version` transactions from the mutable input arguments and replay.
    /// The input range indicated by `[begin_version, end_version]` is guaranteed not to cross epoch boundaries.
     /// Notice there can be known broken versions inside the range.
     fn remove_and_replay_epoch(
         &self,
-        executed_chunk: &mut Option<ExecutedChunk>,
-        latest_view: &mut ExecutedTrees,
         transactions: &mut Vec<Transaction>,
         transaction_infos: &mut Vec<TransactionInfo>,
         write_sets: &mut Vec<WriteSet>,
@@ -521,21 +523,21 @@
         begin_version: Version,
         end_version: Version,
         verify_execution_mode: &VerifyExecutionMode,
-    ) -> Result<()> {
+    ) -> Result<usize> {
         // we try to apply the txns in sub-batches split by known txns to skip and the end of the batch
         let txns_to_skip = verify_execution_mode.txns_to_skip();
         let mut batch_ends = txns_to_skip
             .range(begin_version..end_version)
             .chain(once(&end_version));
+        let mut chunks_enqueued = 0;
+
         let mut batch_begin = begin_version;
         let mut batch_end = *batch_ends.next().unwrap();
         while batch_begin < end_version {
             if batch_begin == batch_end {
                 // batch_end is a known broken version that won't pass execution verification
                 self.remove_and_apply(
-                    executed_chunk,
-                    latest_view,
                     transactions,
                     transaction_infos,
                     write_sets,
@@ -543,6 +545,7 @@
                     batch_begin,
                     batch_begin + 1,
                 )?;
+                chunks_enqueued += 1;
                 info!(
                     version_skipped = batch_begin,
                     "Skipped known broken transaction, applied transaction output directly."
                 );
@@ -555,7 +558,6 @@
             // Try to run the transactions with the VM
             let next_begin = if verify_execution_mode.should_verify() {
                 self.verify_execution(
-                    latest_view,
                     transactions,
                     transaction_infos,
                     write_sets,
@@ -568,8 +570,6 @@
                 batch_end
             };
             self.remove_and_apply(
-                executed_chunk,
-                latest_view,
                 transactions,
                 transaction_infos,
                 write_sets,
@@ -577,15 +577,15 @@
                 batch_begin,
                 next_begin,
             )?;
+            chunks_enqueued += 1;
             batch_begin = next_begin;
         }
 
-        Ok(())
+        Ok(chunks_enqueued)
     }
 
     fn verify_execution(
         &self,
-        latest_view: &mut ExecutedTrees,
         transactions: &[Transaction],
         transaction_infos: &[TransactionInfo],
         write_sets: &[WriteSet],
@@ -595,7 +595,7 @@
         verify_execution_mode: &VerifyExecutionMode,
     ) -> Result<Version> {
         // Execute transactions.
-        let state_view = self.latest_state_view(latest_view.state())?;
+        let state_view = self.latest_state_view(&self.commit_queue.lock().latest_state())?;
         let txns = transactions
             .iter()
             .take((end_version - begin_version) as usize)
@@ -639,8 +639,6 @@
     /// It's guaranteed that there's no known broken versions or epoch endings in the range.
     fn remove_and_apply(
         &self,
-        executed_chunk: &mut Option<ExecutedChunk>,
-        latest_view: &mut ExecutedTrees,
         transactions: &mut Vec<Transaction>,
         transaction_infos: &mut Vec<TransactionInfo>,
         write_sets: &mut Vec<WriteSet>,
@@ -650,7 +648,7 @@
     ) -> Result<()> {
         let num_txns = (end_version - begin_version) as usize;
         let txn_infos: Vec<_> = transaction_infos.drain(..num_txns).collect();
-        let (txns, txn_outs) = multizip((
+        let (transactions, transaction_outputs) = multizip((
             transactions.drain(..num_txns),
             txn_infos.iter(),
             write_sets.drain(..num_txns),
@@ -670,28 +668,16 @@
             })
             .unzip();
 
-        let state_view = self.latest_state_view(latest_view.state())?;
-        let chunk_output = ChunkOutput::by_transaction_output(txns, txn_outs, state_view)?;
-        let (executed_batch, to_discard, to_retry) = chunk_output.apply_to_ledger(
-            latest_view,
-            Some(
-                txn_infos
-                    .iter()
-                    .map(|txn_info| txn_info.state_checkpoint_hash())
-                    .collect(),
-            ),
-        )?;
-        ensure_no_discard(to_discard)?;
-        ensure_no_retry(to_retry)?;
-        executed_batch
-            .ledger_update_output
-            .ensure_transaction_infos_match(&txn_infos)?;
-
-        match executed_chunk {
-            Some(chunk) => chunk.combine(executed_batch),
-            None => *executed_chunk = Some(executed_batch),
-        }
-        *latest_view = executed_chunk.as_ref().unwrap().result_view();
+        let chunk = ChunkToApply {
+            transactions,
+            transaction_outputs,
+            first_version: begin_version,
+        };
+        let chunk_verifier = Arc::new(ReplayChunkVerifier {
+            transaction_infos: txn_infos,
+        });
+        self.enqueue_chunk(chunk, chunk_verifier, "replay")?;
+
         Ok(())
     }
 }
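Note: `remove_and_replay_epoch()` now reports how many chunks it enqueued: one per sub-batch, where sub-batches end either at a version listed in `VerifyExecutionMode::txns_to_skip()` or at the end of the epoch, and each known-broken version is applied as its own single-transaction chunk. A standalone illustration of how the boundaries fall out of the `range(..).chain(once(..))` iterator above (hypothetical helper, runnable as-is):

    use std::{collections::BTreeSet, iter::once};

    // Sub-batch end markers for the half-open range [begin, end):
    // every known-broken version inside the range, then `end` itself.
    fn batch_ends(begin: u64, end: u64, txns_to_skip: &BTreeSet<u64>) -> Vec<u64> {
        txns_to_skip
            .range(begin..end)
            .chain(once(&end))
            .copied()
            .collect()
    }

    fn main() {
        let skip = BTreeSet::from([5, 9]);
        // Replaying [2, 12) with versions 5 and 9 known-broken produces the
        // batches [2, 5), {5}, [6, 9), {9}, [10, 12): five chunks enqueued.
        assert_eq!(batch_ends(2, 12, &skip), vec![5, 9, 12]);
    }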
diff --git a/execution/executor/src/components/chunk_commit_queue.rs b/execution/executor/src/components/chunk_commit_queue.rs
index 54c69a2f61afe..d796079bf812e 100644
--- a/execution/executor/src/components/chunk_commit_queue.rs
+++ b/execution/executor/src/components/chunk_commit_queue.rs
@@ -66,18 +66,7 @@ impl ChunkCommitQueue {
     }
 
     pub(crate) fn expecting_version(&self) -> Version {
-        self.latest_txn_accumulator.num_leaves()
-    }
-
-    pub(crate) fn expect_latest_view(&self) -> Result<ExecutedTrees> {
-        ensure!(
-            self.to_update_ledger.is_empty(),
-            "Pending chunk to update_ledger, can't construct latest ExecutedTrees."
-        );
-        Ok(ExecutedTrees::new(
-            self.latest_state.clone(),
-            self.latest_txn_accumulator.clone(),
-        ))
+        self.latest_state.next_version()
     }
 
     pub(crate) fn enqueue_for_ledger_update(
@@ -130,17 +119,6 @@ impl ChunkCommitQueue {
         Ok((self.persisted_state.clone(), chunk))
     }
 
-    pub(crate) fn enqueue_chunk_to_commit_directly(&mut self, chunk: ExecutedChunk) -> Result<()> {
-        ensure!(
-            self.to_update_ledger.is_empty(),
-            "Mixed usage of different modes."
-        );
-        self.latest_state = chunk.result_state.clone();
-        self.latest_txn_accumulator = chunk.ledger_update_output.transaction_accumulator.clone();
-        self.to_commit.push_back(Some(chunk));
-        Ok(())
-    }
-
     pub(crate) fn dequeue_committed(&mut self, latest_state: StateDelta) -> Result<()> {
         ensure!(!self.to_commit.is_empty(), "to_commit is empty.");
         ensure!(
@@ -154,4 +132,8 @@ impl ChunkCommitQueue {
             .log_generation("commit_queue_base");
         Ok(())
     }
+
+    pub(crate) fn is_empty(&self) -> bool {
+        self.to_commit.is_empty() && self.to_update_ledger.is_empty()
+    }
 }
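Note: with `enqueue_chunk_to_commit_directly()` gone, no chunk can bypass the ledger-update stage anymore, which is why `expecting_version()` can now be read straight off `latest_state` and why "drained" is simply both stage queues being empty. A toy model of that invariant (queue names taken from this file, everything else elided):

    use std::collections::VecDeque;

    // Toy stand-in for ChunkCommitQueue: every chunk passes through both
    // queues in order, first awaiting ledger update, then awaiting commit.
    struct Queues<Chunk> {
        to_update_ledger: VecDeque<Chunk>, // stage 2 pending
        to_commit: VecDeque<Chunk>,        // stage 3 pending
    }

    impl<Chunk> Queues<Chunk> {
        // Same shape as the is_empty() added above: the executor is idle
        // only when nothing is in flight at either stage.
        fn is_empty(&self) -> bool {
            self.to_commit.is_empty() && self.to_update_ledger.is_empty()
        }
    }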
diff --git a/execution/executor/src/components/chunk_result_verifier.rs b/execution/executor/src/components/chunk_result_verifier.rs
index d9723dc0c57b5..5cc222163c99f 100644
--- a/execution/executor/src/components/chunk_result_verifier.rs
+++ b/execution/executor/src/components/chunk_result_verifier.rs
@@ -133,3 +133,29 @@ impl ChunkResultVerifier for StateSyncChunkVerifier {
         }
     }
 }
+
+pub struct ReplayChunkVerifier {
+    pub transaction_infos: Vec<TransactionInfo>,
+}
+
+impl ChunkResultVerifier for ReplayChunkVerifier {
+    fn verify_chunk_result(
+        &self,
+        _parent_accumulator: &InMemoryTransactionAccumulator,
+        ledger_update_output: &LedgerUpdateOutput,
+    ) -> Result<()> {
+        ledger_update_output.ensure_transaction_infos_match(&self.transaction_infos)
+    }
+
+    fn transaction_infos(&self) -> &[TransactionInfo] {
+        &self.transaction_infos
+    }
+
+    fn maybe_select_chunk_ending_ledger_info(
+        &self,
+        _ledger_update_output: &LedgerUpdateOutput,
+        _next_epoch_state: Option<&EpochState>,
+    ) -> Result<Option<LedgerInfoWithSignatures>> {
+        Ok(None)
+    }
+}
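Note: `ReplayChunkVerifier` is what lets replay share the state-sync commit path: it only checks that the recomputed `TransactionInfo`s match the historical ones and never selects a chunk-ending `LedgerInfo`, since restore already verified the backup proofs upstream. A sketch of the call-site polymorphism (the exact trait-object bounds and the `StateSyncChunkVerifier` construction are assumptions):

    // Replay path, as in remove_and_apply() above:
    let chunk_verifier: Arc<dyn ChunkResultVerifier + Send + Sync> =
        Arc::new(ReplayChunkVerifier {
            transaction_infos: txn_infos,
        });
    // The state-sync path builds a StateSyncChunkVerifier instead; either
    // way the chunk takes the same route through enqueue_chunk() and the
    // commit queue.
    self.enqueue_chunk(chunk, chunk_verifier, "replay")?;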
diff --git a/execution/executor/src/components/executed_chunk.rs b/execution/executor/src/components/executed_chunk.rs
index aa9872dc5c3b5..28e8e68fcf87e 100644
--- a/execution/executor/src/components/executed_chunk.rs
+++ b/execution/executor/src/components/executed_chunk.rs
@@ -15,7 +15,7 @@ use aptos_types::account_config::NewEpochEvent;
 use aptos_types::contract_event::ContractEvent;
 use aptos_types::{
     epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures,
-    state_store::combine_or_add_sharded_state_updates, transaction::TransactionToCommit,
+    transaction::TransactionToCommit,
 };
 
 #[derive(Debug)]
@@ -28,16 +28,6 @@ pub struct ExecutedChunk {
 }
 
 impl ExecutedChunk {
-    pub fn reconfig_suffix(&self) -> Self {
-        assert!(self.next_epoch_state.is_some());
-        Self {
-            result_state: self.result_state.clone(),
-            ledger_info: None,
-            next_epoch_state: self.next_epoch_state.clone(),
-            ledger_update_output: self.ledger_update_output.reconfig_suffix(),
-        }
-    }
-
     pub fn transactions_to_commit(&self) -> &Vec<TransactionToCommit> {
         &self.ledger_update_output.to_commit
     }
@@ -46,37 +36,6 @@ impl ExecutedChunk {
         self.next_epoch_state.is_some()
     }
 
-    pub fn combine(&mut self, rhs: Self) {
-        assert_eq!(
-            self.ledger_update_output.next_version(),
-            rhs.ledger_update_output.first_version(),
-            "Chunks to be combined are not consecutive.",
-        );
-        let Self {
-            result_state,
-            ledger_info,
-            next_epoch_state,
-            ledger_update_output,
-        } = rhs;
-
-        let old_result_state = self.result_state.replace_with(result_state);
-        // TODO(aldenhu): This is very unfortunate. Will revisit soon by remodeling the state diff.
-        if self.result_state.base_version > old_result_state.base_version
-            && old_result_state.base_version != old_result_state.current_version
-        {
-            combine_or_add_sharded_state_updates(
-                &mut self
-                    .ledger_update_output
-                    .state_updates_until_last_checkpoint,
-                old_result_state.updates_since_base,
-            )
-        }
-
-        self.ledger_info = ledger_info;
-        self.next_epoch_state = next_epoch_state;
-        self.ledger_update_output.combine(ledger_update_output)
-    }
-
     pub fn result_view(&self) -> ExecutedTrees {
         ExecutedTrees::new(
             self.result_state.clone(),
diff --git a/execution/executor/src/tests/mod.rs b/execution/executor/src/tests/mod.rs
index d7c65868196a3..c50d2480a1b84 100644
--- a/execution/executor/src/tests/mod.rs
+++ b/execution/executor/src/tests/mod.rs
@@ -14,7 +14,8 @@ use crate::{
 use aptos_crypto::{ed25519::Ed25519PrivateKey, HashValue, PrivateKey, SigningKey, Uniform};
 use aptos_db::AptosDB;
 use aptos_executor_types::{
-    BlockExecutorTrait, LedgerUpdateOutput, TransactionReplayer, VerifyExecutionMode,
+    BlockExecutorTrait, ChunkExecutorTrait, LedgerUpdateOutput, TransactionReplayer,
+    VerifyExecutionMode,
 };
 use aptos_storage_interface::{
     async_proof_fetcher::AsyncProofFetcher, DbReaderWriter, ExecutedTrees, Result,
@@ -27,7 +28,6 @@ use aptos_types::{
     bytes::NumToBytes,
     chain_id::ChainId,
     ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
-    proof::definition::LeafCount,
     state_store::{state_key::StateKey, state_value::StateValue, StateViewId},
     test_helpers::transaction_test_helpers::{block, TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG},
     transaction::{
@@ -38,6 +38,7 @@ use aptos_types::{
     },
     write_set::{WriteOp, WriteSet, WriteSetMut},
 };
+use itertools::Itertools;
 use proptest::prelude::*;
 use std::{iter::once, sync::Arc};
 
@@ -741,23 +742,23 @@ fn run_transactions_naive(
 }
 
 proptest! {
-#![proptest_config(ProptestConfig::with_cases(5))]
+    #![proptest_config(ProptestConfig::with_cases(5))]
 
-#[test]
-#[cfg_attr(feature = "consensus-only-perf-test", ignore)]
-fn test_reconfiguration_with_retry_transaction_status(
-    (num_user_txns, reconfig_txn_index) in (2..5u64).prop_flat_map(|num_user_txns| {
-        (
-            Just(num_user_txns),
-            0..num_user_txns - 1 // avoid state checkpoint right after reconfig
-        )
+    #[test]
+    #[cfg_attr(feature = "consensus-only-perf-test", ignore)]
+    fn test_reconfiguration_with_retry_transaction_status(
+        (num_user_txns, reconfig_txn_index) in (2..5usize).prop_flat_map(|num_user_txns| {
+            (
+                Just(num_user_txns),
+                0..num_user_txns - 1 // avoid state checkpoint right after reconfig
+            )
     }).no_shrink()) {
         let executor = TestExecutor::new();
         let block_id = gen_block_id(1);
-        let mut block = TestBlock::new(num_user_txns, 10, block_id);
-        let num_input_txns = block.txns.len() as LeafCount;
-        block.txns[reconfig_txn_index as usize] = encode_reconfiguration_transaction().into();
+        let mut block = TestBlock::new(num_user_txns as u64, 10, block_id);
+        let num_input_txns = block.txns.len();
+        block.txns[reconfig_txn_index] = encode_reconfiguration_transaction().into();
 
         let parent_block_id = executor.committed_block_id();
         let output = executor.execute_block(
@@ -768,34 +769,52 @@
         let retry_iter = output.compute_status_for_input_txns().iter()
             .skip_while(|status| matches!(*status, TransactionStatus::Keep(_)));
         prop_assert_eq!(
-            retry_iter.take_while(|status| matches!(*status,TransactionStatus::Retry)).count() as u64,
+            retry_iter.take_while(|status| matches!(*status,TransactionStatus::Retry)).count(),
             num_input_txns - reconfig_txn_index - 1
         );
 
         // commit
-        let ledger_info = gen_ledger_info(reconfig_txn_index + 1 /* version */, output.root_hash(), block_id, 1 /* timestamp */);
+        let ledger_info = gen_ledger_info(
+            reconfig_txn_index as Version + 1 /* version */,
+            output.root_hash(),
+            block_id,
+            1 /* timestamp */
+        );
         executor.commit_blocks(vec![block_id], ledger_info).unwrap();
         let parent_block_id = executor.committed_block_id();
 
         // retry txns after reconfiguration
+        let retry_txns = block.txns.iter().skip(reconfig_txn_index + 1).cloned().collect_vec();
         let retry_block_id = gen_block_id(2);
         let retry_output = executor.execute_block(
-            (retry_block_id, block.txns.iter().skip(reconfig_txn_index as usize + 1).cloned().collect::<Vec<_>>()).into(), parent_block_id, TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG
+            ( retry_block_id, retry_txns ).into(),
+            parent_block_id,
+            TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG
         ).unwrap();
         prop_assert!(retry_output.compute_status_for_input_txns().iter().all(|s| matches!(*s, TransactionStatus::Keep(_))));
 
         // Second block has StateCheckpoint/BlockPrologue transaction added.
-        let ledger_version = num_input_txns + 1;
+        let ledger_version = num_input_txns as Version + 1;
 
         // commit
-        let ledger_info = gen_ledger_info(ledger_version, retry_output.root_hash(), retry_block_id, 12345 /* timestamp */);
+        let ledger_info = gen_ledger_info(
+            ledger_version,
+            retry_output.root_hash(),
+            retry_block_id,
+            12345 /* timestamp */
+        );
         executor.commit_blocks(vec![retry_block_id], ledger_info).unwrap();
 
         // get txn_infos from db
         let db = executor.db.reader.clone();
         prop_assert_eq!(db.expect_synced_version(), ledger_version);
-        let txn_list = db.get_transactions(1 /* start version */, ledger_version, ledger_version /* ledger version */, false /* fetch events */).unwrap();
-        prop_assert_eq!(&block.inner_txns(), &txn_list.transactions[..num_input_txns as usize]);
+        let txn_list = db.get_transactions(
+            1,              /* start version */
+            ledger_version, /* version */
+            ledger_version, /* ledger version */
+            false           /* fetch events */
+        ).unwrap();
+        prop_assert_eq!(&block.inner_txns(), &txn_list.transactions[..num_input_txns]);
         let txn_infos = txn_list.proof.transaction_infos;
         let write_sets = db.get_write_set_iterator(1, ledger_version).unwrap().collect::<Result<Vec<_>>>().unwrap();
         let event_vecs = db.get_events_iterator(1, ledger_version).unwrap().collect::<Result<Vec<_>>>().unwrap();
@@ -803,8 +822,20 @@
         // replay txns in one batch across epoch boundary,
         // and the replayer should deal with `Retry`s automatically
         let replayer = chunk_executor_tests::TestExecutor::new();
-        replayer.executor.replay(txn_list.transactions, txn_infos, write_sets, event_vecs, &VerifyExecutionMode::verify_all()).unwrap();
+        let chunks_enqueued = replayer.executor.enqueue_chunks(
+            txn_list.transactions,
+            txn_infos,
+            write_sets,
+            event_vecs,
+            &VerifyExecutionMode::verify_all()
+        ).unwrap();
+        assert_eq!(chunks_enqueued, 2);
+        replayer.executor.update_ledger().unwrap();
+        replayer.executor.update_ledger().unwrap();
+
+        replayer.executor.commit().unwrap();
         replayer.executor.commit().unwrap();
+        prop_assert!(replayer.executor.is_empty());
         let replayed_db = replayer.db.reader.clone();
         prop_assert_eq!(
             replayed_db.get_accumulator_root_hash(ledger_version).unwrap(),
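Note: the test drives the three stages sequentially, while the restore controller in the next file overlaps them, using a `map_ok(.. repeat_with ..).try_flatten()` trick to turn "N chunks enqueued for this batch" into N stream items so the downstream stages run once per chunk. A toy of that stream shape built from plain `futures` combinators (stage bodies reduced to counters; the real code wraps each stage in `spawn_blocking` and uses the custom `try_buffered_x`):

    use futures::{stream, TryStreamExt};

    async fn pipeline(batches: Vec<Vec<u64>>) -> anyhow::Result<usize> {
        stream::iter(batches.into_iter().map(anyhow::Ok))
            // Stage 1, "enqueue_chunks": yield one item per chunk in the batch.
            .map_ok(|batch| stream::iter(batch.into_iter().map(anyhow::Ok)))
            .try_flatten()
            // Stage 2, "update_ledger": up to 3 chunks in flight at once.
            .map_ok(|chunk| async move { anyhow::Ok(chunk) })
            .try_buffered(3)
            // Stage 3, "commit": a strictly sequential fold, like try_fold below.
            .try_fold(0usize, |total, _chunk| async move { Ok(total + 1) })
            .await
    }

For example, `pipeline(vec![vec![1], vec![2, 3]])` resolves to `Ok(3)`: two batches, three chunks, three commits.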
diff --git a/storage/backup/backup-cli/src/backup_types/transaction/restore.rs b/storage/backup/backup-cli/src/backup_types/transaction/restore.rs
index c1eadb5366ff4..22a7ab7ff8b8b 100644
--- a/storage/backup/backup-cli/src/backup_types/transaction/restore.rs
+++ b/storage/backup/backup-cli/src/backup_types/transaction/restore.rs
@@ -27,8 +27,9 @@ use crate::{
 use anyhow::{anyhow, ensure, Result};
 use aptos_db::backup::restore_handler::RestoreHandler;
 use aptos_executor::chunk_executor::ChunkExecutor;
-use aptos_executor_types::{TransactionReplayer, VerifyExecutionMode};
+use aptos_executor_types::{ChunkExecutorTrait, TransactionReplayer, VerifyExecutionMode};
 use aptos_logger::prelude::*;
+use aptos_metrics_core::TimerHelper;
 use aptos_storage_interface::DbReaderWriter;
 use aptos_types::{
     contract_event::ContractEvent,
@@ -592,7 +593,7 @@ impl TransactionRestoreBatchController {
             let replay_start = Instant::now();
             let db = DbReaderWriter::from_arc(Arc::clone(&restore_handler.aptosdb));
             let chunk_replayer = Arc::new(ChunkExecutor::<AptosVM>::new(db));
-            let db_commit_stream = txns_to_execute_stream
+            let ledger_update_stream = txns_to_execute_stream
                 .try_chunks(BATCH_SIZE)
                 .err_into::<anyhow::Error>()
                 .map_ok(|chunk| {
@@ -602,11 +603,10 @@ impl TransactionRestoreBatchController {
                     let verify_execution_mode = self.verify_execution_mode.clone();
 
                     async move {
-                        let _timer = OTHER_TIMERS_SECONDS
-                            .with_label_values(&["replay_txn_chunk"])
-                            .start_timer();
+                        let _timer = OTHER_TIMERS_SECONDS.timer_with(&["enqueue_chunks"]);
+
                         tokio::task::spawn_blocking(move || {
-                            chunk_replayer.replay(
+                            chunk_replayer.enqueue_chunks(
                                 txns,
                                 txn_infos,
                                 write_sets,
@@ -614,22 +614,38 @@
                                 &verify_execution_mode,
                             )
                         })
-                        .err_into::<anyhow::Error>()
-                        .await
+                        .await
+                        .expect("spawn_blocking failed")
                     }
                 })
-                .try_buffered_x(self.global_opt.concurrent_downloads, 1)
-                .and_then(future::ready);
+                .try_buffered_x(3, 1)
+                .map_ok(|chunks_enqueued| {
+                    futures::stream::repeat_with(|| Result::Ok(())).take(chunks_enqueued)
+                })
+                .try_flatten();
+
+            let db_commit_stream = ledger_update_stream
+                .map_ok(|()| {
+                    let chunk_replayer = chunk_replayer.clone();
+                    async move {
+                        let _timer = OTHER_TIMERS_SECONDS.timer_with(&["ledger_update"]);
+
+                        tokio::task::spawn_blocking(move || chunk_replayer.update_ledger())
+                            .await
+                            .expect("spawn_blocking failed")
+                    }
+                })
+                .try_buffered_x(3, 1);
 
             let total_replayed = db_commit_stream
                 .and_then(|()| {
                     let chunk_replayer = chunk_replayer.clone();
                     async move {
-                        let _timer = OTHER_TIMERS_SECONDS
-                            .with_label_values(&["commit_txn_chunk"])
-                            .start_timer();
+                        let _timer = OTHER_TIMERS_SECONDS.timer_with(&["commit"]);
+
                         tokio::task::spawn_blocking(move || {
                             let v = chunk_replayer.commit()?;
+                            let total_replayed = v - first_version + 1;
                             TRANSACTION_REPLAY_VERSION.set(v as i64);
                             info!(
@@ -639,13 +655,17 @@
                                     as u64,
                                 "Transactions replayed."
                             );
-                            Ok(v)
+                            Ok(total_replayed)
                         })
-                        .await?
+                        .await
+                        .expect("spawn_blocking failed")
                     }
                 })
-                .try_fold(0, |_total, total| future::ok(total))
+                .try_fold(0, |_prev_total, total| future::ok(total))
                 .await?;
+            // assert all chunks are fully processed and in DB.
+            assert!(chunk_replayer.is_empty());
+
             info!(
                 total_replayed = total_replayed,
                 accumulative_tps =