From e6c765001e1989029c1748c4ef40c1a2fe23ed6a Mon Sep 17 00:00:00 2001 From: Bo Wu Date: Wed, 16 Oct 2024 21:19:21 -0700 Subject: [PATCH] add poc for replay without commit save progress --- Cargo.lock | 1 + storage/aptosdb/src/state_kv_db.rs | 6 +- storage/aptosdb/src/state_merkle_db.rs | 16 +- storage/db-tool/Cargo.toml | 1 + storage/db-tool/src/lib.rs | 4 + storage/db-tool/src/replay_on_archive.rs | 318 +++++++++++++++++++++++ 6 files changed, 337 insertions(+), 9 deletions(-) create mode 100644 storage/db-tool/src/replay_on_archive.rs diff --git a/Cargo.lock b/Cargo.lock index c7d752ed1fca8..beb0c8bca9838 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1253,6 +1253,7 @@ dependencies = [ "bcs 0.1.4", "clap 4.4.14", "itertools 0.13.0", + "rayon", "serde_json", "tokio", ] diff --git a/storage/aptosdb/src/state_kv_db.rs b/storage/aptosdb/src/state_kv_db.rs index 73bb6594b026c..a9feed7adc7fa 100644 --- a/storage/aptosdb/src/state_kv_db.rs +++ b/storage/aptosdb/src/state_kv_db.rs @@ -103,8 +103,10 @@ impl StateKvDb { enabled_sharding: true, }; - if let Some(overall_kv_commit_progress) = get_state_kv_commit_progress(&state_kv_db)? { - truncate_state_kv_db_shards(&state_kv_db, overall_kv_commit_progress)?; + if !readonly { + if let Some(overall_kv_commit_progress) = get_state_kv_commit_progress(&state_kv_db)? { + truncate_state_kv_db_shards(&state_kv_db, overall_kv_commit_progress)?; + } } Ok(state_kv_db) diff --git a/storage/aptosdb/src/state_merkle_db.rs b/storage/aptosdb/src/state_merkle_db.rs index b1f5e757d9fb5..b9a255e5beaec 100644 --- a/storage/aptosdb/src/state_merkle_db.rs +++ b/storage/aptosdb/src/state_merkle_db.rs @@ -599,13 +599,15 @@ impl StateMerkleDb { lru_cache, }; - if let Some(overall_state_merkle_commit_progress) = - get_state_merkle_commit_progress(&state_merkle_db)? 
- { - truncate_state_merkle_db_shards( - &state_merkle_db, - overall_state_merkle_commit_progress, - )?; + if !readonly { + if let Some(overall_state_merkle_commit_progress) = + get_state_merkle_commit_progress(&state_merkle_db)? + { + truncate_state_merkle_db_shards( + &state_merkle_db, + overall_state_merkle_commit_progress, + )?; + } } Ok(state_merkle_db) diff --git a/storage/db-tool/Cargo.toml b/storage/db-tool/Cargo.toml index c4859b5f4ad76..b2feeb952750d 100644 --- a/storage/db-tool/Cargo.toml +++ b/storage/db-tool/Cargo.toml @@ -26,6 +26,7 @@ aptos-vm = { workspace = true } bcs = { workspace = true } clap = { workspace = true } itertools = { workspace = true } +rayon = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } diff --git a/storage/db-tool/src/lib.rs b/storage/db-tool/src/lib.rs index 36c55e1493afe..72a68b5eceabc 100644 --- a/storage/db-tool/src/lib.rs +++ b/storage/db-tool/src/lib.rs @@ -7,6 +7,7 @@ mod backup; mod backup_maintenance; mod bootstrap; mod gen_replay_verify_jobs; +mod replay_on_archive; mod replay_verify; pub mod restore; #[cfg(test)] @@ -38,6 +39,8 @@ pub enum DBTool { #[clap(subcommand)] Restore(restore::Command), + + ReplayOnArchive(replay_on_archive::Opt), } impl DBTool { @@ -54,6 +57,7 @@ impl DBTool { }, DBTool::GenReplayVerifyJobs(cmd) => cmd.run().await, DBTool::Restore(cmd) => cmd.run().await, + DBTool::ReplayOnArchive(cmd) => cmd.run().await.map_err(anyhow::Error::from), } } } diff --git a/storage/db-tool/src/replay_on_archive.rs b/storage/db-tool/src/replay_on_archive.rs new file mode 100644 index 0000000000000..b5a101d031916 --- /dev/null +++ b/storage/db-tool/src/replay_on_archive.rs @@ -0,0 +1,318 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use anyhow::{bail, Error, Result}; +use aptos_backup_cli::utils::{ReplayConcurrencyLevelOpt, RocksdbOpt}; +use aptos_config::config::{ + StorageDirPaths, BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, + NO_OP_STORAGE_PRUNER_CONFIG, +}; +use aptos_db::{backup::backup_handler::BackupHandler, AptosDB}; +use aptos_executor_types::ParsedTransactionOutput; +use aptos_logger::{error, info}; +use aptos_storage_interface::{state_view::DbStateViewAtVersion, AptosDbError, DbReader}; +use aptos_types::{ + contract_event::ContractEvent, + transaction::{ + signature_verified_transaction::SignatureVerifiedTransaction, Transaction, TransactionInfo, + Version, + }, + write_set::WriteSet, +}; +use aptos_vm::{AptosVM, VMExecutor}; +use clap::Parser; +use itertools::multizip; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use std::{ + path::PathBuf, + process, + sync::{atomic::AtomicU64, Arc}, + time::Instant, +}; +// Replay Verify controller is responsible for providing legit range with start and end versions. 
+/// Command-line options for replaying a range of transactions against an existing DB
+/// and comparing the re-executed outputs with what the DB has stored.
+#[derive(Parser)]
+pub struct Opt {
+    #[clap(
+        long,
+        help = "The first transaction version required to be replayed and verified"
+    )]
+    start_version: Version,
+
+    #[clap(
+        long,
+        help = "The last transaction version required to be replayed and verified"
+    )]
+    end_version: Version,
+
+    #[clap(flatten)]
+    replay_concurrency_level: ReplayConcurrencyLevelOpt,
+
+    #[clap(long = "target-db-dir", value_parser)]
+    pub db_dir: PathBuf,
+
+    #[clap(flatten)]
+    pub rocksdb_opt: RocksdbOpt,
+
+    #[clap(
+        long,
+        default_value = "500",
+        help = "The number of transactions to be replayed in a chunk"
+    )]
+    pub chunk_size: usize,
+
+    #[clap(long, default_value = "1", help = "The number of concurrent replays")]
+    pub concurrent_replay: usize,
+}
+
+impl Opt {
+    /// Builds a [`Verifier`] from these options and runs it.
+    ///
+    /// Returns `Ok(())` when every replayed transaction matched. If any transaction
+    /// failed verification, the failures are logged and the process exits with status 2.
+    ///
+    /// # Errors
+    /// Propagates any error from opening the DB or from the replay itself.
+    pub async fn run(self) -> Result<()> {
+        let verifier = Verifier::new(&self)?;
+        let all_errors = verifier.run()?;
+        if !all_errors.is_empty() {
+            error!("All failed transactions: {:?}", all_errors);
+            process::exit(2);
+        }
+        Ok(())
+    }
+}
+
+/// Throughput counter shared by all replay tasks.
+struct ReplayTps {
+    /// Started when the counter is created; TPS is measured from this instant.
+    timer: Instant,
+    /// Total number of transactions reported via [`ReplayTps::update_cnt`].
+    txn_cnt: AtomicU64,
+}
+
+impl ReplayTps {
+    /// Creates a counter at zero and starts the timer.
+    pub fn new() -> Self {
+        Self {
+            timer: Instant::now(),
+            txn_cnt: AtomicU64::new(0),
+        }
+    }
+
+    /// Adds `cnt` to the running transaction total.
+    pub fn update_cnt(&self, cnt: u64) {
+        self.txn_cnt
+            .fetch_add(cnt, std::sync::atomic::Ordering::Relaxed);
+    }
+
+    /// Logs the running total, the elapsed seconds and their ratio.
+    pub fn print_tps(&self) {
+        let elapsed = self.timer.elapsed().as_secs_f64();
+        let cnt = self.txn_cnt.load(std::sync::atomic::Ordering::Relaxed);
+        let tps = (cnt as f64) / elapsed;
+        info!(
+            "Replayed {} transactions in {} seconds, TPS: {}",
+            cnt, elapsed, tps
+        );
+    }
+}
+
+/// Replays a contiguous range of transactions and checks each output against the
+/// transaction info, events and write set stored in the DB.
+struct Verifier {
+    /// Source of the stored transactions and their expected outputs.
+    backup_handler: BackupHandler,
+    /// Used to obtain the state view each chunk is executed against.
+    arc_db: Arc<dyn DbReader>,
+    /// First version to replay (already clamped to what the DB holds).
+    start: Version,
+    /// Number of versions to replay, starting at `start`.
+    limit: u64,
+    /// Passed to `AptosVM::set_concurrency_level_once` before replaying.
+    replay_concurrency_level: usize,
+    /// Maximum number of transactions executed as one block.
+    chunk_size: usize,
+    /// Number of sub-ranges replayed in parallel.
+    concurrent_replay: usize,
+    /// Throughput statistics shared across the parallel tasks.
+    replay_stat: ReplayTps,
+}
+
+impl Verifier {
+    /// Opens the DB at `config.db_dir` and computes the version range to replay.
+    ///
+    /// # Errors
+    /// Fails if the DB cannot be opened or the requested range does not intersect
+    /// the versions available in the DB.
+    pub fn new(config: &Opt) -> Result<Self> {
+        // NOTE(review): the positional `true` / `false` are presumably the read-only flag
+        // and the indexer switch; confirm against the `AptosDB::open` signature.
+        let aptos_db = AptosDB::open(
+            StorageDirPaths::from_path(config.db_dir.as_path()),
+            true,
+            NO_OP_STORAGE_PRUNER_CONFIG,
+            config.rocksdb_opt.clone().into(),
+            false,
+            BUFFERED_STATE_TARGET_ITEMS,
+            DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD,
+            None,
+        )?;
+
+        let backup_handler = aptos_db.get_backup_handler();
+        let arc_db = Arc::new(aptos_db) as Arc<dyn DbReader>;
+
+        // Clamp the requested range to what the DB actually holds.
+        let (start, limit) =
+            Self::get_start_and_limit(&arc_db, config.start_version, config.end_version)?;
+        // `limit` is at least 1 here, so the inclusive end is `start + limit - 1`.
+        info!(
+            start_version = start,
+            end_version = start + limit - 1,
+            "Replaying transactions."
+        );
+        Ok(Self {
+            backup_handler,
+            arc_db,
+            start,
+            limit,
+            replay_concurrency_level: config.replay_concurrency_level.get(),
+            chunk_size: config.chunk_size,
+            concurrent_replay: config.concurrent_replay,
+            replay_stat: ReplayTps::new(),
+        })
+    }
+
+    /// Splits the range into `concurrent_replay` contiguous sub-ranges and replays them
+    /// in parallel, returning every verification failure that was collected.
+    ///
+    /// # Errors
+    /// Fails if `concurrent_replay` is zero or if any sub-range fails to replay
+    /// (as opposed to replaying successfully with mismatching outputs).
+    pub fn run(self) -> Result<Vec<Error>> {
+        // Reject zero up front: it would otherwise be a divide-by-zero panic below.
+        if self.concurrent_replay == 0 {
+            bail!("concurrent_replay must be at least 1");
+        }
+        AptosVM::set_concurrency_level_once(self.replay_concurrency_level);
+
+        let num_tasks = self.concurrent_replay as u64;
+        let task_size = self.limit / num_tasks;
+        let ranges: Vec<(u64, u64)> = (0..num_tasks)
+            .map(|i| {
+                let task_start = self.start + i * task_size;
+                // The last task also takes the remainder of the integer division.
+                let task_limit = if i == num_tasks - 1 {
+                    self.start + self.limit - task_start
+                } else {
+                    task_size
+                };
+                (task_start, task_limit)
+            })
+            // With fewer versions than tasks, the leading tasks would be empty.
+            .filter(|(_, task_limit)| *task_limit > 0)
+            .collect();
+
+        // Process each range in parallel using `par_iter`.
+        let res = ranges
+            .par_iter()
+            .map(|(start, limit)| self.verify(*start, *limit))
+            .collect::<Vec<Result<Vec<Error>>>>();
+        let mut all_failed_txns = Vec::new();
+        for task_result in res {
+            all_failed_txns.extend(task_result?);
+        }
+        Ok(all_failed_txns)
+    }
+
+    /// Replays `limit` transactions starting at version `start`, one chunk at a time.
+    ///
+    /// A chunk is closed when it reaches `chunk_size` transactions or when a transaction
+    /// carries a reconfiguration event. Returns the verification failures found.
+    pub fn verify(&self, start: Version, limit: u64) -> Result<Vec<Error>> {
+        let mut total_failed_txns = Vec::new();
+        let txn_iter = self
+            .backup_handler
+            .get_transaction_iter(start, limit as usize)?;
+        let mut cur_txns = Vec::new();
+        let mut expected_events = Vec::new();
+        let mut expected_writesets = Vec::new();
+        let mut expected_txn_infos = Vec::new();
+        let mut chunk_start_version = start;
+        for (idx, item) in txn_iter.enumerate() {
+            let (input_txn, expected_txn_info, expected_event, expected_writeset) = item?;
+            let is_epoch_ending = ParsedTransactionOutput::parse_reconfig_events(&expected_event)
+                .next()
+                .is_some();
+            cur_txns.push(input_txn);
+            expected_txn_infos.push(expected_txn_info);
+            expected_events.push(expected_event);
+            expected_writesets.push(expected_writeset);
+            if is_epoch_ending || cur_txns.len() >= self.chunk_size {
+                let fail_txns = self.execute_and_verify(
+                    chunk_start_version,
+                    &cur_txns,
+                    &expected_txn_infos,
+                    &expected_events,
+                    &expected_writesets,
+                )?;
+                total_failed_txns.extend(fail_txns);
+                self.replay_stat.update_cnt(cur_txns.len() as u64);
+                self.replay_stat.print_tps();
+
+                // Reset the buffers for the next chunk.
+                chunk_start_version = start + (idx as u64) + 1;
+                cur_txns.clear();
+                expected_txn_infos.clear();
+                expected_events.clear();
+                expected_writesets.clear();
+            }
+        }
+        // Replay the trailing partial chunk. It is empty when the range ended exactly on
+        // a chunk or epoch boundary, in which case there is nothing left to execute.
+        if !cur_txns.is_empty() {
+            let fail_txns = self.execute_and_verify(
+                chunk_start_version,
+                &cur_txns,
+                &expected_txn_infos,
+                &expected_events,
+                &expected_writesets,
+            )?;
+            total_failed_txns.extend(fail_txns);
+            self.replay_stat.update_cnt(cur_txns.len() as u64);
+            self.replay_stat.print_tps();
+        }
+        Ok(total_failed_txns)
+    }
+
+    /// Clamps the requested `[start_version, end_version]` range to the versions the DB
+    /// holds and returns `(first version to replay, number of versions to replay)`.
+    ///
+    /// `end_version` is inclusive, so the returned count is `end - start + 1`.
+    ///
+    /// # Errors
+    /// Fails if the DB reports no first or synced version, or if the clamped range is
+    /// empty.
+    fn get_start_and_limit(
+        aptos_db: &Arc<dyn DbReader>,
+        start_version: Version,
+        end_version: Version,
+    ) -> Result<(Version, u64)> {
+        let first_version = aptos_db
+            .get_first_txn_version()?
+            .ok_or_else(|| AptosDbError::NotFound("First txn version is None".to_string()))?;
+        let start_version = std::cmp::max(first_version, start_version);
+
+        let synced_version = aptos_db
+            .get_synced_version()?
+            .ok_or_else(|| AptosDbError::NotFound("Synced version is None".to_string()))?;
+        let end_version = std::cmp::min(synced_version, end_version);
+
+        // The range comes from user input, so report a bad one as an error, not a panic.
+        if start_version > end_version {
+            bail!(
+                "start_version {} must be less than or equal to end_version {}",
+                start_version,
+                end_version
+            );
+        }
+        let limit = end_version - start_version + 1;
+        Ok((start_version, limit))
+    }
+
+    /// Executes `cur_txns` as one block on top of the state just before `start_version`
+    /// and compares every output with the expected info, events and write set.
+    ///
+    /// Returns one error per mismatching transaction; an empty input yields an empty
+    /// result without touching the VM.
+    ///
+    /// # Errors
+    /// Fails if the state view cannot be built, the VM fails to execute the block, or
+    /// the number of compared outputs differs from `expected_txn_infos.len()`.
+    fn execute_and_verify(
+        &self,
+        start_version: Version,
+        cur_txns: &[Transaction],
+        expected_txn_infos: &[TransactionInfo],
+        expected_epoch_events: &[Vec<ContractEvent>],
+        expected_epoch_writesets: &[WriteSet],
+    ) -> Result<Vec<Error>> {
+        if cur_txns.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let executed_outputs = AptosVM::execute_block_no_limit(
+            cur_txns
+                .iter()
+                .map(|txn| SignatureVerifiedTransaction::from(txn.clone()))
+                .collect::<Vec<_>>()
+                .as_slice(),
+            // `checked_sub` yields `None` for version 0, which has no preceding version.
+            &self
+                .arc_db
+                .state_view_at_version(start_version.checked_sub(1))?,
+        )?;
+
+        let mut failed_txns = Vec::new();
+        let mut processed = 0usize;
+        for (idx, (expected_txn_info, expected_events, expected_writeset, executed_output)) in
+            multizip((
+                expected_txn_infos,
+                expected_epoch_events,
+                expected_epoch_writesets,
+                executed_outputs,
+            ))
+            .enumerate()
+        {
+            let version = start_version + idx as Version;
+            if let Err(err) = executed_output.ensure_match_transaction_info(
+                version,
+                expected_txn_info,
+                Some(expected_writeset),
+                Some(expected_events),
+            ) {
+                failed_txns.push(err);
+            }
+            processed += 1;
+        }
+
+        // `multizip` stops at the shortest input, so a short VM output shows up here.
+        if processed != expected_txn_infos.len() {
+            bail!(
+                "processed transaction count {} is not equal to expected transaction count {}",
+                processed,
+                expected_txn_infos.len()
+            );
+        }
+        Ok(failed_txns)
+    }
+}