add validation for internal indexer data
areshand committed Oct 16, 2024
1 parent 4bed99d commit cb6d4ae
Showing 5 changed files with 364 additions and 1 deletion.
@@ -17,7 +17,8 @@ use std::{
sync::Arc,
time::Duration,
};
-use tokio::{runtime::Handle, sync::watch::Receiver as WatchReceiver};
+use aptos_logger::error;
+use tokio::{runtime::Handle, sync::{broadcast::error, watch::Receiver as WatchReceiver}};

const SERVICE_TYPE: &str = "internal_indexer_db_service";
const INTERNAL_INDEXER_DB: &str = "internal_indexer_db";
@@ -146,6 +147,7 @@ impl InternalIndexerDBService {
loop {
let start_time: std::time::Instant = std::time::Instant::now();
let next_version = self.db_indexer.process_a_batch(start_version)?;
error!("bowu start_version {}, next_version: {}",start_version, next_version);

if next_version == start_version {
if let Ok(recv_res) =
5 changes: 5 additions & 0 deletions storage/aptosdb/src/db_debugger/mod.rs
@@ -8,6 +8,7 @@ pub mod ledger;
pub mod state_kv;
pub mod state_tree;
pub mod truncate;
+pub mod validation;

use aptos_storage_interface::Result;
use clap::Parser;
@@ -35,6 +36,9 @@ pub enum Cmd {

#[clap(subcommand)]
Examine(examine::Cmd),
+
+    #[clap(subcommand)]
+    IndexerValidation(validation::Cmd),
}

impl Cmd {
@@ -46,6 +50,7 @@
Cmd::Ledger(cmd) => cmd.run(),
Cmd::Truncate(cmd) => cmd.run(),
Cmd::Examine(cmd) => cmd.run(),
+            Cmd::IndexerValidation(cmd) => cmd.run(),
}
}
}
293 changes: 293 additions & 0 deletions storage/aptosdb/src/db_debugger/validation.rs
@@ -0,0 +1,293 @@
use crate::{
    schema::state_value_by_key_hash::StateValueByKeyHashSchema, state_kv_db::StateKvDb, AptosDB,
};
use aptos_config::config::{RocksdbConfig, StorageDirPaths};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_db_indexer::db_ops::open_internal_indexer_db;
use aptos_db_indexer_schemas::schema::{
    event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema,
    state_keys::StateKeysSchema, transaction_by_account::TransactionByAccountSchema,
};
use aptos_schemadb::{ReadOptions, DB};
use aptos_storage_interface::{DbReader, Result};
use aptos_types::{
    contract_event::ContractEvent,
    event::EventKey,
    transaction::{Transaction::UserTransaction, TransactionListWithProof},
};
use clap::Parser;
use rayon::{
    iter::{IntoParallelIterator, ParallelIterator},
    ThreadPoolBuilder,
};
use std::{cmp, collections::HashSet, path::Path};

const SAMPLE_RATE: usize = 500_000;

#[derive(Parser, Debug)]
pub struct ValidationArgs {
#[clap(short, long)]
pub db_root_path: String,

#[clap(short, long)]
pub internal_indexer_db_path: String,

#[clap(short, long)]
pub target_version: u64,
}

#[derive(clap::Subcommand)]
pub enum Cmd {
ValidateIndexerDB(ValidationArgs),
}

impl Cmd {
    pub fn run(&self) -> Result<()> {
        match self {
            Cmd::ValidateIndexerDB(args) => validate_db_data(
                Path::new(args.db_root_path.as_str()),
                Path::new(args.internal_indexer_db_path.as_str()),
                args.target_version,
            ),
        }
    }
}

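/// Cross-checks the internal indexer db against the main db: state keys first,
/// then transactions and events, verified in parallel over 20k-version batches.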
pub fn validate_db_data(
db_root_path: &Path,
internal_indexer_db_path: &Path,
mut target_ledger_version: u64,
) -> Result<()> {
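    // Build a fixed-size global rayon pool that bounds the parallel batch verification below.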
let num_threads = 30;
ThreadPoolBuilder::new()
.num_threads(num_threads)
.build_global()
.unwrap();
let internal_db =
open_internal_indexer_db(internal_indexer_db_path, &RocksdbConfig::default())?;

verify_state_kvs(db_root_path, &internal_db, target_ledger_version)?;

    let aptos_db = AptosDB::new_for_test_with_sharding(db_root_path, 1_000_000);
let batch_size = 20_000;
let start_version = aptos_db.get_first_txn_version()?.unwrap();
target_ledger_version = std::cmp::min(
aptos_db.get_synced_version()?.unwrap(),
target_ledger_version,
);
    assert!(
        start_version < target_ledger_version,
        "start_version {} must be below target_ledger_version {}",
        start_version,
        target_ledger_version
    );
println!(
"Validating events and transactions {}, {}",
start_version, target_ledger_version
);

// Calculate ranges and split into chunks
let ranges: Vec<(u64, u64)> = (start_version..target_ledger_version)
.step_by(batch_size as usize)
.map(|start| {
let end = cmp::min(start + batch_size, target_ledger_version);
(start, end)
})
.collect();

// Process each chunk in parallel
ranges.into_par_iter().for_each(|(start, end)| {
let num_of_txns = end - start;
println!("Validating transactions from {} to {}", start, end);
let txns = aptos_db
.get_transactions(start, num_of_txns, target_ledger_version, true)
.unwrap();
verify_batch_txn_events(&txns, &internal_db, start)
.unwrap_or_else(|_| panic!("{}, {} failed to verify", start, end));
assert_eq!(txns.transactions.len() as u64, num_of_txns);
});

Ok(())
}

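/// Collects every state key hash from the internal indexer db into a HashSet,
/// then scans all 16 state kv shards and reports hashes missing from that set.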
pub fn verify_state_kvs(
    db_root_path: &Path,
    internal_db: &DB,
    target_ledger_version: u64,
) -> Result<()> {
println!("Validating db statekeys");
let storage_dir = StorageDirPaths::from_path(db_root_path);
let state_kv_db = StateKvDb::open(&storage_dir, RocksdbConfig::default(), true, true)?;

    // Read all state keys from the internal indexer db and keep their hashes in memory.
let mut all_internal_keys = HashSet::new();
let mut iter = internal_db.iter::<StateKeysSchema>()?;
iter.seek_to_first();
for (key_ind, state_key_res) in iter.enumerate() {
let state_key = state_key_res?.0;
let state_key_hash = state_key.hash();
all_internal_keys.insert(state_key_hash);
if key_ind % 10_000_000 == 0 {
println!("Processed {} keys", key_ind);
}
}
println!(
"Number of state keys in internal db: {}",
all_internal_keys.len()
);
for shard_id in 0..16 {
let shard = state_kv_db.db_shard(shard_id);
println!("Validating state_kv for shard {}", shard_id);
verify_state_kv(shard, &all_internal_keys, target_ledger_version)?;
}
Ok(())
}

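/// Verifies one batch of transactions along with the events they emitted.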
pub fn verify_batch_txn_events(
txns: &TransactionListWithProof,
internal_db: &DB,
start_version: u64,
) -> Result<()> {
verify_transactions(txns, internal_db, start_version)?;
verify_events(txns, internal_db, start_version)
}

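/// Walks a single state kv shard and counts keys at or below the target
/// version that are absent from the indexer's state key set.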
fn verify_state_kv(
    shard: &DB,
    all_internal_keys: &HashSet<HashValue>,
    target_ledger_version: u64,
) -> Result<()> {
let read_opts = ReadOptions::default();
let mut iter = shard.iter_with_opts::<StateValueByKeyHashSchema>(read_opts)?;
    // Print a progress sample every SAMPLE_RATE keys.
    let mut counter: u64 = 0;
iter.seek_to_first();
let mut missing_keys = 0;
for value in iter {
let (state_key_hash, version) = value?.0;
if version > target_ledger_version {
continue;
}
// check if the state key hash is present in the internal db
if !all_internal_keys.contains(&state_key_hash) {
missing_keys += 1;
println!(
"State key hash not found in internal db: {:?}, version: {}",
state_key_hash, version
);
}
counter += 1;
if counter as usize % SAMPLE_RATE == 0 {
println!(
"Processed {} keys, the current sample is {} at version {}",
counter, state_key_hash, version
);
}
}
println!("Number of missing keys: {}", missing_keys);
Ok(())
}

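/// For each user transaction, checks that (sender, sequence_number) maps to
/// the expected version in the indexer's transaction-by-account index.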
fn verify_transactions(
transaction_list: &TransactionListWithProof,
internal_indexer_db: &DB,
start_version: u64,
) -> Result<()> {
for (idx, txn) in transaction_list.transactions.iter().enumerate() {
match txn {
UserTransaction(signed_transaction) => {
let key = (
signed_transaction.sender(),
signed_transaction.sequence_number(),
);
match internal_indexer_db.get::<TransactionByAccountSchema>(&key)? {
Some(version) => {
assert_eq!(version, start_version + idx as u64);
                        if (idx + start_version as usize) % SAMPLE_RATE == 0 {
                            println!("Processed {} at {:?}", idx + start_version as usize, key);
}
},
None => {
panic!("Transaction not found in internal indexer db: {:?}", key);
},
}
},
_ => continue,
}
}
Ok(())
}

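/// Checks that (event_key, seq_num) resolves to the expected (version, index)
/// pair in the indexer's event-by-key index.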
fn verify_event_by_key(
event_key: &EventKey,
seq_num: u64,
internal_indexer_db: &DB,
expected_idx: usize,
expected_version: u64,
) -> Result<()> {
match internal_indexer_db.get::<EventByKeySchema>(&(*event_key, seq_num)) {
Ok(None) => {
panic!("Event not found in internal indexer db: {:?}", event_key);
},
Err(e) => {
panic!("Error while fetching event: {:?}", e);
},
Ok(Some((version, idx))) => {
assert!(idx as usize == expected_idx && version == expected_version);
if version as usize % SAMPLE_RATE == 0 {
println!(
"Processed {} at {:?}, {:?}",
version, event_key, expected_idx
);
}
},
}
Ok(())
}

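/// Checks that (event_key, version, seq_num) resolves to the expected event
/// index in the indexer's event-by-version index.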
fn verify_event_by_version(
event_key: &EventKey,
seq_num: u64,
internal_indexer_db: &DB,
version: u64,
expected_idx: usize,
) -> Result<()> {
match internal_indexer_db.get::<EventByVersionSchema>(&(*event_key, version, seq_num)) {
Ok(None) => {
panic!("Event not found in internal indexer db: {:?}", event_key);
},
Err(e) => {
panic!("Error while fetching event: {:?}", e);
},
Ok(Some(idx)) => {
assert!(idx as usize == expected_idx);
},
}
Ok(())
}

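/// Walks each transaction's V1 events and verifies both event indexes; V2
/// (module) events are skipped since they carry no event key.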
fn verify_events(
transaction_list: &TransactionListWithProof,
internal_indexer_db: &DB,
start_version: u64,
) -> Result<()> {
let mut version = start_version;
match &transaction_list.events {
None => {
return Ok(());
},
Some(event_vec) => {
for events in event_vec {
for (idx, event) in events.iter().enumerate() {
match event {
ContractEvent::V1(event) => {
let seq_num = event.sequence_number();
let event_key = event.key();
verify_event_by_version(
event_key,
seq_num,
internal_indexer_db,
version,
idx,
)?;
verify_event_by_key(
event_key,
seq_num,
internal_indexer_db,
idx,
version,
)?;
},
_ => continue,
}
}
version += 1;
}
},
}
Ok(())
}
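
None of the new code shows the binary that mounts db_debugger::Cmd, so the following is a minimal sketch of driving the new subcommand end to end. It assumes the db_debugger module is publicly exported from aptos_db, that the top-level Cmd derives clap::Subcommand, and that clap's default kebab-case naming applies to the derived variants and flags; the paths and version are placeholders, so treat every name here as illustrative rather than authoritative.

    use aptos_db::db_debugger::Cmd;
    use clap::Parser;

    #[derive(Parser)]
    struct Cli {
        #[clap(subcommand)]
        cmd: Cmd,
    }

    fn main() -> anyhow::Result<()> {
        // Equivalent to a shell invocation like:
        //   <binary> indexer-validation validate-indexer-db \
        //       --db-root-path /path/to/db \
        //       --internal-indexer-db-path /path/to/internal_indexer_db \
        //       --target-version 100000000
        let cli = Cli::parse_from([
            "db-debugger",
            "indexer-validation",
            "validate-indexer-db",
            "--db-root-path", "/path/to/db",
            "--internal-indexer-db-path", "/path/to/internal_indexer_db",
            "--target-version", "100000000",
        ]);
        cli.cmd.run()?;
        Ok(())
    }

Worth noting when running it: the tool asserts that the main db's first kept version is below the target, panics on any transaction or event missing from the index, but only counts and prints missing state key hashes rather than failing.
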
19 changes: 19 additions & 0 deletions storage/indexer_data_validation/Cargo.toml
@@ -0,0 +1,19 @@
[package]
name = "aptos-db-indexer-data-validation"
description = "Aptos database"
version = "0.1.0"

# Workspace inherited keys
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
publish = { workspace = true }
repository = { workspace = true }
rust-version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
aptos-db = { workspace = true }
aptos-temppath = { workspace = true }
clap = { workspace = true }
