
Commit

save progress
areshand committed Oct 22, 2024
1 parent ef1c00d commit 8480235
Showing 5 changed files with 246 additions and 38 deletions.
6 changes: 4 additions & 2 deletions storage/aptosdb/src/state_kv_db.rs
@@ -103,8 +103,10 @@ impl StateKvDb {
             enabled_sharding: true,
         };

-        if let Some(overall_kv_commit_progress) = get_state_kv_commit_progress(&state_kv_db)? {
-            truncate_state_kv_db_shards(&state_kv_db, overall_kv_commit_progress)?;
+        if !readonly {
+            if let Some(overall_kv_commit_progress) = get_state_kv_commit_progress(&state_kv_db)? {
+                truncate_state_kv_db_shards(&state_kv_db, overall_kv_commit_progress)?;
+            }
         }

         Ok(state_kv_db)
16 changes: 9 additions & 7 deletions storage/aptosdb/src/state_merkle_db.rs
@@ -599,13 +599,15 @@ impl StateMerkleDb {
             lru_cache,
         };

-        if let Some(overall_state_merkle_commit_progress) =
-            get_state_merkle_commit_progress(&state_merkle_db)?
-        {
-            truncate_state_merkle_db_shards(
-                &state_merkle_db,
-                overall_state_merkle_commit_progress,
-            )?;
+        if !readonly {
+            if let Some(overall_state_merkle_commit_progress) =
+                get_state_merkle_commit_progress(&state_merkle_db)?
+            {
+                truncate_state_merkle_db_shards(
+                    &state_merkle_db,
+                    overall_state_merkle_commit_progress,
+                )?;
+            }
         }

         Ok(state_merkle_db)
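Both hunks above add the same guard: the crash-recovery truncation pass deletes shard data that got ahead of the overall commit progress, and those deletes cannot go through a read-only RocksDB handle. The guard presumably exists so that the new replay-on-archive tool below can open the database read-only (the `true` passed to `AptosDB::open`). A minimal self-contained sketch of the pattern, with stub types standing in for the real AptosDB internals:

    use anyhow::Result;

    struct Db;

    impl Db {
        // Sketch only: `commit_progress` and `truncate_shards_to` are
        // hypothetical stand-ins for get_state_kv_commit_progress /
        // truncate_state_kv_db_shards and their state-merkle counterparts.
        fn open(readonly: bool) -> Result<Self> {
            let db = Db;
            if !readonly {
                // Truncation deletes data that ran ahead of the overall commit
                // progress; a read-only handle must never attempt those writes.
                if let Some(progress) = db.commit_progress()? {
                    db.truncate_shards_to(progress)?;
                }
            }
            Ok(db)
        }

        fn commit_progress(&self) -> Result<Option<u64>> { Ok(None) }
        fn truncate_shards_to(&self, _version: u64) -> Result<()> { Ok(()) }
    }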
31 changes: 2 additions & 29 deletions storage/backup/backup-cli/src/backup_types/transaction/restore.rs
@@ -25,12 +25,12 @@ use crate::{
     },
 };
 use anyhow::{anyhow, ensure, Result};
-use aptos_db::{backup::restore_handler::RestoreHandler, AptosDB};
+use aptos_db::backup::restore_handler::RestoreHandler;
 use aptos_executor::chunk_executor::ChunkExecutor;
 use aptos_executor_types::{ChunkExecutorTrait, TransactionReplayer, VerifyExecutionMode};
 use aptos_logger::prelude::*;
 use aptos_metrics_core::TimerHelper;
-use aptos_storage_interface::{cached_state_view::{CachedDbStateView, CachedStateView}, DbReaderWriter};
+use aptos_storage_interface::DbReaderWriter;
 use aptos_types::{
     contract_event::ContractEvent,
     ledger_info::LedgerInfoWithSignatures,
@@ -712,30 +712,3 @@ impl TransactionRestoreBatchController {
         Ok(())
     }
 }
-
-#[test]
-fn test_replay_with_no_commit() {
-    use aptos_storage_interface::DbReader;
-    use aptos_executor::components::chunk_output::ChunkOutput;
-    let db_root_path = "/tmp/aptosdb_test";
-    let aptos_db = AptosDB::new_readonly_for_test(db_root_path);
-    let start_version = 1000;
-    let limit = 1000;
-    // fetch transactions from aptos_db
-    let txn_info_iter = aptos_db.get_transaction_info_iterator(start_version, limit).unwrap();
-    let txn_iter = aptos_db.get_transaction_iterator(start_version, limit).unwrap();
-    // create batch of 100 transactions
-    let state_view = CachedStateView::new_impl(id, snapshot, speculative_state, proof_fetcher)
-    for chunk in izip!(txn_info_iter, txn_iter).chunks(100) {
-        let (txn_info_batch, txn_batch) = chunk.map(|(txn_info, txn)| (txn_info.unwrap(), txn.unwrap())).unzip();
-        let chunk_output = ChunkOutput::by_transaction_execution::<V>(
-            txn_batch,
-            state_view,
-            BlockExecutorConfigFromOnchain::new_no_block_limit(),
-        )?;
-    }
-
-    // execute transaction against the aptos db
-
-
-}
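The deleted test above was never compilable: `id`, `snapshot`, `speculative_state`, `proof_fetcher`, and `V` are unbound, a semicolon is missing after the `CachedStateView::new_impl` call, and `?` is used in a test returning `()`. For reference, a hedged sketch of what it was reaching for — streaming a transaction range out of a read-only AptosDB and re-executing it in fixed-size chunks without committing — using only the `DbReader` iterator methods the draft itself called; the function name and the trailing execution step are illustrative, not this commit's API:

    use anyhow::Result;
    use aptos_storage_interface::DbReader;
    use itertools::{izip, Itertools};

    fn replay_without_commit(db: &dyn DbReader, start_version: u64, limit: u64) -> Result<()> {
        let txn_info_iter = db.get_transaction_info_iterator(start_version, limit)?;
        let txn_iter = db.get_transaction_iterator(start_version, limit)?;
        // Walk the two iterators in lockstep, 100 transactions at a time.
        let chunks = izip!(txn_info_iter, txn_iter).chunks(100);
        for chunk in &chunks {
            let (txn_infos, txns): (Vec<_>, Vec<_>) = chunk
                .map(|(info, txn)| -> Result<_> { Ok((info?, txn?)) })
                .collect::<Result<Vec<_>>>()?
                .into_iter()
                .unzip();
            // The draft stopped here: execute `txns` against a state view at
            // start_version - 1 and compare outputs with `txn_infos`, which is
            // what replay_on_archive.rs below now does properly.
            let _ = (txn_infos, txns);
        }
        Ok(())
    }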
4 changes: 4 additions & 0 deletions storage/db-tool/src/lib.rs
@@ -7,6 +7,7 @@ mod backup;
 mod backup_maintenance;
 mod bootstrap;
 mod gen_replay_verify_jobs;
+mod replay_on_archive;
 mod replay_verify;
 pub mod restore;
 #[cfg(test)]
@@ -38,6 +39,8 @@ pub enum DBTool {

     #[clap(subcommand)]
     Restore(restore::Command),
+
+    ReplayOnArchive(replay_on_archive::Opt),
 }

 impl DBTool {
@@ -54,6 +57,7 @@ impl DBTool {
             },
             DBTool::GenReplayVerifyJobs(cmd) => cmd.run().await,
             DBTool::Restore(cmd) => cmd.run().await,
+            DBTool::ReplayOnArchive(cmd) => cmd.run().await,
         }
     }
 }
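Usage note (assumed, not shown in this commit): with clap's default kebab-case naming for subcommand variants and the flags declared on `Opt` in the new file below, the new mode would be invoked along the lines of

    aptos-db-tool replay-on-archive --start-version 1000000 --end-version 1002000 --target-db-dir /path/to/archive/db

The binary name and exact flag spellings here are inferred from the clap attributes, not taken from the diff.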
227 changes: 227 additions & 0 deletions storage/db-tool/src/replay_on_archive.rs
@@ -0,0 +1,227 @@
// Copyright © Aptos Foundation
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::{bail, Error, Result};
use aptos_backup_cli::utils::{ReplayConcurrencyLevelOpt, RocksdbOpt};
use aptos_config::config::{
StorageDirPaths, BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD,
NO_OP_STORAGE_PRUNER_CONFIG,
};
use aptos_db::AptosDB;
use aptos_executor_types::ParsedTransactionOutput;
use aptos_logger::{debug, info};
use aptos_storage_interface::{state_view::DbStateViewAtVersion, AptosDbError, DbReader};
use aptos_types::{
contract_event::ContractEvent,
transaction::{
signature_verified_transaction::SignatureVerifiedTransaction, TransactionInfo,
TransactionOutput, Version,
},
write_set::WriteSet,
};
use aptos_vm::{AptosVM, VMExecutor};
use clap::Parser;
use itertools::multizip;
use std::{path::PathBuf, process, sync::Arc, time::Instant};

// The replay-verify controller is responsible for providing a valid range of start and end versions.
#[derive(Parser)]
pub struct Opt {
#[clap(
long,
help = "The first transaction version required to be replayed and verified"
)]
start_version: Version,
#[clap(
long,
help = "The last transaction version required to be replayed and verified"
)]
end_version: Version,
#[clap(flatten)]
replay_concurrency_level: ReplayConcurrencyLevelOpt,
#[clap(long = "target-db-dir", value_parser)]
pub db_dir: PathBuf,
#[clap(flatten)]
pub rocksdb_opt: RocksdbOpt,
#[clap(
long,
default_value = "2000",
help = "The number of transactions to be replayed in a chunk"
)]
pub chunk_size: usize,
}

impl Opt {
pub async fn run(self) -> Result<()> {
AptosVM::set_concurrency_level_once(self.replay_concurrency_level.get());
let replay_start: Instant = Instant::now();
let aptos_db = AptosDB::open(
StorageDirPaths::from_path(self.db_dir.as_path()),
true,
NO_OP_STORAGE_PRUNER_CONFIG,
self.rocksdb_opt.into(),
false,
BUFFERED_STATE_TARGET_ITEMS,
DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD,
None,
)?;

let backup_handler = aptos_db.get_backup_handler();
let arc_db = Arc::new(aptos_db) as Arc<dyn DbReader>;
let (start, limit) =
Self::get_start_and_limit(&arc_db, self.start_version, self.end_version)?;
info!(
start_version = start,
end_version = start + limit,
"Replaying transactions."
);
let mut failed_txns = Vec::new();
let txn_iter = backup_handler.get_transaction_iter(start, limit as usize)?;
let mut cur_txns = Vec::new();
let mut expected_events = Vec::new();
let mut expected_writesets = Vec::new();
let mut expected_txn_infos = Vec::new();
let mut chunk_start_version = start;
for (idx, item) in txn_iter.enumerate() {
let (input_txn, expected_txn_info, expected_event, expected_writeset) = item?;
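// A reconfiguration event ends the block: the VM does not execute
// transactions past an epoch change, so every chunk must be cut at an
// epoch boundary as well as at the size limit.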
let is_epoch_ending = ParsedTransactionOutput::parse_reconfig_events(&expected_event)
.next()
.is_some();
cur_txns.push(input_txn);
expected_txn_infos.push(expected_txn_info);
expected_events.push(expected_event);
expected_writesets.push(expected_writeset);
if is_epoch_ending || cur_txns.len() >= self.chunk_size {
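// Execute the buffered chunk against the state just before its first
// version; checked_sub(1) maps version 0 to None, i.e. an empty
// pre-genesis state view.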
let executed_outputs = AptosVM::execute_block_no_limit(
cur_txns
.iter()
.map(|txn| SignatureVerifiedTransaction::from(txn.clone()))
.collect::<Vec<_>>()
.as_slice(),
&arc_db.state_view_at_version(chunk_start_version.checked_sub(1))?,
)?;
// verify results
let fail_txns = Self::verify_execution_results(
chunk_start_version,
&expected_txn_infos,
&expected_events,
&expected_writesets,
&executed_outputs,
)?;
// collect failed transactions
failed_txns.extend(fail_txns);

// reset the buffers for the next chunk
chunk_start_version = start + (idx as u64) + 1;
cur_txns.clear();
expected_txn_infos.clear();
expected_events.clear();
expected_writesets.clear();
info!(
version = start + idx as u64,
accumulative_tps = ((idx as f64) / replay_start.elapsed().as_secs_f64()) as u64,
"Transactions verified."
);
}
}
// Replay the remaining txns
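// (Hedged note: if the loop above happened to flush on the very last
// transaction, this executes an empty batch, and the zero-count check in
// verify_execution_results would then bail.)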
let executed_outputs = AptosVM::execute_block_no_limit(
cur_txns
.iter()
.map(|txn| SignatureVerifiedTransaction::from(txn.clone()))
.collect::<Vec<_>>()
.as_slice(),
&arc_db.state_view_at_version(Some(chunk_start_version - 1))?,
)?;
let fail_txns = Self::verify_execution_results(
chunk_start_version,
&expected_txn_infos,
&expected_events,
&expected_writesets,
&executed_outputs,
)?;
info!(
version = start + limit,
accumulative_tps = ((limit as f64) / replay_start.elapsed().as_secs_f64()) as u64,
"Transactions verified."
);

failed_txns.extend(fail_txns);

if !failed_txns.is_empty() {
debug!("Failed transactions: {:?}", failed_txns);
process::exit(2);
}
Ok(())
}

fn verify_execution_results(
start_version: Version,
expected_txn_infos: &Vec<TransactionInfo>,
expected_epoch_events: &Vec<Vec<ContractEvent>>,
expected_epoch_writesets: &Vec<WriteSet>,
executed_outputs: &Vec<TransactionOutput>,
) -> Result<Vec<Error>> {
let mut failed_txns = Vec::new();
let mut version = start_version;
for (idx, (expected_txn_info, expected_events, expected_writeset, executed_output)) in
multizip((
expected_txn_infos,
expected_epoch_events,
expected_epoch_writesets,
executed_outputs,
))
.enumerate()
{
version = start_version + idx as Version;
if let Err(err) = executed_output.ensure_match_transaction_info(
version,
expected_txn_info,
Some(expected_writeset),
Some(expected_events),
) {
failed_txns.push(err);
}
}
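// multizip stops at the shortest input, so if the VM returned fewer
// outputs than expected the loop above would end early; the count check
// below catches that silently-truncated case.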

if (version + 1 - start_version) as usize != expected_txn_infos.len() {
bail!(
"processed transaction count {} is not equal to expected transaction count {}",
version + 1 - start_version,
expected_txn_infos.len()
);
}
Ok(failed_txns)
}

fn get_start_and_limit(
aptos_db: &Arc<dyn DbReader>,
start_version: Version,
end_version: Version,
) -> Result<(Version, Version)> {
let start_version = std::cmp::max(
aptos_db
.get_first_txn_version()?
.ok_or(AptosDbError::NotFound(
"First txn version is None".to_string(),
))?,
start_version,
);

let end_version = std::cmp::min(
aptos_db
.get_synced_version()?
.ok_or(AptosDbError::NotFound("Synced version is None".to_string()))?,
end_version,
);
assert!(
start_version <= end_version,
"start_version {} must be less than or equal to end_version {}",
start_version,
end_version
);
let limit = end_version - start_version;
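// Hedged note: the CLI help describes end_version as inclusive, which
// would suggest end_version - start_version + 1 here; as written,
// end_version itself is not replayed.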
Ok((start_version, limit))
}
}
