From cb6d4aecbdd10185b43e55ce23afc931624ae652 Mon Sep 17 00:00:00 2001
From: Bo Wu
Date: Sun, 29 Sep 2024 11:11:59 -0700
Subject: [PATCH] add validation for internal indexer data

---
 storage/aptosdb/src/db_debugger/mod.rs        |   5 +
 storage/aptosdb/src/db_debugger/validation.rs | 293 ++++++++++++++++++
 storage/indexer_data_validation/Cargo.toml    |  19 ++
 storage/indexer_data_validation/main.rs       |  44 +++
 4 files changed, 361 insertions(+)
 create mode 100644 storage/aptosdb/src/db_debugger/validation.rs
 create mode 100644 storage/indexer_data_validation/Cargo.toml
 create mode 100644 storage/indexer_data_validation/main.rs

diff --git a/storage/aptosdb/src/db_debugger/mod.rs b/storage/aptosdb/src/db_debugger/mod.rs
index 5fb1a67276c053..6b1be2a4ed7134 100644
--- a/storage/aptosdb/src/db_debugger/mod.rs
+++ b/storage/aptosdb/src/db_debugger/mod.rs
@@ -8,6 +8,7 @@ pub mod ledger;
 pub mod state_kv;
 pub mod state_tree;
 pub mod truncate;
+pub mod validation;
 
 use aptos_storage_interface::Result;
 use clap::Parser;
@@ -35,6 +36,9 @@ pub enum Cmd {
 
     #[clap(subcommand)]
     Examine(examine::Cmd),
+
+    #[clap(subcommand)]
+    IndexerValidation(validation::Cmd),
 }
 
 impl Cmd {
@@ -46,6 +50,7 @@ impl Cmd {
             Cmd::Ledger(cmd) => cmd.run(),
             Cmd::Truncate(cmd) => cmd.run(),
             Cmd::Examine(cmd) => cmd.run(),
+            Cmd::IndexerValidation(cmd) => cmd.run(),
         }
     }
 }
diff --git a/storage/aptosdb/src/db_debugger/validation.rs b/storage/aptosdb/src/db_debugger/validation.rs
new file mode 100644
index 00000000000000..e1ae6ab1cb5c93
--- /dev/null
+++ b/storage/aptosdb/src/db_debugger/validation.rs
@@ -0,0 +1,293 @@
+use crate::{
+    schema::state_value_by_key_hash::StateValueByKeyHashSchema, state_kv_db::StateKvDb, AptosDB,
+};
+use aptos_storage_interface::Result;
+use aptos_config::config::{RocksdbConfig, StorageDirPaths};
+use aptos_crypto::{hash::CryptoHash, HashValue};
+use aptos_db_indexer::db_ops::open_internal_indexer_db;
+use aptos_db_indexer_schemas::schema::{
+    event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema,
+    state_keys::StateKeysSchema, transaction_by_account::TransactionByAccountSchema,
+};
+use aptos_schemadb::{ReadOptions, DB};
+use aptos_storage_interface::DbReader;
+use aptos_types::{
+    contract_event::ContractEvent,
+    event::EventKey,
+    transaction::{Transaction::UserTransaction, TransactionListWithProof},
+};
+use rayon::{
+    iter::{IntoParallelIterator, ParallelIterator},
+    ThreadPoolBuilder,
+};
+use std::{cmp, collections::HashSet, path::Path};
+const SAMPLE_RATE: usize = 500_000;
+use clap::Parser;
+
+#[derive(Parser, Debug)]
+pub struct ValidationArgs {
+    #[clap(short, long)]
+    pub db_root_path: String,
+
+    #[clap(short, long)]
+    pub internal_indexer_db_path: String,
+
+    #[clap(short, long)]
+    pub target_version: u64,
+}
+#[derive(clap::Subcommand)]
+pub enum Cmd {
+    ValidateIndexerDB(ValidationArgs),
+}
+
+impl Cmd {
+    pub fn run(&self) -> Result<()> {
+        match self {
+            Cmd::ValidateIndexerDB(args) => {
+                validate_db_data(Path::new(args.db_root_path.as_str()), Path::new(args.internal_indexer_db_path.as_str()), args.target_version)
+            },
+        }
+    }
+}
+
+pub fn validate_db_data(
+    db_root_path: &Path,
+    internal_indexer_db_path: &Path,
+    mut target_ledger_version: u64,
+) -> Result<()> {
+    let num_threads = 30;
+    ThreadPoolBuilder::new()
+        .num_threads(num_threads)
+        .build_global()
+        .unwrap();
+    let internal_db =
+        open_internal_indexer_db(internal_indexer_db_path, &RocksdbConfig::default())?;
+
+    verify_state_kvs(db_root_path, &internal_db, target_ledger_version)?;
+
+    let aptos_db = AptosDB::new_for_test_with_sharding(db_root_path, 1000000);
+    let batch_size = 20_000;
+    let start_version = aptos_db.get_first_txn_version()?.unwrap();
+    target_ledger_version = cmp::min(
+        aptos_db.get_synced_version()?.unwrap(),
+        target_ledger_version,
+    );
+    assert!(start_version < target_ledger_version, "{}, {}", start_version, target_ledger_version);
+    println!(
+        "Validating events and transactions {}, {}",
+        start_version, target_ledger_version
+    );
+
+    // Calculate ranges and split into chunks
+    let ranges: Vec<(u64, u64)> = (start_version..target_ledger_version)
+        .step_by(batch_size as usize)
+        .map(|start| {
+            let end = cmp::min(start + batch_size, target_ledger_version);
+            (start, end)
+        })
+        .collect();
+
+    // Process each chunk in parallel
+    ranges.into_par_iter().for_each(|(start, end)| {
+        let num_of_txns = end - start;
+        println!("Validating transactions from {} to {}", start, end);
+        let txns = aptos_db
+            .get_transactions(start, num_of_txns, target_ledger_version, true)
+            .unwrap();
+        verify_batch_txn_events(&txns, &internal_db, start)
+            .unwrap_or_else(|_| panic!("{}, {} failed to verify", start, end));
+        assert_eq!(txns.transactions.len() as u64, num_of_txns);
+    });
+
+    Ok(())
+}
+
+pub fn verify_state_kvs(db_root_path: &Path, internal_db: &DB, target_ledger_version: u64) -> Result<()> {
+    println!("Validating db statekeys");
+    let storage_dir = StorageDirPaths::from_path(db_root_path);
+    let state_kv_db = StateKvDb::open(&storage_dir, RocksdbConfig::default(), true, true)?;
+
+    //read all statekeys from internal db and store them in mem
+    let mut all_internal_keys = HashSet::new();
+    let mut iter = internal_db.iter::<StateKeysSchema>()?;
+    iter.seek_to_first();
+    for (key_ind, state_key_res) in iter.enumerate() {
+        let state_key = state_key_res?.0;
+        let state_key_hash = state_key.hash();
+        all_internal_keys.insert(state_key_hash);
+        if key_ind % 10_000_000 == 0 {
+            println!("Processed {} keys", key_ind);
+        }
+    }
+    println!(
+        "Number of state keys in internal db: {}",
+        all_internal_keys.len()
+    );
+    for shard_id in 0..16 {
+        let shard = state_kv_db.db_shard(shard_id);
+        println!("Validating state_kv for shard {}", shard_id);
+        verify_state_kv(shard, &all_internal_keys, target_ledger_version)?;
+    }
+    Ok(())
+}
+
+pub fn verify_batch_txn_events(
+    txns: &TransactionListWithProof,
+    internal_db: &DB,
+    start_version: u64,
+) -> Result<()> {
+    verify_transactions(txns, internal_db, start_version)?;
+    verify_events(txns, internal_db, start_version)
+}
+
+fn verify_state_kv(shard: &DB, all_internal_keys: &HashSet<HashValue>, target_ledger_version: u64) -> Result<()> {
+    let read_opts = ReadOptions::default();
+    let mut iter = shard.iter_with_opts::<StateValueByKeyHashSchema>(read_opts)?;
+    // print a message every 10k keys
+    let mut counter = 0;
+    iter.seek_to_first();
+    let mut missing_keys = 0;
+    for value in iter {
+        let (state_key_hash, version) = value?.0;
+        if version > target_ledger_version {
+            continue;
+        }
+        // check if the state key hash is present in the internal db
+        if !all_internal_keys.contains(&state_key_hash) {
+            missing_keys += 1;
+            println!(
+                "State key hash not found in internal db: {:?}, version: {}",
+                state_key_hash, version
+            );
+        }
+        counter += 1;
+        if counter as usize % SAMPLE_RATE == 0 {
+            println!(
+                "Processed {} keys, the current sample is {} at version {}",
+                counter, state_key_hash, version
+            );
+        }
+    }
+    println!("Number of missing keys: {}", missing_keys);
+    Ok(())
+}
+
+fn verify_transactions(
+    transaction_list: &TransactionListWithProof,
+    internal_indexer_db: &DB,
+    start_version: u64,
+) -> Result<()> {
+    for (idx, txn) in transaction_list.transactions.iter().enumerate() {
+        match txn {
+            UserTransaction(signed_transaction) => {
+                let key = (
+                    signed_transaction.sender(),
+                    signed_transaction.sequence_number(),
+                );
+                match internal_indexer_db.get::<TransactionByAccountSchema>(&key)? {
+                    Some(version) => {
+                        assert_eq!(version, start_version + idx as u64);
+                        if (idx + start_version as usize) % SAMPLE_RATE == 0 {
+                            println!("Processed {} at {:?}", idx + start_version as usize, key);
+                        }
+                    },
+                    None => {
+                        panic!("Transaction not found in internal indexer db: {:?}", key);
+                    },
+                }
+            },
+            _ => continue,
+        }
+    }
+    Ok(())
+}
+
+fn verify_event_by_key(
+    event_key: &EventKey,
+    seq_num: u64,
+    internal_indexer_db: &DB,
+    expected_idx: usize,
+    expected_version: u64,
+) -> Result<()> {
+    match internal_indexer_db.get::<EventByKeySchema>(&(*event_key, seq_num)) {
+        Ok(None) => {
+            panic!("Event not found in internal indexer db: {:?}", event_key);
+        },
+        Err(e) => {
+            panic!("Error while fetching event: {:?}", e);
+        },
+        Ok(Some((version, idx))) => {
+            assert!(idx as usize == expected_idx && version == expected_version);
+            if version as usize % SAMPLE_RATE == 0 {
+                println!(
+                    "Processed {} at {:?}, {:?}",
+                    version, event_key, expected_idx
+                );
+            }
+        },
+    }
+    Ok(())
+}
+
+fn verify_event_by_version(
+    event_key: &EventKey,
+    seq_num: u64,
+    internal_indexer_db: &DB,
+    version: u64,
+    expected_idx: usize,
+) -> Result<()> {
+    match internal_indexer_db.get::<EventByVersionSchema>(&(*event_key, version, seq_num)) {
+        Ok(None) => {
+            panic!("Event not found in internal indexer db: {:?}", event_key);
+        },
+        Err(e) => {
+            panic!("Error while fetching event: {:?}", e);
+        },
+        Ok(Some(idx)) => {
+            assert!(idx as usize == expected_idx);
+        },
+    }
+    Ok(())
+}
+
+fn verify_events(
+    transaction_list: &TransactionListWithProof,
+    internal_indexer_db: &DB,
+    start_version: u64,
+) -> Result<()> {
+    let mut version = start_version;
+    match &transaction_list.events {
+        None => {
+            return Ok(());
+        },
+        Some(event_vec) => {
+            for events in event_vec {
+                for (idx, event) in events.iter().enumerate() {
+                    match event {
+                        ContractEvent::V1(event) => {
+                            let seq_num = event.sequence_number();
+                            let event_key = event.key();
+                            verify_event_by_version(
+                                event_key,
+                                seq_num,
+                                internal_indexer_db,
+                                version,
+                                idx,
+                            )?;
+                            verify_event_by_key(
+                                event_key,
+                                seq_num,
+                                internal_indexer_db,
+                                idx,
+                                version,
+                            )?;
+                        },
+                        _ => continue,
+                    }
+                }
+                version += 1;
+            }
+        },
+    }
+    Ok(())
+}
diff --git a/storage/indexer_data_validation/Cargo.toml b/storage/indexer_data_validation/Cargo.toml
new file mode 100644
index 00000000000000..090e5a30bd950a
--- /dev/null
+++ b/storage/indexer_data_validation/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "aptos-db-indexer-data-validation"
+description = "Validation tool for internal indexer data"
+version = "0.1.0"
+
+# Workspace inherited keys
+authors = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+license = { workspace = true }
+publish = { workspace = true }
+repository = { workspace = true }
+rust-version = { workspace = true }
+
+[dependencies]
+aptos-temppath = { workspace = true }
+clap = { workspace = true }
+anyhow = { workspace = true }
+aptos-db = { workspace = true }
diff --git a/storage/indexer_data_validation/main.rs b/storage/indexer_data_validation/main.rs
new file mode 100644
index 00000000000000..2bf5d5a24c322b
--- /dev/null
+++ b/storage/indexer_data_validation/main.rs
@@ -0,0 +1,44 @@
+use anyhow::Result;
+use aptos_db::db_debugger::validation::validate_db_data;
+use clap::{Arg, Command};
+use std::path::Path;
+
+pub fn main() -> Result<()> {
+    let matches = Command::new("db_validation")
+        .arg(
+            Arg::new("db_root_path")
+                .short('d')
+                .long("db-root")
+                .value_parser(clap::value_parser!(String))
+                .required(true),
+        )
+        .arg(
+            Arg::new("internal_indexer_db_path")
+                .short('i')
+                .long("internal-indexer-db")
+                .value_parser(clap::value_parser!(String))
+                .required(true),
+        )
+        .arg(
+            Arg::new("target_version")
+                .short('t')
+                .long("target-version")
+                .value_parser(clap::value_parser!(u64))
+                .required(true),
+        )
+        .get_matches();
+
+    let db_root_path = matches.get_one::<String>("db_root_path").unwrap();
+    let internal_indexer_db_path = matches
+        .get_one::<String>("internal_indexer_db_path")
+        .unwrap();
+
+    let target_version = *matches.get_one::<u64>("target_version").unwrap();
+    validate_db_data(
+        Path::new(db_root_path),
+        Path::new(internal_indexer_db_path),
+        target_version,
+    )?;
+
+    Ok(())
+}