From 2d6740f212241bdd806edffabf44ce5aa73b53dd Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Mon, 10 Apr 2023 18:47:21 -0700 Subject: [PATCH] [Storage][Sharding] Sharded state merkle pruner. --- storage/aptosdb/src/aptosdb_test.rs | 1 + storage/aptosdb/src/pruner/db_pruner.rs | 20 +- .../pruner/event_store/event_store_pruner.rs | 6 +- .../ledger_store/ledger_store_pruner.rs | 19 +- storage/aptosdb/src/pruner/pruner_utils.rs | 15 +- storage/aptosdb/src/pruner/state_kv_pruner.rs | 38 +-- .../src/pruner/state_merkle_pruner_manager.rs | 5 +- .../src/pruner/state_store/generics.rs | 18 +- storage/aptosdb/src/pruner/state_store/mod.rs | 261 +++++++++--------- .../state_merkle_metadata_pruner.rs | 83 ++++++ .../state_store/state_merkle_shard_pruner.rs | 96 +++++++ .../transaction_accumulator_pruner.rs | 6 +- .../transaction_info_pruner.rs | 6 +- .../transaction_store/transaction_pruner.rs | 6 +- .../transaction_store/write_set_pruner.rs | 6 +- storage/aptosdb/src/schema/db_metadata/mod.rs | 2 + storage/aptosdb/src/state_merkle_db.rs | 19 ++ storage/aptosdb/src/utils/mod.rs | 14 +- 18 files changed, 401 insertions(+), 220 deletions(-) create mode 100644 storage/aptosdb/src/pruner/state_store/state_merkle_metadata_pruner.rs create mode 100644 storage/aptosdb/src/pruner/state_store/state_merkle_shard_pruner.rs diff --git a/storage/aptosdb/src/aptosdb_test.rs b/storage/aptosdb/src/aptosdb_test.rs index 02fc5617c7980e..e77846920f0ad4 100644 --- a/storage/aptosdb/src/aptosdb_test.rs +++ b/storage/aptosdb/src/aptosdb_test.rs @@ -288,6 +288,7 @@ proptest! { #[test] fn test_state_merkle_pruning(input in arb_blocks_to_commit()) { + aptos_logger::Logger::new().init(); test_state_merkle_pruning_impl(input); } } diff --git a/storage/aptosdb/src/pruner/db_pruner.rs b/storage/aptosdb/src/pruner/db_pruner.rs index acbbb9f63fe2b6..b06dcf38fa0f43 100644 --- a/storage/aptosdb/src/pruner/db_pruner.rs +++ b/storage/aptosdb/src/pruner/db_pruner.rs @@ -1,36 +1,18 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use anyhow::{Context, Result}; -use aptos_logger::info; +use anyhow::Result; use aptos_types::transaction::Version; use std::cmp::min; /// Defines the trait for pruner for different DB pub trait DBPruner: Send + Sync { - /// Find out the first undeleted item in the stale node index. - fn initialize(&self) { - let min_readable_version = self - .initialize_min_readable_version() - .context(self.name()) - .expect("Pruner failed to initialize."); - info!( - min_readable_version = min_readable_version, - "{} initialized.", - self.name() - ); - self.record_progress(min_readable_version); - } - fn name(&self) -> &'static str; /// Performs the actual pruning, a target version is passed, which is the target the pruner /// tries to prune. fn prune(&self, batch_size: usize) -> Result; - /// Initializes the least readable version stored in underlying DB storage - fn initialize_min_readable_version(&self) -> Result; - /// Returns the progress of the pruner. fn progress(&self) -> Version; diff --git a/storage/aptosdb/src/pruner/event_store/event_store_pruner.rs b/storage/aptosdb/src/pruner/event_store/event_store_pruner.rs index b35ba237bd4b98..27f76724c93b69 100644 --- a/storage/aptosdb/src/pruner/event_store/event_store_pruner.rs +++ b/storage/aptosdb/src/pruner/event_store/event_store_pruner.rs @@ -2,9 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - pruner::{ - db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_ledger_subpruner_progress, - }, + pruner::{db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_subpruner_progress}, schema::db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}, EventStore, }; @@ -38,7 +36,7 @@ impl EventStorePruner { event_db: Arc, metadata_progress: Version, ) -> Result { - let progress = get_or_initialize_ledger_subpruner_progress( + let progress = get_or_initialize_subpruner_progress( &event_db, &DbMetadataKey::EventPrunerProgress, metadata_progress, diff --git a/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs b/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs index 19d66aae57fe28..251985525d68ef 100644 --- a/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs +++ b/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs @@ -18,6 +18,7 @@ use crate::{ EventStore, TransactionStore, }; use anyhow::Result; +use aptos_logger::info; use aptos_types::transaction::{AtomicVersion, Version}; use std::{ cmp::min, @@ -66,10 +67,6 @@ impl DBPruner for LedgerPruner { Ok(target_version) } - fn initialize_min_readable_version(&self) -> Result { - self.ledger_metadata_pruner.progress() - } - fn progress(&self) -> Version { self.progress.load(Ordering::SeqCst) } @@ -85,16 +82,18 @@ impl DBPruner for LedgerPruner { self.target_version.load(Ordering::SeqCst) } - fn record_progress(&self, min_readable_version: Version) { - self.progress.store(min_readable_version, Ordering::SeqCst); + fn record_progress(&self, progress: Version) { + self.progress.store(progress, Ordering::SeqCst); PRUNER_VERSIONS .with_label_values(&["ledger_pruner", "progress"]) - .set(min_readable_version as i64); + .set(progress as i64); } } impl LedgerPruner { pub fn new(ledger_db: Arc) -> Result { + info!(name = LEDGER_PRUNER_NAME, "Initializing..."); + let ledger_metadata_pruner = Box::new( LedgerMetadataPruner::new(ledger_db.metadata_db_arc()) .expect("Failed to initialize ledger_metadata_pruner."), @@ -143,7 +142,11 @@ impl LedgerPruner { ], }; - pruner.initialize(); + info!( + name = pruner.name(), + progress = metadata_progress, + "Initialized." + ); Ok(pruner) } diff --git a/storage/aptosdb/src/pruner/pruner_utils.rs b/storage/aptosdb/src/pruner/pruner_utils.rs index 72433a55b3ad63..2ed8b49ba38688 100644 --- a/storage/aptosdb/src/pruner/pruner_utils.rs +++ b/storage/aptosdb/src/pruner/pruner_utils.rs @@ -31,7 +31,10 @@ pub fn create_state_merkle_pruner( where StaleNodeIndex: KeyCodec, { - Arc::new(StateMerklePruner::::new(Arc::clone(&state_merkle_db))) + Arc::new( + StateMerklePruner::::new(Arc::clone(&state_merkle_db)) + .expect("Failed to create state merkle pruner."), + ) } /// A utility function to instantiate the ledger pruner @@ -41,7 +44,7 @@ pub(crate) fn create_ledger_pruner(ledger_db: Arc) -> Arc) -> Arc { - Arc::new(StateKvPruner::new(state_kv_db)) + Arc::new(StateKvPruner::new(state_kv_db).expect("Failed to create state kv pruner.")) } pub(crate) fn get_ledger_pruner_progress(ledger_db: &LedgerDb) -> Result { @@ -78,10 +81,14 @@ pub(crate) fn get_state_merkle_pruner_progress( where StaleNodeIndex: KeyCodec, { - Ok(get_progress(state_merkle_db.metadata_db(), &S::tag())?.unwrap_or(0)) + Ok(get_progress( + state_merkle_db.metadata_db(), + &S::progress_metadata_key(None), + )? + .unwrap_or(0)) } -pub(crate) fn get_or_initialize_ledger_subpruner_progress( +pub(crate) fn get_or_initialize_subpruner_progress( sub_db: &DB, progress_key: &DbMetadataKey, metadata_progress: Version, diff --git a/storage/aptosdb/src/pruner/state_kv_pruner.rs b/storage/aptosdb/src/pruner/state_kv_pruner.rs index 70a6f317effa68..e22f58e16cae57 100644 --- a/storage/aptosdb/src/pruner/state_kv_pruner.rs +++ b/storage/aptosdb/src/pruner/state_kv_pruner.rs @@ -9,6 +9,7 @@ use crate::{ state_kv_db::StateKvDb, }; use anyhow::Result; +use aptos_logger::info; use aptos_schemadb::SchemaBatch; use aptos_types::transaction::{AtomicVersion, Version}; use std::sync::{atomic::Ordering, Arc}; @@ -43,47 +44,48 @@ impl DBPruner for StateKvPruner { Ok(current_target_version) } - fn initialize_min_readable_version(&self) -> anyhow::Result { - Ok(self - .state_kv_db - .metadata_db() - .get::(&DbMetadataKey::StateKvPrunerProgress)? - .map_or(0, |v| v.expect_version())) - } - fn progress(&self) -> Version { self.progress.load(Ordering::SeqCst) } fn set_target_version(&self, target_version: Version) { - self.target_version.store(target_version, Ordering::Relaxed); + self.target_version.store(target_version, Ordering::SeqCst); PRUNER_VERSIONS .with_label_values(&["state_kv_pruner", "target"]) .set(target_version as i64); } fn target_version(&self) -> Version { - self.target_version.load(Ordering::Relaxed) + self.target_version.load(Ordering::SeqCst) } - fn record_progress(&self, min_readable_version: Version) { - self.progress.store(min_readable_version, Ordering::Relaxed); + fn record_progress(&self, progress: Version) { + self.progress.store(progress, Ordering::SeqCst); PRUNER_VERSIONS .with_label_values(&["state_kv_pruner", "progress"]) - .set(min_readable_version as i64); + .set(progress as i64); } } impl StateKvPruner { - pub fn new(state_kv_db: Arc) -> Self { + pub fn new(state_kv_db: Arc) -> Result { + info!(name = STATE_KV_PRUNER_NAME, "Initializing..."); + + let progress = state_kv_db + .metadata_db() + .get::(&DbMetadataKey::StateKvPrunerProgress)? + .map_or(0, |v| v.expect_version()); + let pruner = StateKvPruner { state_kv_db: Arc::clone(&state_kv_db), - target_version: AtomicVersion::new(0), - progress: AtomicVersion::new(0), + target_version: AtomicVersion::new(progress), + progress: AtomicVersion::new(progress), state_value_pruner: Arc::new(StateValuePruner::new(state_kv_db)), }; - pruner.initialize(); - pruner + + info!(name = pruner.name(), progress = progress, "Initialized."); + + Ok(pruner) } fn prune_inner( diff --git a/storage/aptosdb/src/pruner/state_merkle_pruner_manager.rs b/storage/aptosdb/src/pruner/state_merkle_pruner_manager.rs index 8008e06a4ece09..ecb1feb3d27e87 100644 --- a/storage/aptosdb/src/pruner/state_merkle_pruner_manager.rs +++ b/storage/aptosdb/src/pruner/state_merkle_pruner_manager.rs @@ -35,12 +35,11 @@ where StaleNodeIndex: KeyCodec, { state_merkle_db: Arc, - /// DB version window, which dictates how many versions of state store - /// to keep. + /// DB version window, which dictates how many versions of state merkle data to keep. prune_window: Version, /// It is None iff the pruner is not enabled. pruner_worker: Option, - /// The minimal readable version for the ledger data. + /// The minimal readable version for the state merkle data. min_readable_version: AtomicVersion, _phantom: PhantomData, diff --git a/storage/aptosdb/src/pruner/state_store/generics.rs b/storage/aptosdb/src/pruner/state_store/generics.rs index 362cb7e334b3c4..243cb1d7d71432 100644 --- a/storage/aptosdb/src/pruner/state_store/generics.rs +++ b/storage/aptosdb/src/pruner/state_store/generics.rs @@ -12,13 +12,17 @@ pub trait StaleNodeIndexSchemaTrait: Schema where StaleNodeIndex: KeyCodec, { - fn tag() -> DbMetadataKey; + fn progress_metadata_key(shard_id: Option) -> DbMetadataKey; fn name() -> &'static str; } impl StaleNodeIndexSchemaTrait for StaleNodeIndexSchema { - fn tag() -> DbMetadataKey { - DbMetadataKey::StateMerklePrunerProgress + fn progress_metadata_key(shard_id: Option) -> DbMetadataKey { + if let Some(shard_id) = shard_id { + DbMetadataKey::StateMerkleShardPrunerProgress(shard_id as usize) + } else { + DbMetadataKey::StateMerklePrunerProgress + } } fn name() -> &'static str { @@ -27,8 +31,12 @@ impl StaleNodeIndexSchemaTrait for StaleNodeIndexSchema { } impl StaleNodeIndexSchemaTrait for StaleNodeIndexCrossEpochSchema { - fn tag() -> DbMetadataKey { - DbMetadataKey::EpochEndingStateMerklePrunerProgress + fn progress_metadata_key(shard_id: Option) -> DbMetadataKey { + if let Some(shard_id) = shard_id { + DbMetadataKey::EpochEndingStateMerkleShardPrunerProgress(shard_id as usize) + } else { + DbMetadataKey::EpochEndingStateMerklePrunerProgress + } } fn name() -> &'static str { diff --git a/storage/aptosdb/src/pruner/state_store/mod.rs b/storage/aptosdb/src/pruner/state_store/mod.rs index 56c1c94526e6d6..e80155de943d4a 100644 --- a/storage/aptosdb/src/pruner/state_store/mod.rs +++ b/storage/aptosdb/src/pruner/state_store/mod.rs @@ -2,41 +2,57 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - db_metadata::DbMetadataSchema, - jellyfish_merkle_node::JellyfishMerkleNodeSchema, metrics::PRUNER_VERSIONS, - pruner::{db_pruner::DBPruner, state_store::generics::StaleNodeIndexSchemaTrait}, - schema::db_metadata::DbMetadataValue, + pruner::{ + db_pruner::DBPruner, + state_store::{ + generics::StaleNodeIndexSchemaTrait, + state_merkle_metadata_pruner::StateMerkleMetadataPruner, + state_merkle_shard_pruner::StateMerkleShardPruner, + }, + }, state_merkle_db::StateMerkleDb, OTHER_TIMERS_SECONDS, }; use anyhow::Result; -use aptos_infallible::Mutex; use aptos_jellyfish_merkle::{node_type::NodeKey, StaleNodeIndex}; -use aptos_logger::error; -use aptos_schemadb::{schema::KeyCodec, ReadOptions, SchemaBatch}; +use aptos_logger::info; +use aptos_schemadb::{schema::KeyCodec, ReadOptions, DB}; use aptos_types::transaction::{AtomicVersion, Version}; -use std::sync::{atomic::Ordering, Arc}; +use once_cell::sync::Lazy; +use std::{ + marker::PhantomData, + sync::{atomic::Ordering, Arc}, +}; pub mod generics; +mod state_merkle_metadata_pruner; +mod state_merkle_shard_pruner; pub(crate) mod state_value_pruner; #[cfg(test)] mod test; -pub const STATE_MERKLE_PRUNER_NAME: &str = "state_merkle_pruner"; +static TREE_PRUNER_WORKER_POOL: Lazy = Lazy::new(|| { + rayon::ThreadPoolBuilder::new() + .num_threads(16) + .thread_name(|index| format!("tree_pruner_worker_{}", index)) + .build() + .unwrap() +}); /// Responsible for pruning the state tree. -#[derive(Debug)] pub struct StateMerklePruner { - /// State DB. - state_merkle_db: Arc, /// Keeps track of the target version that the pruner needs to achieve. target_version: AtomicVersion, - /// 1. min readable version - /// 2. if things before that version fully cleaned - progress: Mutex<(Version, bool)>, - _phantom: std::marker::PhantomData, + /// Overall progress, updated when the whole version is done. + progress: AtomicVersion, + + metadata_pruner: StateMerkleMetadataPruner, + // Non-empty iff sharding is enabled. + shard_pruners: Vec>, + + _phantom: PhantomData, } impl DBPruner for StateMerklePruner @@ -44,61 +60,68 @@ where StaleNodeIndex: KeyCodec, { fn name(&self) -> &'static str { - STATE_MERKLE_PRUNER_NAME + S::name() } - fn prune(&self, batch_size: usize) -> Result { - if !self.is_pruning_pending() { - return Ok(self.progress()); - } - let progress = self.progress(); + fn prune(&self, _batch_size: usize) -> Result { + // TODO(grao): Consider separate pruner metrics, and have a label for pruner name. + let _timer = OTHER_TIMERS_SECONDS + .with_label_values(&["state_merkle_pruner__prune"]) + .start_timer(); + let mut progress = self.progress(); let target_version = self.target_version(); - match self.prune_state_merkle(progress, target_version, batch_size, None) { - Ok(new_min_readable_version) => Ok(new_min_readable_version), - Err(e) => { - error!( - error = ?e, - "Error pruning stale states.", - ); - Err(e) - // On error, stop retrying vigorously by making next recv() blocking. - }, + if progress >= target_version { + return Ok(progress); + } + + info!( + name = S::name(), + current_progress = progress, + target_version = target_version, + "Start pruning..." + ); + + while progress < target_version { + if let Some(target_version_for_this_round) = self + .metadata_pruner + .maybe_prune_single_version(progress, target_version)? + { + self.prune_shards(progress, target_version_for_this_round)?; + progress = target_version_for_this_round; + info!(name = S::name(), progress = progress); + self.record_progress(target_version_for_this_round); + } else { + self.record_progress(target_version); + break; + } } - } - fn initialize_min_readable_version(&self) -> Result { - Ok(self - .state_merkle_db - .metadata_db() - .get::(&S::tag())? - .map_or(0, |v| v.expect_version())) + info!(name = S::name(), progress = target_version, "Done pruning."); + + Ok(target_version) } fn progress(&self) -> Version { - let (version, _) = *self.progress.lock(); - version + self.progress.load(Ordering::SeqCst) } fn set_target_version(&self, target_version: Version) { - self.target_version.store(target_version, Ordering::Relaxed); + self.target_version.store(target_version, Ordering::SeqCst); PRUNER_VERSIONS .with_label_values(&[S::name(), "target"]) .set(target_version as i64); } fn target_version(&self) -> Version { - self.target_version.load(Ordering::Relaxed) - } - - // used only by blanket `initialize()`, use the underlying implementation instead elsewhere. - fn record_progress(&self, min_readable_version: Version) { - self.record_progress_impl(min_readable_version, false /* is_fully_pruned */); + self.target_version.load(Ordering::SeqCst) } - fn is_pruning_pending(&self) -> bool { - let (min_readable_version, fully_pruned) = *self.progress.lock(); - self.target_version() > min_readable_version || !fully_pruned + fn record_progress(&self, progress: Version) { + self.progress.store(progress, Ordering::SeqCst); + PRUNER_VERSIONS + .with_label_values(&[S::name(), "progress"]) + .set(progress as i64); } } @@ -106,97 +129,79 @@ impl StateMerklePruner where StaleNodeIndex: KeyCodec, { - pub fn new(state_merkle_db: Arc) -> Self { + pub fn new(state_merkle_db: Arc) -> Result { + info!(name = S::name(), "Initializing..."); + + let metadata_pruner = StateMerkleMetadataPruner::new(state_merkle_db.metadata_db_arc()); + let metadata_progress = metadata_pruner.progress()?; + + let shard_pruners = if state_merkle_db.sharding_enabled() { + let num_shards = state_merkle_db.num_shards(); + let mut shard_pruners = Vec::with_capacity(num_shards as usize); + for shard_id in 0..num_shards { + shard_pruners.push(StateMerkleShardPruner::new( + shard_id, + state_merkle_db.db_shard_arc(shard_id), + metadata_progress, + )?); + } + shard_pruners + } else { + Vec::new() + }; + let pruner = StateMerklePruner { - state_merkle_db, - target_version: AtomicVersion::new(0), - progress: Mutex::new((0, true)), + target_version: AtomicVersion::new(metadata_progress), + progress: AtomicVersion::new(metadata_progress), + metadata_pruner, + shard_pruners, _phantom: std::marker::PhantomData, }; - pruner.initialize(); - pruner - } - // If the existing schema batch is not none, this function only adds items need to be - // deleted to the schema batch and the caller is responsible for committing the schema batches - // to the DB. - pub fn prune_state_merkle( - &self, - min_readable_version: Version, - target_version: Version, - batch_size: usize, - existing_schema_batch: Option<&mut SchemaBatch>, - ) -> anyhow::Result { - assert_ne!(batch_size, 0); - if target_version < min_readable_version { - return Ok(min_readable_version); - } - let (indices, is_end_of_target_version) = - self.get_stale_node_indices(min_readable_version, target_version, batch_size)?; - if indices.is_empty() { - self.record_progress_impl(target_version, is_end_of_target_version); - Ok(target_version) - } else { - let _timer = OTHER_TIMERS_SECONDS - .with_label_values(&["state_merkle_pruner_commit"]) - .start_timer(); - let new_min_readable_version = - indices.last().expect("Should exist.").stale_since_version; - - // Delete stale nodes. - if let Some(existing_schema_batch) = existing_schema_batch { - indices.into_iter().try_for_each(|index| { - existing_schema_batch.delete::(&index.node_key)?; - existing_schema_batch.delete::(&index) - })?; - } else { - let batch = SchemaBatch::new(); - indices.into_iter().try_for_each(|index| { - batch.delete::(&index.node_key)?; - batch.delete::(&index) - })?; + info!( + name = pruner.name(), + progress = metadata_progress, + "Initialized." + ); - self.save_progress(new_min_readable_version, &batch)?; + Ok(pruner) + } - // TODO(grao): Support sharding here. - self.state_merkle_db.metadata_db().write_schemas(batch)?; + fn prune_shards(&self, current_progress: Version, target_version: Version) -> Result<()> { + TREE_PRUNER_WORKER_POOL.scope(|s| { + for shard_pruner in &self.shard_pruners { + s.spawn(move |_| { + shard_pruner + .prune(current_progress, target_version) + .unwrap_or_else(|_| { + panic!( + "Failed to prune state merkle shard {}.", + shard_pruner.shard_id() + ) + }); + }); } + }); - // TODO(zcc): recording progress after writing schemas might provide wrong answers to - // API calls when they query min_readable_version while the write_schemas are still in - // progress. - self.record_progress_impl(new_min_readable_version, is_end_of_target_version); - Ok(new_min_readable_version) - } - } - - fn record_progress_impl(&self, min_readable_version: Version, is_fully_pruned: bool) { - *self.progress.lock() = (min_readable_version, is_fully_pruned); - PRUNER_VERSIONS - .with_label_values(&[S::name(), "progress"]) - .set(min_readable_version as i64); + Ok(()) } fn get_stale_node_indices( - &self, + state_merkle_db_shard: &DB, start_version: Version, target_version: Version, - batch_size: usize, - ) -> Result<(Vec, bool)> { + ) -> Result<(Vec, Option)> { let mut indices = Vec::new(); - // TODO(grao): Support sharding here. - let mut iter = self - .state_merkle_db - .metadata_db() - .iter::(ReadOptions::default())?; + let mut iter = state_merkle_db_shard.iter::(ReadOptions::default())?; iter.seek(&StaleNodeIndex { stale_since_version: start_version, node_key: NodeKey::new_empty_path(0), })?; - // over fetch by 1 - for _ in 0..=batch_size { + let mut next_version = None; + loop { if let Some((index, _)) = iter.next().transpose()? { + next_version = Some(index.stale_since_version); if index.stale_since_version <= target_version { indices.push(index); continue; @@ -205,16 +210,6 @@ where break; } - let is_end_of_target_version = if indices.len() > batch_size { - indices.pop(); - false - } else { - true - }; - Ok((indices, is_end_of_target_version)) - } - - fn save_progress(&self, version: Version, batch: &SchemaBatch) -> anyhow::Result<()> { - batch.put::(&S::tag(), &DbMetadataValue::Version(version)) + Ok((indices, next_version)) } } diff --git a/storage/aptosdb/src/pruner/state_store/state_merkle_metadata_pruner.rs b/storage/aptosdb/src/pruner/state_store/state_merkle_metadata_pruner.rs new file mode 100644 index 00000000000000..7c30e446256490 --- /dev/null +++ b/storage/aptosdb/src/pruner/state_store/state_merkle_metadata_pruner.rs @@ -0,0 +1,83 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + pruner::state_store::{generics::StaleNodeIndexSchemaTrait, StateMerklePruner}, + schema::{ + db_metadata::{DbMetadataSchema, DbMetadataValue}, + jellyfish_merkle_node::JellyfishMerkleNodeSchema, + }, + utils::get_progress, +}; +use anyhow::Result; +use aptos_jellyfish_merkle::StaleNodeIndex; +use aptos_schemadb::{schema::KeyCodec, SchemaBatch, DB}; +use aptos_types::transaction::{AtomicVersion, Version}; +use std::{ + cmp::max, + marker::PhantomData, + sync::{atomic::Ordering, Arc}, +}; + +pub(in crate::pruner) struct StateMerkleMetadataPruner { + metadata_db: Arc, + next_version: AtomicVersion, + _phantom: PhantomData, +} + +impl StateMerkleMetadataPruner +where + StaleNodeIndex: KeyCodec, +{ + pub(in crate::pruner) fn new(metadata_db: Arc) -> Self { + Self { + metadata_db, + next_version: AtomicVersion::new(0), + _phantom: PhantomData, + } + } + + pub(in crate::pruner) fn maybe_prune_single_version( + &self, + current_progress: Version, + target_version: Version, + ) -> Result> { + let next_version = self.next_version.load(Ordering::SeqCst); + // This max here is only to handle the case when next version is not initialized. + let target_version_for_this_round = max(next_version, current_progress); + if target_version_for_this_round > target_version { + return Ok(None); + } + + // When next_version is not initialized, this call is used to initialize it. + let (indices, next_version) = StateMerklePruner::get_stale_node_indices( + &self.metadata_db, + current_progress, + target_version_for_this_round, + )?; + + let batch = SchemaBatch::new(); + indices.into_iter().try_for_each(|index| { + batch.delete::(&index.node_key)?; + batch.delete::(&index) + })?; + + batch.put::( + &S::progress_metadata_key(None), + &DbMetadataValue::Version(target_version_for_this_round), + )?; + + self.metadata_db.write_schemas(batch)?; + + self.next_version + // If next_version is None, meaning we've already reached the end of stale index. + // Updating it to the target_version to make sure it's still making progress. + .store(next_version.unwrap_or(target_version), Ordering::SeqCst); + + Ok(Some(target_version_for_this_round)) + } + + pub(in crate::pruner) fn progress(&self) -> Result { + Ok(get_progress(&self.metadata_db, &S::progress_metadata_key(None))?.unwrap_or(0)) + } +} diff --git a/storage/aptosdb/src/pruner/state_store/state_merkle_shard_pruner.rs b/storage/aptosdb/src/pruner/state_store/state_merkle_shard_pruner.rs new file mode 100644 index 00000000000000..e344063f0d78a0 --- /dev/null +++ b/storage/aptosdb/src/pruner/state_store/state_merkle_shard_pruner.rs @@ -0,0 +1,96 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + pruner::{ + pruner_utils::get_or_initialize_subpruner_progress, + state_store::{generics::StaleNodeIndexSchemaTrait, StateMerklePruner}, + }, + schema::{ + db_metadata::{DbMetadataSchema, DbMetadataValue}, + jellyfish_merkle_node::JellyfishMerkleNodeSchema, + }, +}; +use anyhow::Result; +use aptos_jellyfish_merkle::StaleNodeIndex; +use aptos_schemadb::{schema::KeyCodec, SchemaBatch, DB}; +use aptos_types::transaction::Version; +use std::{marker::PhantomData, sync::Arc}; + +pub(in crate::pruner) struct StateMerkleShardPruner { + shard_id: u8, + db_shard: Arc, + _phantom: PhantomData, +} + +impl StateMerkleShardPruner +where + StaleNodeIndex: KeyCodec, +{ + pub(in crate::pruner) fn new( + shard_id: u8, + db_shard: Arc, + metadata_progress: Version, + ) -> Result { + let progress = get_or_initialize_subpruner_progress( + &db_shard, + &S::progress_metadata_key(Some(shard_id)), + metadata_progress, + )?; + let myself = Self { + shard_id, + db_shard, + _phantom: PhantomData, + }; + + myself.prune(progress, metadata_progress)?; + + Ok(myself) + } + + pub(in crate::pruner) fn prune( + &self, + current_progress: Version, + target_version: Version, + ) -> Result<()> { + loop { + let batch = SchemaBatch::new(); + let (indices, next_version) = StateMerklePruner::get_stale_node_indices( + &self.db_shard, + current_progress, + target_version, + )?; + + indices.into_iter().try_for_each(|index| { + batch.delete::(&index.node_key)?; + batch.delete::(&index) + })?; + + let mut done = true; + if let Some(next_version) = next_version { + if next_version <= target_version { + done = false; + } + } + + if done { + batch.put::( + &S::progress_metadata_key(Some(self.shard_id)), + &DbMetadataValue::Version(target_version), + )?; + } + + self.db_shard.write_schemas(batch)?; + + if done { + break; + } + } + + Ok(()) + } + + pub(in crate::pruner) fn shard_id(&self) -> u8 { + self.shard_id + } +} diff --git a/storage/aptosdb/src/pruner/transaction_store/transaction_accumulator_pruner.rs b/storage/aptosdb/src/pruner/transaction_store/transaction_accumulator_pruner.rs index 47769b16aaf0e0..9f7f9cb7f110f9 100644 --- a/storage/aptosdb/src/pruner/transaction_store/transaction_accumulator_pruner.rs +++ b/storage/aptosdb/src/pruner/transaction_store/transaction_accumulator_pruner.rs @@ -2,9 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - pruner::{ - db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_ledger_subpruner_progress, - }, + pruner::{db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_subpruner_progress}, schema::db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}, TransactionStore, }; @@ -41,7 +39,7 @@ impl TransactionAccumulatorPruner { transaction_accumulator_db: Arc, metadata_progress: Version, ) -> Result { - let progress = get_or_initialize_ledger_subpruner_progress( + let progress = get_or_initialize_subpruner_progress( &transaction_accumulator_db, &DbMetadataKey::TransactionAccumulatorPrunerProgress, metadata_progress, diff --git a/storage/aptosdb/src/pruner/transaction_store/transaction_info_pruner.rs b/storage/aptosdb/src/pruner/transaction_store/transaction_info_pruner.rs index 4887f23e652099..6720e54c120fc7 100644 --- a/storage/aptosdb/src/pruner/transaction_store/transaction_info_pruner.rs +++ b/storage/aptosdb/src/pruner/transaction_store/transaction_info_pruner.rs @@ -2,9 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - pruner::{ - db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_ledger_subpruner_progress, - }, + pruner::{db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_subpruner_progress}, schema::db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}, TransactionStore, }; @@ -41,7 +39,7 @@ impl TransactionInfoPruner { transaction_info_db: Arc, metadata_progress: Version, ) -> Result { - let progress = get_or_initialize_ledger_subpruner_progress( + let progress = get_or_initialize_subpruner_progress( &transaction_info_db, &DbMetadataKey::TransactionInfoPrunerProgress, metadata_progress, diff --git a/storage/aptosdb/src/pruner/transaction_store/transaction_pruner.rs b/storage/aptosdb/src/pruner/transaction_store/transaction_pruner.rs index e5e8e21ebedf42..5e39939fddb743 100644 --- a/storage/aptosdb/src/pruner/transaction_store/transaction_pruner.rs +++ b/storage/aptosdb/src/pruner/transaction_store/transaction_pruner.rs @@ -2,9 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - pruner::{ - db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_ledger_subpruner_progress, - }, + pruner::{db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_subpruner_progress}, schema::db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}, TransactionStore, }; @@ -47,7 +45,7 @@ impl TransactionPruner { transaction_db: Arc, metadata_progress: Version, ) -> Result { - let progress = get_or_initialize_ledger_subpruner_progress( + let progress = get_or_initialize_subpruner_progress( &transaction_db, &DbMetadataKey::TransactionPrunerProgress, metadata_progress, diff --git a/storage/aptosdb/src/pruner/transaction_store/write_set_pruner.rs b/storage/aptosdb/src/pruner/transaction_store/write_set_pruner.rs index dab713b98aa0f7..1d0ac1f627e99e 100644 --- a/storage/aptosdb/src/pruner/transaction_store/write_set_pruner.rs +++ b/storage/aptosdb/src/pruner/transaction_store/write_set_pruner.rs @@ -2,9 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - pruner::{ - db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_ledger_subpruner_progress, - }, + pruner::{db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_subpruner_progress}, schema::db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}, TransactionStore, }; @@ -38,7 +36,7 @@ impl WriteSetPruner { write_set_db: Arc, metadata_progress: Version, ) -> Result { - let progress = get_or_initialize_ledger_subpruner_progress( + let progress = get_or_initialize_subpruner_progress( &write_set_db, &DbMetadataKey::WriteSetPrunerProgress, metadata_progress, diff --git a/storage/aptosdb/src/schema/db_metadata/mod.rs b/storage/aptosdb/src/schema/db_metadata/mod.rs index 61958912d39bd1..9458a6928f1d71 100644 --- a/storage/aptosdb/src/schema/db_metadata/mod.rs +++ b/storage/aptosdb/src/schema/db_metadata/mod.rs @@ -62,6 +62,8 @@ pub enum DbMetadataKey { TransactionInfoPrunerProgress, TransactionPrunerProgress, WriteSetPrunerProgress, + StateMerkleShardPrunerProgress(ShardId), + EpochEndingStateMerkleShardPrunerProgress(ShardId), } define_schema!( diff --git a/storage/aptosdb/src/state_merkle_db.rs b/storage/aptosdb/src/state_merkle_db.rs index e44c93073e36a7..6766b44d821d0a 100644 --- a/storage/aptosdb/src/state_merkle_db.rs +++ b/storage/aptosdb/src/state_merkle_db.rs @@ -62,6 +62,7 @@ pub struct StateMerkleDb { state_merkle_metadata_db: Arc, // Stores sharded part of tree nodes. state_merkle_db_shards: [Arc; NUM_STATE_SHARDS], + enable_sharding: bool, enable_cache: bool, version_cache: VersionedNodeCache, lru_cache: LruNodeCache, @@ -94,6 +95,7 @@ impl StateMerkleDb { return Ok(Self { state_merkle_metadata_db: Arc::clone(&db), state_merkle_db_shards: arr![Arc::clone(&db); 16], + enable_sharding: false, enable_cache, version_cache, lru_cache, @@ -178,10 +180,18 @@ impl StateMerkleDb { &self.state_merkle_metadata_db } + pub(crate) fn metadata_db_arc(&self) -> Arc { + Arc::clone(&self.state_merkle_metadata_db) + } + pub(crate) fn db_shard(&self, shard_id: u8) -> &DB { &self.state_merkle_db_shards[shard_id as usize] } + pub(crate) fn db_shard_arc(&self, shard_id: u8) -> Arc { + Arc::clone(&self.state_merkle_db_shards[shard_id as usize]) + } + pub(crate) fn commit_top_levels(&self, version: Version, batch: SchemaBatch) -> Result<()> { batch.put::( &DbMetadataKey::StateMerkleCommitProgress, @@ -350,6 +360,10 @@ impl StateMerkleDb { Ok((top_levels_batch, sharded_batch, new_root_hash)) } + pub(crate) fn sharding_enabled(&self) -> bool { + self.enable_sharding + } + pub(crate) fn cache_enabled(&self) -> bool { self.enable_cache } @@ -369,6 +383,10 @@ impl StateMerkleDb { ) } + pub(crate) fn num_shards(&self) -> u8 { + NUM_STATE_SHARDS as u8 + } + fn db_by_key(&self, node_key: &NodeKey) -> &DB { if let Some(shard_id) = node_key.get_shard_id() { self.db_shard(shard_id) @@ -410,6 +428,7 @@ impl StateMerkleDb { let state_merkle_db = Self { state_merkle_metadata_db, state_merkle_db_shards, + enable_sharding: true, enable_cache, version_cache, lru_cache, diff --git a/storage/aptosdb/src/utils/mod.rs b/storage/aptosdb/src/utils/mod.rs index 1ebc49f52b5791..6bfaa3dec8d591 100644 --- a/storage/aptosdb/src/utils/mod.rs +++ b/storage/aptosdb/src/utils/mod.rs @@ -4,19 +4,13 @@ pub mod iterators; pub(crate) mod truncation_helper; -use crate::schema::db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}; +use crate::schema::db_metadata::{DbMetadataKey, DbMetadataSchema}; use anyhow::Result; use aptos_schemadb::DB; use aptos_types::transaction::Version; pub(crate) fn get_progress(db: &DB, progress_key: &DbMetadataKey) -> Result> { - Ok( - if let Some(DbMetadataValue::Version(progress)) = - db.get::(progress_key)? - { - Some(progress) - } else { - None - }, - ) + Ok(db + .get::(progress_key)? + .map(|v| v.expect_version())) }