Skip to content

Commit

Permalink
fix fix truncate state merkle DB
Browse files: browse the repository at this point in the history
  • Loading branch information
msmouse committed Oct 26, 2024
1 parent d6068b9 commit a6dd53d
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 64 deletions.
30 changes: 9 additions & 21 deletions storage/aptosdb/src/db_debugger/truncate/mod.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -4,24 +4,15 @@
use crate::{
db::AptosDB,
db_debugger::ShardingConfig,
schema::{
db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue},
epoch_by_version::EpochByVersionSchema,
jellyfish_merkle_node::JellyfishMerkleNodeSchema,
},
state_merkle_db::StateMerkleDb,
schema::db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue},
state_store::StateStore,
utils::truncation_helper::{
find_closest_node_version_at_or_before, find_tree_root_at_or_before,
get_current_version_in_state_merkle_db, get_state_kv_commit_progress,
truncate_state_merkle_db,
},
};
use aptos_config::config::{RocksdbConfigs, StorageDirPaths};
use aptos_jellyfish_merkle::node_type::NodeKey;
use aptos_schemadb::{SchemaBatch, DB};
use aptos_schemadb::SchemaBatch;
use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Result};
use aptos_types::transaction::Version;
use claims::assert_le;
use clap::Parser;
use std::{fs, path::PathBuf, sync::Arc};
Expand Down Expand Up @@ -174,7 +165,8 @@ mod test {
AptosDB,
},
schema::{
epoch_by_version::EpochByVersionSchema, ledger_info::LedgerInfoSchema,
epoch_by_version::EpochByVersionSchema,
jellyfish_merkle_node::JellyfishMerkleNodeSchema, ledger_info::LedgerInfoSchema,
stale_node_index::StaleNodeIndexSchema,
stale_node_index_cross_epoch::StaleNodeIndexCrossEpochSchema,
stale_state_value_index::StaleStateValueIndexSchema,
Expand Down Expand Up @@ -325,17 +317,13 @@ mod test {
}

let mut iter = state_kv_db.metadata_db().iter::<StaleStateValueIndexSchema>().unwrap();
iter.seek_to_first();
for item in iter {
let version = item.unwrap().0.stale_since_version;
prop_assert!(version <= target_version);
}
iter.seek_to_first();
for item in iter {
let version = item.unwrap().0.stale_since_version;
prop_assert!(version <= target_version);
}
}





let mut iter = state_merkle_db.metadata_db().iter::<StaleNodeIndexSchema>().unwrap();
iter.seek_to_first();
for item in iter {
Expand Down
31 changes: 13 additions & 18 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -407,19 +407,18 @@ impl StateStore {
if crash_if_difference_is_too_large {
assert_le!(difference, MAX_COMMIT_PROGRESS_DIFFERENCE);
}

let state_merkle_target_version = find_tree_root_at_or_before(
&ledger_db.metadata_db_arc(),
&state_merkle_db,
overall_commit_progress,
)?
.unwrap_or_else(|| {
panic!(
"Could not find a valid root before or at version {}, maybe it was pruned?",
overall_commit_progress
)
});

}
let db = state_merkle_db.metadata_db();
let state_merkle_target_version =
find_tree_root_at_or_before(db, &state_merkle_db, overall_commit_progress)
.expect("DB read failed.")
.unwrap_or_else(|| {
panic!(
"Could not find a valid root before or at version {}, maybe it was pruned?",
overall_commit_progress
)
});
if state_merkle_target_version < state_merkle_max_version {
info!(
state_merkle_max_version = state_merkle_max_version,
target_version = state_merkle_target_version,
Expand Down Expand Up @@ -483,7 +482,7 @@ impl StateStore {

let latest_snapshot_version = state_db
.state_merkle_db
.get_state_snapshot_version_before(u64::MAX)
.get_state_snapshot_version_before(Version::MAX)
.expect("Failed to query latest node on initialization.");

info!(
Expand All @@ -492,10 +491,6 @@ impl StateStore {
"Initializing BufferedState."
);
let latest_snapshot_root_hash = if let Some(version) = latest_snapshot_version {
ensure!(
version < num_transactions,
"State merkle commit progress cannot go beyond overall commit progress."
);
state_db
.state_merkle_db
.get_root_hash(version)
Expand Down
57 changes: 33 additions & 24 deletions storage/aptosdb/src/utils/truncation_helper.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -202,26 +202,35 @@ pub(crate) fn find_tree_root_at_or_before(
state_merkle_db: &StateMerkleDb,
version: Version,
) -> Result<Option<Version>> {
match find_closest_node_version_at_or_before(state_merkle_db, version)? {
Some(closest_version) => {
if let Some(closest_version) =
find_closest_node_version_at_or_before(state_merkle_db.metadata_db(), version)?
{
if root_exists_at_version(state_merkle_db, closest_version)? {
return Ok(Some(closest_version));
}

// It's possible that it's a partial commit when sharding is not enabled,
// look again for the previous version:
if let Some(closest_version) =
find_closest_node_version_at_or_before(state_merkle_db.metadata_db(), version)?
{
if root_exists_at_version(state_merkle_db, closest_version)? {
return Ok(Some(closest_version));
}

// Now we are probably looking at a pruned version in this epoch, look for the previous
// epoch ending:
let mut iter = ledger_metadata_db.iter::<EpochByVersionSchema>()?;
iter.seek_for_prev(&version)?;
match iter.next().transpose()? {
Some((closest_epoch_version, _)) => {
if root_exists_at_version(state_merkle_db, closest_epoch_version)? {
Ok(Some(closest_epoch_version))
} else {
Ok(None)
}
},
None => Ok(None),
if let Some((closest_epoch_version, _)) = iter.next().transpose()? {
if root_exists_at_version(state_merkle_db, closest_epoch_version)? {
return Ok(Some(closest_epoch_version));
}
}
},
None => Ok(None),
}
}

Ok(None)
}

pub(crate) fn root_exists_at_version(
Expand All @@ -237,29 +246,29 @@ pub(crate) fn root_exists_at_version(
pub(crate) fn get_current_version_in_state_merkle_db(
state_merkle_db: &StateMerkleDb,
) -> Result<Option<Version>> {
find_closest_node_version_at_or_before(state_merkle_db.metadata_db(), u64::max_value())
find_closest_node_version_at_or_before(state_merkle_db.metadata_db(), Version::MAX)
}

pub(crate) fn get_max_version_in_state_merkle_db(
state_merkle_db: &StateMerkleDb,
) -> Result<Option<Version>> {
let mut version = get_current_version_in_state_merkle_db(state_merkle_db)?;
for shard_id in 0..16 {
let shard_version = find_closest_node_version_at_or_before(
state_merkle_db.db_shard(shard_id),
u64::max_value(),
)?;
if version.is_none() {
version = shard_version;
} else {
if let Some(shard_version) = shard_version {
let num_real_shards = state_merkle_db.hack_num_real_shards() as u8;
if num_real_shards > 1 {
for shard_id in 0..num_real_shards {
let shard_version = find_closest_node_version_at_or_before(
state_merkle_db.db_shard(shard_id),
Version::MAX,
)?;
if version.is_none() {
version = shard_version;
} else if let Some(shard_version) = shard_version {
if shard_version > version.unwrap() {
version = Some(shard_version);
}
}
}
}

Ok(version)
}

Expand Down
2 changes: 1 addition & 1 deletion testsuite/smoke-test/src/storage.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -528,7 +528,7 @@ async fn test_db_restart() {
quit_flag.clone(),
));

for round in 0..10 {
for round in 0..3 {
info!("{LINE} Restart round {round}");
for (v, vid) in restarting_validator_ids.iter().enumerate() {
info!("{LINE} Round {round}: Restarting validator {v}.");
Expand Down

0 comments on commit a6dd53d

Please sign in to comment.