Skip to content

Commit

Permalink
[Fix][Storage] Truncate state merkle db at start so it doesn't go bey…
Browse files Browse the repository at this point in the history
…ond the overall commit progress.
  • Loading branch information
grao1991 authored and msmouse committed Oct 26, 2024
1 parent 4c0fb1e commit d6068b9
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 70 deletions.
65 changes: 5 additions & 60 deletions storage/aptosdb/src/db_debugger/truncate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use crate::{
state_merkle_db::StateMerkleDb,
state_store::StateStore,
utils::truncation_helper::{
find_closest_node_version_at_or_before, get_current_version_in_state_merkle_db,
get_state_kv_commit_progress, truncate_state_merkle_db,
find_closest_node_version_at_or_before, find_tree_root_at_or_before,
get_current_version_in_state_merkle_db, get_state_kv_commit_progress,
truncate_state_merkle_db,
},
};
use aptos_config::config::{RocksdbConfigs, StorageDirPaths};
Expand Down Expand Up @@ -127,30 +128,7 @@ impl Cmd {
.0;
}

// TODO(grao): We are using a brute force implementation for now. We might be able to make
// it faster, since our data is append only.
if target_version < state_merkle_db_version {
let state_merkle_target_version = Self::find_tree_root_at_or_before(
&ledger_db.metadata_db_arc(),
&state_merkle_db,
target_version,
)?
.unwrap_or_else(|| {
panic!(
"Could not find a valid root before or at version {}, maybe it was pruned?",
target_version
)
});

println!(
"Starting state merkle db truncation... target_version: {}",
state_merkle_target_version
);
truncate_state_merkle_db(&state_merkle_db, state_merkle_target_version)?;
println!("Done!");
}

println!("Starting ledger db and state kv db truncation...");
println!("Starting db truncation...");
let batch = SchemaBatch::new();
batch.put::<DbMetadataSchema>(
&DbMetadataKey::OverallCommitProgress,
Expand All @@ -161,6 +139,7 @@ impl Cmd {
StateStore::sync_commit_progress(
Arc::clone(&ledger_db),
Arc::clone(&state_kv_db),
Arc::clone(&state_merkle_db),
/*crash_if_difference_is_too_large=*/ false,
);
println!("Done!");
Expand All @@ -183,40 +162,6 @@ impl Cmd {

Ok(())
}

fn find_tree_root_at_or_before(
ledger_metadata_db: &DB,
state_merkle_db: &StateMerkleDb,
version: Version,
) -> Result<Option<Version>> {
match find_closest_node_version_at_or_before(state_merkle_db, version)? {
Some(closest_version) => {
if Self::root_exists_at_version(state_merkle_db, closest_version)? {
return Ok(Some(closest_version));
}
let mut iter = ledger_metadata_db.iter::<EpochByVersionSchema>()?;
iter.seek_for_prev(&version)?;
match iter.next().transpose()? {
Some((closest_epoch_version, _)) => {
if Self::root_exists_at_version(state_merkle_db, closest_epoch_version)? {
Ok(Some(closest_epoch_version))
} else {
Ok(None)
}
},
None => Ok(None),
}
},
None => Ok(None),
}
}

fn root_exists_at_version(state_merkle_db: &StateMerkleDb, version: Version) -> Result<bool> {
Ok(state_merkle_db
.metadata_db()
.get::<JellyfishMerkleNodeSchema>(&NodeKey::new_empty_path(version))?
.is_some())
}
}

#[cfg(test)]
Expand Down
43 changes: 41 additions & 2 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use crate::{
utils::{
iterators::PrefixedStateValueIterator,
new_sharded_kv_schema_batch,
truncation_helper::{truncate_ledger_db, truncate_state_kv_db},
truncation_helper::{
find_tree_root_at_or_before, get_max_version_in_state_merkle_db, truncate_ledger_db,
truncate_state_kv_db, truncate_state_merkle_db,
},
ShardedStateKvSchemaBatch,
},
};
Expand Down Expand Up @@ -297,6 +300,7 @@ impl StateStore {
Self::sync_commit_progress(
Arc::clone(&ledger_db),
Arc::clone(&state_kv_db),
Arc::clone(&state_merkle_db),
/*crash_if_difference_is_too_large=*/ true,
);
}
Expand Down Expand Up @@ -339,6 +343,7 @@ impl StateStore {
pub fn sync_commit_progress(
ledger_db: Arc<LedgerDb>,
state_kv_db: Arc<StateKvDb>,
state_merkle_db: Arc<StateMerkleDb>,
crash_if_difference_is_too_large: bool,
) {
let ledger_metadata_db = ledger_db.metadata_db();
Expand Down Expand Up @@ -393,6 +398,36 @@ impl StateStore {
std::cmp::max(difference as usize, 1), /* batch_size */
)
.expect("Failed to truncate state K/V db.");

let state_merkle_max_version = get_max_version_in_state_merkle_db(&state_merkle_db)
.expect("Failed to get state merkle max version.")
.expect("State merkle max version cannot be None.");
if state_merkle_max_version > overall_commit_progress {
let difference = state_merkle_max_version - overall_commit_progress;
if crash_if_difference_is_too_large {
assert_le!(difference, MAX_COMMIT_PROGRESS_DIFFERENCE);
}

let state_merkle_target_version = find_tree_root_at_or_before(
&ledger_db.metadata_db_arc(),
&state_merkle_db,
overall_commit_progress,
)?
.unwrap_or_else(|| {
panic!(
"Could not find a valid root before or at version {}, maybe it was pruned?",
overall_commit_progress
)
});

info!(
state_merkle_max_version = state_merkle_max_version,
target_version = state_merkle_target_version,
"Start state merkle truncation..."
);
truncate_state_merkle_db(&state_merkle_db, state_merkle_target_version)
.expect("Failed to truncate state merkle db.");
}
} else {
info!("No overall commit progress was found!");
}
Expand Down Expand Up @@ -448,7 +483,7 @@ impl StateStore {

let latest_snapshot_version = state_db
.state_merkle_db
.get_state_snapshot_version_before(Version::MAX)
.get_state_snapshot_version_before(u64::MAX)
.expect("Failed to query latest node on initialization.");

info!(
Expand All @@ -457,6 +492,10 @@ impl StateStore {
"Initializing BufferedState."
);
let latest_snapshot_root_hash = if let Some(version) = latest_snapshot_version {
ensure!(
version < num_transactions,
"State merkle commit progress cannot go beyond overall commit progress."
);
state_db
.state_merkle_db
.get_root_hash(version)
Expand Down
76 changes: 68 additions & 8 deletions storage/aptosdb/src/utils/truncation_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ pub(crate) fn truncate_state_merkle_db(
break;
}

let version_before =
find_closest_node_version_at_or_before(state_merkle_db, current_version - 1)?
.expect("Must exist.");
let version_before = find_closest_node_version_at_or_before(
state_merkle_db.metadata_db(),
current_version - 1,
)?
.expect("Must exist.");

let top_levels_batch = SchemaBatch::new();

Expand Down Expand Up @@ -195,19 +197,77 @@ pub(crate) fn truncate_state_merkle_db_single_shard(
state_merkle_db.commit_single_shard(target_version, shard_id, batch)
}

pub(crate) fn find_tree_root_at_or_before(
ledger_metadata_db: &DB,
state_merkle_db: &StateMerkleDb,
version: Version,
) -> Result<Option<Version>> {
match find_closest_node_version_at_or_before(state_merkle_db, version)? {
Some(closest_version) => {
if root_exists_at_version(state_merkle_db, closest_version)? {
return Ok(Some(closest_version));
}
let mut iter = ledger_metadata_db.iter::<EpochByVersionSchema>()?;
iter.seek_for_prev(&version)?;
match iter.next().transpose()? {
Some((closest_epoch_version, _)) => {
if root_exists_at_version(state_merkle_db, closest_epoch_version)? {
Ok(Some(closest_epoch_version))
} else {
Ok(None)
}
},
None => Ok(None),
}
},
None => Ok(None),
}
}

pub(crate) fn root_exists_at_version(
state_merkle_db: &StateMerkleDb,
version: Version,
) -> Result<bool> {
Ok(state_merkle_db
.metadata_db()
.get::<JellyfishMerkleNodeSchema>(&NodeKey::new_empty_path(version))?
.is_some())
}

pub(crate) fn get_current_version_in_state_merkle_db(
state_merkle_db: &StateMerkleDb,
) -> Result<Option<Version>> {
find_closest_node_version_at_or_before(state_merkle_db, u64::max_value())
find_closest_node_version_at_or_before(state_merkle_db.metadata_db(), u64::max_value())
}

pub(crate) fn find_closest_node_version_at_or_before(
pub(crate) fn get_max_version_in_state_merkle_db(
state_merkle_db: &StateMerkleDb,
) -> Result<Option<Version>> {
let mut version = get_current_version_in_state_merkle_db(state_merkle_db)?;
for shard_id in 0..16 {
let shard_version = find_closest_node_version_at_or_before(
state_merkle_db.db_shard(shard_id),
u64::max_value(),
)?;
if version.is_none() {
version = shard_version;
} else {
if let Some(shard_version) = shard_version {
if shard_version > version.unwrap() {
version = Some(shard_version);
}
}
}
}

Ok(version)
}

pub(crate) fn find_closest_node_version_at_or_before(
db: &DB,
version: Version,
) -> Result<Option<Version>> {
let mut iter = state_merkle_db
.metadata_db()
.rev_iter::<JellyfishMerkleNodeSchema>()?;
let mut iter = db.rev_iter::<JellyfishMerkleNodeSchema>()?;
iter.seek_for_prev(&NodeKey::new_empty_path(version))?;
Ok(iter.next().transpose()?.map(|item| item.0.version()))
}
Expand Down

0 comments on commit d6068b9

Please sign in to comment.