diff --git a/storage/aptosdb/src/state_kv_db.rs b/storage/aptosdb/src/state_kv_db.rs
index 73bb6594b026c1..a9feed7adc7fae 100644
--- a/storage/aptosdb/src/state_kv_db.rs
+++ b/storage/aptosdb/src/state_kv_db.rs
@@ -103,8 +103,10 @@ impl StateKvDb {
             enabled_sharding: true,
         };
 
-        if let Some(overall_kv_commit_progress) = get_state_kv_commit_progress(&state_kv_db)? {
-            truncate_state_kv_db_shards(&state_kv_db, overall_kv_commit_progress)?;
+        if !readonly {
+            if let Some(overall_kv_commit_progress) = get_state_kv_commit_progress(&state_kv_db)? {
+                truncate_state_kv_db_shards(&state_kv_db, overall_kv_commit_progress)?;
+            }
         }
 
         Ok(state_kv_db)
diff --git a/storage/aptosdb/src/state_merkle_db.rs b/storage/aptosdb/src/state_merkle_db.rs
index b1f5e757d9fb59..b9a255e5beaec6 100644
--- a/storage/aptosdb/src/state_merkle_db.rs
+++ b/storage/aptosdb/src/state_merkle_db.rs
@@ -599,13 +599,15 @@ impl StateMerkleDb {
             lru_cache,
         };
 
-        if let Some(overall_state_merkle_commit_progress) =
-            get_state_merkle_commit_progress(&state_merkle_db)?
-        {
-            truncate_state_merkle_db_shards(
-                &state_merkle_db,
-                overall_state_merkle_commit_progress,
-            )?;
+        if !readonly {
+            if let Some(overall_state_merkle_commit_progress) =
+                get_state_merkle_commit_progress(&state_merkle_db)?
+            {
+                truncate_state_merkle_db_shards(
+                    &state_merkle_db,
+                    overall_state_merkle_commit_progress,
+                )?;
+            }
         }
 
         Ok(state_merkle_db)
diff --git a/storage/backup/backup-cli/src/backup_types/transaction/restore.rs b/storage/backup/backup-cli/src/backup_types/transaction/restore.rs
index 62677c6a0dc479..22a7ab7ff8b8bb 100644
--- a/storage/backup/backup-cli/src/backup_types/transaction/restore.rs
+++ b/storage/backup/backup-cli/src/backup_types/transaction/restore.rs
@@ -25,12 +25,12 @@ use crate::{
     },
 };
 use anyhow::{anyhow, ensure, Result};
-use aptos_db::{backup::restore_handler::RestoreHandler, AptosDB};
+use aptos_db::backup::restore_handler::RestoreHandler;
 use aptos_executor::chunk_executor::ChunkExecutor;
 use aptos_executor_types::{ChunkExecutorTrait, TransactionReplayer, VerifyExecutionMode};
 use aptos_logger::prelude::*;
 use aptos_metrics_core::TimerHelper;
-use aptos_storage_interface::{cached_state_view::{CachedDbStateView, CachedStateView}, DbReaderWriter};
+use aptos_storage_interface::DbReaderWriter;
 use aptos_types::{
     contract_event::ContractEvent,
     ledger_info::LedgerInfoWithSignatures,
@@ -712,30 +712,3 @@ impl TransactionRestoreBatchController {
         Ok(())
     }
 }
-
-#[test]
-fn test_replay_with_no_commit() {
-    use aptos_storage_interface::DbReader;
-    use aptos_executor::components::chunk_output::ChunkOutput;
-    let db_root_path = "/tmp/aptosdb_test";
-    let aptos_db = AptosDB::new_readonly_for_test(db_root_path);
-    let start_version = 1000;
-    let limit = 1000;
-    // fetch transactions from aptos_db
-    let txn_info_iter = aptos_db.get_transaction_info_iterator(start_version, limit).unwrap();
-    let txn_iter = aptos_db.get_transaction_iterator(start_version, limit).unwrap();
-    // create batch of 100 transactions
-    let state_view = CachedStateView::new_impl(id, snapshot, speculative_state, proof_fetcher)
-    for chunk in izip!(txn_info_iter, txn_iter).chunks(100) {
-        let (txn_info_batch, txn_batch) = chunk.map(|(txn_info, txn)| (txn_info.unwrap(), txn.unwrap())).unzip();
-        let chunk_output = ChunkOutput::by_transaction_execution::<V>(
-            txn_batch,
-            state_view,
-            BlockExecutorConfigFromOnchain::new_no_block_limit(),
-        )?;
-    }
-
-    // execute transaction against the aptos db
-
-
-}
\ No newline at end of file
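Both hunks above add the same guard for the same reason: truncating shards back to the overall commit progress during crash recovery is a write, and the `replay-on-archive` tool introduced below opens the database with `readonly = true`, so the recovery step must be skipped entirely in that mode. A minimal sketch of the shared shape, using a hypothetical `maybe_truncate` helper that is not part of this patch:

```rust
use anyhow::Result;

type Version = u64; // stand-in for aptos_types::transaction::Version

/// Sketch only: both StateKvDb and StateMerkleDb now follow this shape.
/// Truncation mutates the DB, so it must never run for a read-only open.
fn maybe_truncate(
    readonly: bool,
    commit_progress: Option<Version>,
    truncate_to: impl FnOnce(Version) -> Result<()>,
) -> Result<()> {
    if readonly {
        return Ok(());
    }
    if let Some(version) = commit_progress {
        truncate_to(version)?;
    }
    Ok(())
}
```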
diff --git a/storage/db-tool/src/lib.rs b/storage/db-tool/src/lib.rs
index 36c55e1493afe2..b777fa1d0d8f37 100644
--- a/storage/db-tool/src/lib.rs
+++ b/storage/db-tool/src/lib.rs
@@ -7,6 +7,7 @@ mod backup;
 mod backup_maintenance;
 mod bootstrap;
 mod gen_replay_verify_jobs;
+mod replay_on_archive;
 mod replay_verify;
pub mod restore;
 #[cfg(test)]
@@ -38,6 +39,8 @@ pub enum DBTool {
 
     #[clap(subcommand)]
     Restore(restore::Command),
+
+    ReplayOnArchive(replay_on_archive::Opt),
 }
 
 impl DBTool {
@@ -54,6 +57,7 @@
             },
             DBTool::GenReplayVerifyJobs(cmd) => cmd.run().await,
             DBTool::Restore(cmd) => cmd.run().await,
+            DBTool::ReplayOnArchive(cmd) => cmd.run().await,
         }
     }
 }
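With clap's derive conventions, the new variant should surface as a `replay-on-archive` subcommand whose flags follow the `Opt` field names defined below (`--target-db-dir` is spelled out explicitly; `--chunk-size` defaults to 2000). A hypothetical invocation, with the binary name and kebab-case flag mapping assumed rather than confirmed by this patch:

```sh
# Hypothetical; subcommand and flag names assume clap's default kebab-case mapping.
aptos-db-tool replay-on-archive \
    --start-version 1000000 \
    --end-version 1100000 \
    --target-db-dir /opt/aptos/data/db \
    --chunk-size 2000
```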
diff --git a/storage/db-tool/src/replay_on_archive.rs b/storage/db-tool/src/replay_on_archive.rs
new file mode 100644
index 00000000000000..12c2187db4b17f
--- /dev/null
+++ b/storage/db-tool/src/replay_on_archive.rs
@@ -0,0 +1,227 @@
+// Copyright © Aptos Foundation
+// Parts of the project are originally copyright © Meta Platforms, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::{bail, ensure, Error, Result};
+use aptos_backup_cli::utils::{ReplayConcurrencyLevelOpt, RocksdbOpt};
+use aptos_config::config::{
+    StorageDirPaths, BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD,
+    NO_OP_STORAGE_PRUNER_CONFIG,
+};
+use aptos_db::AptosDB;
+use aptos_executor_types::ParsedTransactionOutput;
+use aptos_logger::{debug, info};
+use aptos_storage_interface::{state_view::DbStateViewAtVersion, AptosDbError, DbReader};
+use aptos_types::{
+    contract_event::ContractEvent,
+    transaction::{
+        signature_verified_transaction::SignatureVerifiedTransaction, TransactionInfo,
+        TransactionOutput, Version,
+    },
+    write_set::WriteSet,
+};
+use aptos_vm::{AptosVM, VMExecutor};
+use clap::Parser;
+use itertools::multizip;
+use std::{path::PathBuf, process, sync::Arc, time::Instant};
+
+// The replay-verify controller is responsible for providing a valid version range
+// (start and end versions) to replay.
+#[derive(Parser)]
+pub struct Opt {
+    #[clap(
+        long,
+        help = "The first transaction version required to be replayed and verified"
+    )]
+    start_version: Version,
+    #[clap(
+        long,
+        help = "The last transaction version required to be replayed and verified"
+    )]
+    end_version: Version,
+    #[clap(flatten)]
+    replay_concurrency_level: ReplayConcurrencyLevelOpt,
+    #[clap(long = "target-db-dir", value_parser)]
+    pub db_dir: PathBuf,
+    #[clap(flatten)]
+    pub rocksdb_opt: RocksdbOpt,
+    #[clap(
+        long,
+        default_value = "2000",
+        help = "The number of transactions to be replayed in a chunk"
+    )]
+    pub chunk_size: usize,
+}
+
+impl Opt {
+    pub async fn run(self) -> Result<()> {
+        AptosVM::set_concurrency_level_once(self.replay_concurrency_level.get());
+        let replay_start = Instant::now();
+        // Open the archive DB read-only so that replay can never mutate it.
+        let aptos_db = AptosDB::open(
+            StorageDirPaths::from_path(self.db_dir.as_path()),
+            true, // readonly
+            NO_OP_STORAGE_PRUNER_CONFIG,
+            self.rocksdb_opt.into(),
+            false,
+            BUFFERED_STATE_TARGET_ITEMS,
+            DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD,
+            None,
+        )?;
+
+        let backup_handler = aptos_db.get_backup_handler();
+        let arc_db = Arc::new(aptos_db) as Arc<dyn DbReader>;
+        let (start, limit) =
+            Self::get_start_and_limit(&arc_db, self.start_version, self.end_version)?;
+        info!(
+            start_version = start,
+            end_version = start + limit,
+            "Replaying transactions."
+        );
+        let mut failed_txns = Vec::new();
+        let txn_iter = backup_handler.get_transaction_iter(start, limit as usize)?;
+        let mut cur_txns = Vec::new();
+        let mut expected_events = Vec::new();
+        let mut expected_writesets = Vec::new();
+        let mut expected_txn_infos = Vec::new();
+        let mut chunk_start_version = start;
+        for (idx, item) in txn_iter.enumerate() {
+            let (input_txn, expected_txn_info, expected_event, expected_writeset) = item?;
+            // An epoch-ending (reconfiguration) transaction must close the chunk:
+            // transactions after it execute under the new epoch's configuration.
+            let is_epoch_ending = ParsedTransactionOutput::parse_reconfig_events(&expected_event)
+                .next()
+                .is_some();
+            cur_txns.push(input_txn);
+            expected_txn_infos.push(expected_txn_info);
+            expected_events.push(expected_event);
+            expected_writesets.push(expected_writeset);
+            if is_epoch_ending || cur_txns.len() >= self.chunk_size {
+                // Execute the chunk against the state right before it starts
+                // (`None` means the state before genesis).
+                let executed_outputs = AptosVM::execute_block_no_limit(
+                    cur_txns
+                        .iter()
+                        .map(|txn| SignatureVerifiedTransaction::from(txn.clone()))
+                        .collect::<Vec<_>>()
+                        .as_slice(),
+                    &arc_db.state_view_at_version(chunk_start_version.checked_sub(1))?,
+                )?;
+                // Verify the results, collecting mismatches instead of aborting
+                // on the first failure.
+                failed_txns.extend(Self::verify_execution_results(
+                    chunk_start_version,
+                    &expected_txn_infos,
+                    &expected_events,
+                    &expected_writesets,
+                    &executed_outputs,
+                )?);
+
+                // Start a fresh chunk.
+                chunk_start_version = start + (idx as u64) + 1;
+                cur_txns.clear();
+                expected_txn_infos.clear();
+                expected_events.clear();
+                expected_writesets.clear();
+                info!(
+                    version = start + idx as u64,
+                    accumulative_tps =
+                        ((idx as f64) / replay_start.elapsed().as_secs_f64()) as u64,
+                    "Transactions verified."
+                );
+            }
+        }
+        // Replay the remaining transactions, if any. The guard matters: when the
+        // range ends exactly on a chunk cut, there is nothing left to execute,
+        // and verifying an empty batch would spuriously fail the count check.
+        if !cur_txns.is_empty() {
+            let executed_outputs = AptosVM::execute_block_no_limit(
+                cur_txns
+                    .iter()
+                    .map(|txn| SignatureVerifiedTransaction::from(txn.clone()))
+                    .collect::<Vec<_>>()
+                    .as_slice(),
+                &arc_db.state_view_at_version(chunk_start_version.checked_sub(1))?,
+            )?;
+            failed_txns.extend(Self::verify_execution_results(
+                chunk_start_version,
+                &expected_txn_infos,
+                &expected_events,
+                &expected_writesets,
+                &executed_outputs,
+            )?);
+        }
+        info!(
+            version = start + limit,
+            accumulative_tps = ((limit as f64) / replay_start.elapsed().as_secs_f64()) as u64,
+            "Transactions verified."
+        );
+
+        if !failed_txns.is_empty() {
+            debug!("Failed transactions: {:?}", failed_txns);
+            process::exit(2);
+        }
+        Ok(())
+    }
+
+    fn verify_execution_results(
+        start_version: Version,
+        expected_txn_infos: &[TransactionInfo],
+        expected_events: &[Vec<ContractEvent>],
+        expected_writesets: &[WriteSet],
+        executed_outputs: &[TransactionOutput],
+    ) -> Result<Vec<Error>> {
+        let mut failed_txns = Vec::new();
+        let mut processed = 0;
+        for (idx, (expected_txn_info, expected_event, expected_writeset, executed_output)) in
+            multizip((
+                expected_txn_infos,
+                expected_events,
+                expected_writesets,
+                executed_outputs,
+            ))
+            .enumerate()
+        {
+            let version = start_version + idx as Version;
+            if let Err(err) = executed_output.ensure_match_transaction_info(
+                version,
+                expected_txn_info,
+                Some(expected_writeset),
+                Some(expected_event),
+            ) {
+                failed_txns.push(err);
+            }
+            processed = idx + 1;
+        }
+
+        // `multizip` stops at the shortest input, so a length mismatch between
+        // the expected and executed results must be surfaced, not silently skipped.
+        if processed != expected_txn_infos.len() {
+            bail!(
+                "processed transaction count {} is not equal to expected transaction count {}",
+                processed,
+                expected_txn_infos.len()
+            );
+        }
+        Ok(failed_txns)
+    }
+
+    fn get_start_and_limit(
+        aptos_db: &Arc<dyn DbReader>,
+        start_version: Version,
+        end_version: Version,
+    ) -> Result<(Version, Version)> {
+        // Clamp the requested range to the versions the archive DB actually holds.
+        let start_version = std::cmp::max(
+            aptos_db
+                .get_first_txn_version()?
+                .ok_or(AptosDbError::NotFound(
+                    "First txn version is None".to_string(),
+                ))?,
+            start_version,
+        );
+
+        let end_version = std::cmp::min(
+            aptos_db
+                .get_synced_version()?
+                .ok_or(AptosDbError::NotFound("Synced version is None".to_string()))?,
+            end_version,
+        );
+        ensure!(
+            start_version <= end_version,
+            "start_version {} must be less than or equal to end_version {}",
+            start_version,
+            end_version
+        );
+        let limit = end_version - start_version;
+        Ok((start_version, limit))
+    }
+}
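A note on the cut condition in the replay loop: a chunk closes either when it reaches `chunk_size` or immediately after an epoch-ending transaction, because later transactions must execute under the new epoch's on-chain configuration. A minimal restatement of the predicate as a hypothetical helper (it assumes `parse_reconfig_events` accepts the event list the same way the call site above does):

```rust
use aptos_executor_types::ParsedTransactionOutput;
use aptos_types::contract_event::ContractEvent;

/// Hypothetical helper mirroring the loop's cut condition; not part of the patch.
fn should_cut_chunk(txn_events: &[ContractEvent], chunk_len: usize, chunk_size: usize) -> bool {
    // A reconfiguration event marks the end of an epoch; executing past it in
    // the same block would use stale on-chain config (validator set, gas, etc.).
    let is_epoch_ending = ParsedTransactionOutput::parse_reconfig_events(txn_events)
        .next()
        .is_some();
    is_epoch_ending || chunk_len >= chunk_size
}
```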
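For context, the subcommand reaches `Opt::run` through the existing `DBTool` dispatcher shown in `lib.rs`. A hypothetical driver, with the crate name, runtime, and logger setup assumed rather than taken from this patch:

```rust
// Hypothetical driver; `aptos_db_tool` as the crate name and the logger setup
// are assumptions, not part of this patch.
use aptos_db_tool::DBTool;
use clap::Parser;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    aptos_logger::Logger::new().init();
    // `replay-on-archive ...` parses into DBTool::ReplayOnArchive(Opt) and
    // dispatches to Opt::run(). Per the code above, verification mismatches
    // are logged and the process exits with status 2; other errors bubble up.
    DBTool::parse().run().await
}
```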