diff --git a/storage/aptosdb/src/db_debugger/truncate/mod.rs b/storage/aptosdb/src/db_debugger/truncate/mod.rs index 6d30148842e0d..379ad399a8564 100644 --- a/storage/aptosdb/src/db_debugger/truncate/mod.rs +++ b/storage/aptosdb/src/db_debugger/truncate/mod.rs @@ -12,8 +12,9 @@ use crate::{ state_merkle_db::StateMerkleDb, state_store::StateStore, utils::truncation_helper::{ - find_closest_node_version_at_or_before, get_current_version_in_state_merkle_db, - get_state_kv_commit_progress, truncate_state_merkle_db, + find_closest_node_version_at_or_before, find_tree_root_at_or_before, + get_current_version_in_state_merkle_db, get_state_kv_commit_progress, + truncate_state_merkle_db, }, }; use aptos_config::config::{RocksdbConfigs, StorageDirPaths}; @@ -127,30 +128,7 @@ impl Cmd { .0; } - // TODO(grao): We are using a brute force implementation for now. We might be able to make - // it faster, since our data is append only. - if target_version < state_merkle_db_version { - let state_merkle_target_version = Self::find_tree_root_at_or_before( - &ledger_db.metadata_db_arc(), - &state_merkle_db, - target_version, - )? - .unwrap_or_else(|| { - panic!( - "Could not find a valid root before or at version {}, maybe it was pruned?", - target_version - ) - }); - - println!( - "Starting state merkle db truncation... 
target_version: {}", - state_merkle_target_version - ); - truncate_state_merkle_db(&state_merkle_db, state_merkle_target_version)?; - println!("Done!"); - } - - println!("Starting ledger db and state kv db truncation..."); + println!("Starting db truncation..."); let batch = SchemaBatch::new(); batch.put::( &DbMetadataKey::OverallCommitProgress, @@ -161,6 +139,7 @@ impl Cmd { StateStore::sync_commit_progress( Arc::clone(&ledger_db), Arc::clone(&state_kv_db), + Arc::clone(&state_merkle_db), /*crash_if_difference_is_too_large=*/ false, ); println!("Done!"); @@ -183,40 +162,6 @@ impl Cmd { Ok(()) } - - fn find_tree_root_at_or_before( - ledger_metadata_db: &DB, - state_merkle_db: &StateMerkleDb, - version: Version, - ) -> Result> { - match find_closest_node_version_at_or_before(state_merkle_db, version)? { - Some(closest_version) => { - if Self::root_exists_at_version(state_merkle_db, closest_version)? { - return Ok(Some(closest_version)); - } - let mut iter = ledger_metadata_db.iter::()?; - iter.seek_for_prev(&version)?; - match iter.next().transpose()? { - Some((closest_epoch_version, _)) => { - if Self::root_exists_at_version(state_merkle_db, closest_epoch_version)? { - Ok(Some(closest_epoch_version)) - } else { - Ok(None) - } - }, - None => Ok(None), - } - }, - None => Ok(None), - } - } - - fn root_exists_at_version(state_merkle_db: &StateMerkleDb, version: Version) -> Result { - Ok(state_merkle_db - .metadata_db() - .get::(&NodeKey::new_empty_path(version))? 
-            .is_some())
-    }
 }
 
 #[cfg(test)]
diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs
index 4288fcc8bebb6..422630b2cf8b8 100644
--- a/storage/aptosdb/src/state_store/mod.rs
+++ b/storage/aptosdb/src/state_store/mod.rs
@@ -26,7 +26,10 @@ use crate::{
     utils::{
         iterators::PrefixedStateValueIterator,
         new_sharded_kv_schema_batch,
-        truncation_helper::{truncate_ledger_db, truncate_state_kv_db},
+        truncation_helper::{
+            find_tree_root_at_or_before, get_max_version_in_state_merkle_db, truncate_ledger_db,
+            truncate_state_kv_db, truncate_state_merkle_db,
+        },
         ShardedStateKvSchemaBatch,
     },
 };
@@ -297,6 +300,7 @@ impl StateStore {
             Self::sync_commit_progress(
                 Arc::clone(&ledger_db),
                 Arc::clone(&state_kv_db),
+                Arc::clone(&state_merkle_db),
                 /*crash_if_difference_is_too_large=*/ true,
             );
         }
@@ -339,6 +343,7 @@ impl StateStore {
     pub fn sync_commit_progress(
         ledger_db: Arc<LedgerDb>,
         state_kv_db: Arc<StateKvDb>,
+        state_merkle_db: Arc<StateMerkleDb>,
         crash_if_difference_is_too_large: bool,
     ) {
         let ledger_metadata_db = ledger_db.metadata_db();
@@ -393,6 +398,39 @@ impl StateStore {
                 std::cmp::max(difference as usize, 1), /* batch_size */
             )
             .expect("Failed to truncate state K/V db.");
+
+            // The state merkle db can also be ahead of the overall commit progress; if so,
+            // truncate it back to the closest tree root at or before that progress.
+            let state_merkle_max_version = get_max_version_in_state_merkle_db(&state_merkle_db)
+                .expect("Failed to get state merkle max version.")
+                .expect("State merkle max version cannot be None.");
+            if state_merkle_max_version > overall_commit_progress {
+                let difference = state_merkle_max_version - overall_commit_progress;
+                if crash_if_difference_is_too_large {
+                    assert_le!(difference, MAX_COMMIT_PROGRESS_DIFFERENCE);
+                }
+
+                let state_merkle_target_version = find_tree_root_at_or_before(
+                    &ledger_db.metadata_db_arc(),
+                    &state_merkle_db,
+                    overall_commit_progress,
+                )
+                .expect("Failed to find state merkle tree root.")
+                .unwrap_or_else(|| {
+                    panic!(
+                        "Could not find a valid root before or at version {}, maybe it was pruned?",
+                        overall_commit_progress
+                    )
+                });
+
+                info!(
+                    state_merkle_max_version = state_merkle_max_version,
+                    target_version = state_merkle_target_version,
+                    "Start state merkle truncation..."
+                );
+                truncate_state_merkle_db(&state_merkle_db, state_merkle_target_version)
+                    .expect("Failed to truncate state merkle db.");
+            }
         } else {
             info!("No overall commit progress was found!");
         }
@@ -448,7 +486,7 @@ impl StateStore {
 
         let latest_snapshot_version = state_db
             .state_merkle_db
-            .get_state_snapshot_version_before(Version::MAX)
+            .get_state_snapshot_version_before(u64::MAX)
             .expect("Failed to query latest node on initialization.");
 
         info!(
@@ -457,6 +495,10 @@ impl StateStore {
             "Initializing BufferedState."
         );
         let latest_snapshot_root_hash = if let Some(version) = latest_snapshot_version {
+            ensure!(
+                version < num_transactions,
+                "State merkle commit progress cannot go beyond overall commit progress."
+            );
             state_db
                 .state_merkle_db
                 .get_root_hash(version)
diff --git a/storage/aptosdb/src/utils/truncation_helper.rs b/storage/aptosdb/src/utils/truncation_helper.rs
index 7497efe067651..fd654ed59d8c2 100644
--- a/storage/aptosdb/src/utils/truncation_helper.rs
+++ b/storage/aptosdb/src/utils/truncation_helper.rs
@@ -150,9 +150,11 @@ pub(crate) fn truncate_state_merkle_db(
             break;
         }
 
-        let version_before =
-            find_closest_node_version_at_or_before(state_merkle_db, current_version - 1)?
-                .expect("Must exist.");
+        let version_before = find_closest_node_version_at_or_before(
+            state_merkle_db.metadata_db(),
+            current_version - 1,
+        )?
+        .expect("Must exist.");
 
         let top_levels_batch = SchemaBatch::new();
 
@@ -195,19 +197,79 @@ pub(crate) fn truncate_state_merkle_db_single_shard(
     state_merkle_db.commit_single_shard(target_version, shard_id, batch)
 }
 
+/// Finds the latest version at or before `version` that has a state merkle tree root.
+///
+/// The closest node version in the state merkle metadata DB is checked first; if it has no
+/// root, the closest epoch version at or before `version` in `ledger_metadata_db` is tried.
+/// Returns `Ok(None)` when no node exists at or before `version` or no candidate has a root.
+pub(crate) fn find_tree_root_at_or_before(
+    ledger_metadata_db: &DB,
+    state_merkle_db: &StateMerkleDb,
+    version: Version,
+) -> Result<Option<Version>> {
+    match find_closest_node_version_at_or_before(state_merkle_db.metadata_db(), version)? {
+        Some(closest_version) => {
+            if root_exists_at_version(state_merkle_db, closest_version)? {
+                return Ok(Some(closest_version));
+            }
+            let mut iter = ledger_metadata_db.iter::<EpochByVersionSchema>()?;
+            iter.seek_for_prev(&version)?;
+            match iter.next().transpose()? {
+                Some((closest_epoch_version, _)) => {
+                    if root_exists_at_version(state_merkle_db, closest_epoch_version)? {
+                        Ok(Some(closest_epoch_version))
+                    } else {
+                        Ok(None)
+                    }
+                },
+                None => Ok(None),
+            }
+        },
+        None => Ok(None),
+    }
+}
+
+/// Returns whether the root node (the node with an empty path) exists at `version`.
+pub(crate) fn root_exists_at_version(
+    state_merkle_db: &StateMerkleDb,
+    version: Version,
+) -> Result<bool> {
+    Ok(state_merkle_db
+        .metadata_db()
+        .get::<JellyfishMerkleNodeSchema>(&NodeKey::new_empty_path(version))?
+        .is_some())
+}
+
 pub(crate) fn get_current_version_in_state_merkle_db(
     state_merkle_db: &StateMerkleDb,
 ) -> Result<Option<Version>> {
-    find_closest_node_version_at_or_before(state_merkle_db, u64::max_value())
+    find_closest_node_version_at_or_before(state_merkle_db.metadata_db(), u64::MAX)
 }
 
-pub(crate) fn find_closest_node_version_at_or_before(
+/// Returns the largest node version found in the state merkle DB, looking at the metadata DB
+/// and each of the 16 shards, or `Ok(None)` if none of them contains a node.
+pub(crate) fn get_max_version_in_state_merkle_db(
     state_merkle_db: &StateMerkleDb,
+) -> Result<Option<Version>> {
+    let mut version = get_current_version_in_state_merkle_db(state_merkle_db)?;
+    for shard_id in 0..16 {
+        let shard_version = find_closest_node_version_at_or_before(
+            state_merkle_db.db_shard(shard_id),
+            u64::MAX,
+        )?;
+        // `Option` orders `None` below `Some(_)`, so `max` keeps the largest version seen.
+        version = version.max(shard_version);
+    }
+
+    Ok(version)
+}
+
+/// Returns the version of the closest node at or before `version` in `db`, if any.
+pub(crate) fn find_closest_node_version_at_or_before(
+    db: &DB,
     version: Version,
 ) -> Result<Option<Version>> {
-    let mut iter = state_merkle_db
-        .metadata_db()
-        .rev_iter::<JellyfishMerkleNodeSchema>()?;
+    let mut iter = db.rev_iter::<JellyfishMerkleNodeSchema>()?;
     iter.seek_for_prev(&NodeKey::new_empty_path(version))?;
     Ok(iter.next().transpose()?.map(|item| item.0.version()))
 }