diff --git a/storage/aptosdb/src/lib.rs b/storage/aptosdb/src/lib.rs index cbffabc926ca0..c81228c1d529c 100644 --- a/storage/aptosdb/src/lib.rs +++ b/storage/aptosdb/src/lib.rs @@ -57,7 +57,7 @@ use crate::{ ledger_store::ledger_store_pruner::LedgerPruner, pruner_manager::PrunerManager, pruner_utils, state_kv_pruner::StateKvPruner, state_kv_pruner_manager::StateKvPrunerManager, - state_merkle_pruner_manager::StateMerklePrunerManager, state_store::StateMerklePruner, + state_merkle_pruner_manager::StateMerklePrunerManager, }, schema::*, stale_node_index::StaleNodeIndexSchema, @@ -2176,12 +2176,8 @@ impl DbWriter for AptosDB { .pruner() .save_min_readable_version(version, &batch)?; - let mut state_merkle_batch = SchemaBatch::new(); - StateMerklePruner::prune_genesis( - self.state_merkle_db.clone(), - &mut state_merkle_batch, - )?; - + // TODO(joshlind): Figure out a way to not write genesis. + let state_merkle_batch = SchemaBatch::new(); self.state_store .state_merkle_pruner .pruner() diff --git a/storage/aptosdb/src/pruner/state_store/generics.rs b/storage/aptosdb/src/pruner/state_store/generics.rs index 362cb7e334b3c..8b2fdc1164408 100644 --- a/storage/aptosdb/src/pruner/state_store/generics.rs +++ b/storage/aptosdb/src/pruner/state_store/generics.rs @@ -12,13 +12,17 @@ pub trait StaleNodeIndexSchemaTrait: Schema where StaleNodeIndex: KeyCodec, { - fn tag() -> DbMetadataKey; + fn tag(shard_id: Option) -> DbMetadataKey; fn name() -> &'static str; } impl StaleNodeIndexSchemaTrait for StaleNodeIndexSchema { - fn tag() -> DbMetadataKey { - DbMetadataKey::StateMerklePrunerProgress + fn tag(shard_id: Option) -> DbMetadataKey { + if let Some(shard_id) = shard_id { + DbMetadataKey::StateMerkleShardPrunerProgress(shard_id as usize) + } else { + DbMetadataKey::StateMerklePrunerProgress + } } fn name() -> &'static str { @@ -27,8 +31,12 @@ impl StaleNodeIndexSchemaTrait for StaleNodeIndexSchema { } impl StaleNodeIndexSchemaTrait for StaleNodeIndexCrossEpochSchema { - fn tag() -> DbMetadataKey { - DbMetadataKey::EpochEndingStateMerklePrunerProgress + fn tag(shard_id: Option) -> DbMetadataKey { + if let Some(shard_id) = shard_id { + DbMetadataKey::EpochEndingStateMerkleShardPrunerProgress(shard_id as usize) + } else { + DbMetadataKey::EpochEndingStateMerklePrunerProgress + } } fn name() -> &'static str { diff --git a/storage/aptosdb/src/pruner/state_store/mod.rs b/storage/aptosdb/src/pruner/state_store/mod.rs index f4a0aa418ab98..349333f6cb487 100644 --- a/storage/aptosdb/src/pruner/state_store/mod.rs +++ b/storage/aptosdb/src/pruner/state_store/mod.rs @@ -6,17 +6,17 @@ use crate::{ jellyfish_merkle_node::JellyfishMerkleNodeSchema, metrics::PRUNER_VERSIONS, pruner::{db_pruner::DBPruner, state_store::generics::StaleNodeIndexSchemaTrait}, - pruner_utils, schema::db_metadata::DbMetadataValue, state_merkle_db::StateMerkleDb, - StaleNodeIndexCrossEpochSchema, OTHER_TIMERS_SECONDS, + OTHER_TIMERS_SECONDS, }; use anyhow::Result; use aptos_infallible::Mutex; use aptos_jellyfish_merkle::{node_type::NodeKey, StaleNodeIndex}; -use aptos_logger::error; -use aptos_schemadb::{schema::KeyCodec, ReadOptions, SchemaBatch}; +use aptos_schemadb::{schema::KeyCodec, ReadOptions, SchemaBatch, DB}; use aptos_types::transaction::{AtomicVersion, Version}; +use claims::{assert_ge, assert_lt}; +use once_cell::sync::Lazy; use std::sync::{atomic::Ordering, Arc}; pub mod generics; @@ -25,7 +25,13 @@ pub(crate) mod state_value_pruner; #[cfg(test)] mod test; -pub const STATE_MERKLE_PRUNER_NAME: &str = "state_merkle_pruner"; +static TREE_PRUNER_WORKER_POOL: Lazy = Lazy::new(|| { + rayon::ThreadPoolBuilder::new() + .num_threads(16) + .thread_name(|index| format!("tree_pruner_worker_{}", index)) + .build() + .unwrap() +}); /// Responsible for pruning the state tree. #[derive(Debug)] @@ -34,9 +40,12 @@ pub struct StateMerklePruner { state_merkle_db: Arc, /// Keeps track of the target version that the pruner needs to achieve. target_version: AtomicVersion, - /// 1. min readable version - /// 2. if things before that version fully cleaned - progress: Mutex<(Version, bool)>, + /// Overall min readable version. + progress: Mutex, + /// Min readable version for each shard. + shard_progresses: Mutex>, + /// The version that is going to be pruned next. + next_version: Mutex>, _phantom: std::marker::PhantomData, } @@ -45,7 +54,7 @@ where StaleNodeIndex: KeyCodec, { fn name(&self) -> &'static str { - STATE_MERKLE_PRUNER_NAME + S::name() } fn prune(&self, batch_size: usize) -> Result { @@ -55,17 +64,7 @@ where let min_readable_version = self.min_readable_version(); let target_version = self.target_version(); - match self.prune_state_merkle(min_readable_version, target_version, batch_size, None) { - Ok(new_min_readable_version) => Ok(new_min_readable_version), - Err(e) => { - error!( - error = ?e, - "Error pruning stale states.", - ); - Err(e) - // On error, stop retrying vigorously by making next recv() blocking. - }, - } + self.prune_state_merkle(min_readable_version, target_version, batch_size) } fn save_min_readable_version( @@ -73,20 +72,22 @@ where version: Version, batch: &SchemaBatch, ) -> anyhow::Result<()> { - batch.put::(&S::tag(), &DbMetadataValue::Version(version)) + // TODO(grao): Support sharding here. + batch.put::(&S::tag(None), &DbMetadataValue::Version(version)) } fn initialize_min_readable_version(&self) -> Result { - Ok(self + let min_readable_version = self .state_merkle_db .metadata_db() - .get::(&S::tag())? - .map_or(0, |v| v.expect_version())) + .get::(&S::tag(None))? + .map_or(0, |v| v.expect_version()); + self.finish_pending_pruning(min_readable_version)?; + Ok(min_readable_version) } fn min_readable_version(&self) -> Version { - let (version, _) = *self.progress.lock(); - version + *self.progress.lock() } fn set_target_version(&self, target_version: Version) { @@ -102,17 +103,19 @@ where // used only by blanket `initialize()`, use the underlying implementation instead elsewhere. fn record_progress(&self, min_readable_version: Version) { - self.record_progress_impl(min_readable_version, false /* is_fully_pruned */); + *self.progress.lock() = min_readable_version; + PRUNER_VERSIONS + .with_label_values(&[S::name(), "min_readable"]) + .set(min_readable_version as i64); } fn is_pruning_pending(&self) -> bool { - let (min_readable_version, fully_pruned) = *self.progress.lock(); - self.target_version() > min_readable_version || !fully_pruned + self.target_version() > *self.progress.lock() } /// (For tests only.) Updates the minimal readable version kept by pruner. fn testonly_update_min_version(&self, version: Version) { - self.record_progress_impl(version, true /* is_fully_pruned */); + self.record_progress(version); } } @@ -121,96 +124,193 @@ where StaleNodeIndex: KeyCodec, { pub fn new(state_merkle_db: Arc) -> Self { + let num_shards = state_merkle_db.num_shards(); + let mut shard_progresses = Vec::with_capacity(num_shards as usize); + for shard_id in 0..num_shards { + let db_shard = state_merkle_db.db_shard(shard_id); + shard_progresses + .push(Self::get_progress(db_shard, Some(shard_id)).expect("Must succeed.")); + } + let pruner = StateMerklePruner { state_merkle_db, target_version: AtomicVersion::new(0), - progress: Mutex::new((0, true)), + progress: Mutex::new(0), + shard_progresses: Mutex::new(shard_progresses), + next_version: Mutex::new(None), _phantom: std::marker::PhantomData, }; pruner.initialize(); pruner } - // If the existing schema batch is not none, this function only adds items need to be - // deleted to the schema batch and the caller is responsible for committing the schema batches - // to the DB. - pub fn prune_state_merkle( + fn get_progress(state_merkle_db_shard: &DB, shard_id: Option) -> Result { + Ok(state_merkle_db_shard + .get::(&S::tag(shard_id))? + .map_or(0, |v| v.expect_version())) + } + + fn prune_state_merkle( &self, min_readable_version: Version, target_version: Version, batch_size: usize, - existing_schema_batch: Option<&mut SchemaBatch>, - ) -> anyhow::Result { - assert_ne!(batch_size, 0); - if target_version < min_readable_version { - return Ok(min_readable_version); - } - let (indices, is_end_of_target_version) = - self.get_stale_node_indices(min_readable_version, target_version, batch_size)?; - if indices.is_empty() { - self.record_progress_impl(target_version, is_end_of_target_version); - Ok(target_version) - } else { - let _timer = OTHER_TIMERS_SECONDS - .with_label_values(&["state_merkle_pruner_commit"]) - .start_timer(); - let new_min_readable_version = - indices.last().expect("Should exist.").stale_since_version; - - // Delete stale nodes. - if let Some(existing_schema_batch) = existing_schema_batch { - indices.into_iter().try_for_each(|index| { - existing_schema_batch.delete::(&index.node_key)?; - existing_schema_batch.delete::(&index) - })?; + ) -> Result { + let mut min_readable_version = min_readable_version; + let mut target_version_for_this_batch = + self.next_version.lock().unwrap_or(min_readable_version); + while target_version_for_this_batch <= target_version { + self.record_progress(target_version_for_this_batch); + let next_version = + self.prune_top_levels(min_readable_version, target_version_for_this_batch)?; + *self.next_version.lock() = next_version; + self.prune_shards(target_version_for_this_batch, batch_size)?; + min_readable_version = target_version_for_this_batch; + if let Some(next_version) = next_version { + target_version_for_this_batch = next_version; } else { + break; + } + } + self.record_progress(target_version); + Ok(target_version) + } + + fn prune_top_levels( + &self, + min_readable_version: Version, + target_version: Version, + ) -> Result> { + let batch = SchemaBatch::new(); + let next_version = self.prune_state_merkle_shard( + self.state_merkle_db.metadata_db(), + min_readable_version, + target_version, + usize::max_value(), + &batch, + )?; + batch.put::(&S::tag(None), &DbMetadataValue::Version(target_version))?; + self.state_merkle_db.metadata_db().write_schemas(batch)?; + + Ok(next_version) + } + + fn prune_single_shard( + &self, + shard_id: u8, + target_version: Version, + batch_size: usize, + ) -> Result<()> { + let _timer = OTHER_TIMERS_SECONDS + .with_label_values(&["state_merkle_pruner___prune_single_shard"]) + .start_timer(); + let shard_min_readable_version = self.get_shard_progress(shard_id); + if shard_min_readable_version != target_version { + assert_lt!(shard_min_readable_version, target_version); + self.update_shard_progress(shard_id, target_version); + let db_shard = self.state_merkle_db.db_shard(shard_id); + loop { let batch = SchemaBatch::new(); - indices.into_iter().try_for_each(|index| { - batch.delete::(&index.node_key)?; - batch.delete::(&index) - })?; + let next_version = self.prune_state_merkle_shard( + db_shard, + shard_min_readable_version, + target_version, + batch_size, + &batch, + )?; + if let Some(next_version) = next_version { + if next_version <= target_version { + db_shard.write_schemas(batch)?; + continue; + } + } + batch.put::( + &S::tag(Some(shard_id)), + &DbMetadataValue::Version(target_version), + )?; + db_shard.write_schemas(batch)?; + break; + } + } - self.save_min_readable_version(new_min_readable_version, &batch)?; + Ok(()) + } - // TODO(grao): Support sharding here. - self.state_merkle_db.metadata_db().write_schemas(batch)?; + fn prune_shards(&self, target_version: Version, batch_size: usize) -> Result<()> { + let num_shards = self.state_merkle_db.num_shards(); + TREE_PRUNER_WORKER_POOL.scope(|s| { + for shard_id in 0..num_shards { + s.spawn(move |_| { + self.prune_single_shard(shard_id, target_version, batch_size) + .unwrap_or_else(|_| { + panic!("Failed to prune state merkle shard {shard_id}.") + }); + }); } + }); - // TODO(zcc): recording progress after writing schemas might provide wrong answers to - // API calls when they query min_readable_version while the write_schemas are still in - // progress. - self.record_progress_impl(new_min_readable_version, is_end_of_target_version); - Ok(new_min_readable_version) - } + Ok(()) } - fn record_progress_impl(&self, min_readable_version: Version, is_fully_pruned: bool) { - *self.progress.lock() = (min_readable_version, is_fully_pruned); - PRUNER_VERSIONS - .with_label_values(&[S::name(), "min_readable"]) - .set(min_readable_version as i64); + fn finish_pending_pruning(&self, min_readable_version: Version) -> Result<()> { + self.prune_shards(min_readable_version, usize::max_value()) + } + + fn get_shard_progress(&self, shard_id: u8) -> Version { + self.shard_progresses.lock()[shard_id as usize] + } + + fn update_shard_progress(&self, shard_id: u8, progress: Version) { + self.shard_progresses.lock()[shard_id as usize] = progress; + } + + // If the existing schema batch is not none, this function only adds items need to be + // deleted to the schema batch and the caller is responsible for committing the schema batches + // to the DB. + fn prune_state_merkle_shard( + &self, + state_merkle_db_shard: &DB, + min_readable_version: Version, + target_version: Version, + batch_size: usize, + batch: &SchemaBatch, + ) -> Result> { + assert_ne!(batch_size, 0); + assert_ge!(target_version, min_readable_version); + let (indices, next_version) = self.get_stale_node_indices( + state_merkle_db_shard, + min_readable_version, + target_version, + batch_size, + )?; + + indices.into_iter().try_for_each(|index| { + batch.delete::(&index.node_key)?; + batch.delete::(&index) + })?; + + Ok(next_version) } fn get_stale_node_indices( &self, + state_merkle_db_shard: &DB, start_version: Version, target_version: Version, batch_size: usize, - ) -> Result<(Vec, bool)> { + ) -> Result<(Vec, Option)> { let mut indices = Vec::new(); - // TODO(grao): Support sharding here. - let mut iter = self - .state_merkle_db - .metadata_db() - .iter::(ReadOptions::default())?; + let mut iter = state_merkle_db_shard.iter::(ReadOptions::default())?; iter.seek(&StaleNodeIndex { stale_since_version: start_version, node_key: NodeKey::new_empty_path(0), })?; + let mut next_version = None; // over fetch by 1 for _ in 0..=batch_size { if let Some((index, _)) = iter.next().transpose()? { + next_version = Some(index.stale_since_version); if index.stale_since_version <= target_version { indices.push(index); continue; @@ -219,39 +319,9 @@ where break; } - let is_end_of_target_version = if indices.len() > batch_size { + if indices.len() > batch_size { indices.pop(); - false - } else { - true - }; - Ok((indices, is_end_of_target_version)) - } -} - -impl StateMerklePruner { - /// Prunes the genesis state and saves the db alterations to the given change set - pub fn prune_genesis( - state_merkle_db: Arc, - batch: &mut SchemaBatch, - ) -> Result<()> { - let target_version = 1; // The genesis version is 0. Delete [0,1) (exclusive) - let max_version = 1; // We should only be pruning a single version - - let state_merkle_pruner = pruner_utils::create_state_merkle_pruner::< - StaleNodeIndexCrossEpochSchema, - >(state_merkle_db); - state_merkle_pruner.set_target_version(target_version); - - let min_readable_version = state_merkle_pruner.min_readable_version(); - let target_version = state_merkle_pruner.target_version(); - state_merkle_pruner.prune_state_merkle( - min_readable_version, - target_version, - max_version, - Some(batch), - )?; - - Ok(()) + } + Ok((indices, next_version)) } } diff --git a/storage/aptosdb/src/schema/db_metadata/mod.rs b/storage/aptosdb/src/schema/db_metadata/mod.rs index fc5dbb137fbaa..fcc4ca8cf86a4 100644 --- a/storage/aptosdb/src/schema/db_metadata/mod.rs +++ b/storage/aptosdb/src/schema/db_metadata/mod.rs @@ -57,6 +57,8 @@ pub enum DbMetadataKey { StateKvShardCommitProgress(ShardId), StateMerkleCommitProgress, StateMerkleShardCommitProgress(ShardId), + StateMerkleShardPrunerProgress(ShardId), + EpochEndingStateMerkleShardPrunerProgress(ShardId), } define_schema!( diff --git a/storage/aptosdb/src/state_merkle_db.rs b/storage/aptosdb/src/state_merkle_db.rs index 57ddd55afd293..6c81c313f6f03 100644 --- a/storage/aptosdb/src/state_merkle_db.rs +++ b/storage/aptosdb/src/state_merkle_db.rs @@ -362,6 +362,10 @@ impl StateMerkleDb { &self.lru_cache } + pub(crate) fn num_shards(&self) -> u8 { + NUM_STATE_SHARDS as u8 + } + fn db_by_key(&self, node_key: &NodeKey) -> &DB { if let Some(shard_id) = node_key.get_shard_id() { self.db_shard(shard_id)