diff --git a/config/src/config/storage_config.rs b/config/src/config/storage_config.rs index 833afff167f8c0..d6251fc2866d3e 100644 --- a/config/src/config/storage_config.rs +++ b/config/src/config/storage_config.rs @@ -117,6 +117,11 @@ pub const NO_OP_STORAGE_PRUNER_CONFIG: PrunerConfig = PrunerConfig { prune_window: 0, batch_size: 0, }, + state_kv_pruner_config: StateKvPrunerConfig { + enable: false, + prune_window: 0, + batch_size: 0, + }, }; #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -140,13 +145,12 @@ pub struct LedgerPrunerConfig { #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] pub struct StateMerklePrunerConfig { - /// Boolean to enable/disable the state store pruner. The state pruner is responsible for - /// pruning state tree nodes. + /// Boolean to enable/disable the state merkle pruner. The state merkle pruner is responsible + /// for pruning state tree nodes. pub enable: bool, - /// The size of the window should be calculated based on disk space availability and system TPS. + /// Window size in versions. pub prune_window: u64, - /// Similar to the variable above but for state store pruner. It means the number of stale - /// nodes to prune a time. + /// Number of stale nodes to prune a time. pub batch_size: usize, } @@ -161,6 +165,19 @@ pub struct EpochSnapshotPrunerConfig { pub batch_size: usize, } +#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(default, deny_unknown_fields)] +pub struct StateKvPrunerConfig { + /// Boolean to enable/disable the state kv pruner. The state pruner is responsible for + /// pruning state tree nodes. + pub enable: bool, + /// Window size in versions. + pub prune_window: u64, + /// Similar to the variable above but for state kv pruner. It means the number of versions to + /// prune a time. + pub batch_size: usize, +} + // Config for the epoch ending state pruner is actually in the same format as the state merkle // pruner, but it has it's own type hence separate default values. This converts it to the same // type, to use the same pruner implementation (but parameterized on the stale node index DB schema). @@ -180,6 +197,7 @@ pub struct PrunerConfig { pub ledger_pruner_config: LedgerPrunerConfig, pub state_merkle_pruner_config: StateMerklePrunerConfig, pub epoch_snapshot_pruner_config: EpochSnapshotPrunerConfig, + pub state_kv_pruner_config: StateKvPrunerConfig, } impl Default for LedgerPrunerConfig { @@ -230,6 +248,17 @@ impl Default for EpochSnapshotPrunerConfig { } } +impl Default for StateKvPrunerConfig { + fn default() -> Self { + Self { + // TODO(grao): Keep it the same as ledger pruner config for now, will revisit later. + enable: true, + prune_window: 150_000_000, + batch_size: 500, + } + } +} + impl Default for StorageConfig { fn default() -> StorageConfig { StorageConfig { diff --git a/execution/executor-benchmark/src/main.rs b/execution/executor-benchmark/src/main.rs index 544c316a2a0172..b2aebde23f0599 100644 --- a/execution/executor-benchmark/src/main.rs +++ b/execution/executor-benchmark/src/main.rs @@ -3,7 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_config::config::{ - EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, StateMerklePrunerConfig, + EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, StateKvPrunerConfig, + StateMerklePrunerConfig, }; use aptos_executor::block_executor::TransactionBlockExecutor; use aptos_executor_benchmark::{ @@ -29,6 +30,9 @@ struct PrunerOpt { #[structopt(long)] enable_ledger_pruner: bool, + #[structopt(long)] + enable_state_kv_pruner: bool, + #[structopt(long, default_value = "100000")] state_prune_window: u64, @@ -38,6 +42,9 @@ struct PrunerOpt { #[structopt(long, default_value = "100000")] ledger_prune_window: u64, + #[structopt(long, default_value = "100000")] + state_kv_prune_window: u64, + #[structopt(long, default_value = "500")] ledger_pruning_batch_size: usize, @@ -46,6 +53,9 @@ struct PrunerOpt { #[structopt(long, default_value = "500")] epoch_snapshot_pruning_batch_size: usize, + + #[structopt(long, default_value = "500")] + state_kv_pruning_batch_size: usize, } impl PrunerOpt { @@ -67,6 +77,11 @@ impl PrunerOpt { batch_size: self.ledger_pruning_batch_size, user_pruning_window_offset: 0, }, + state_kv_pruner_config: StateKvPrunerConfig { + enable: self.enable_state_kv_pruner, + prune_window: self.state_kv_prune_window, + batch_size: self.state_kv_pruning_batch_size, + }, } } } diff --git a/storage/aptosdb/src/aptosdb_test.rs b/storage/aptosdb/src/aptosdb_test.rs index e8d9515e67381b..c632cc8c6b3122 100644 --- a/storage/aptosdb/src/aptosdb_test.rs +++ b/storage/aptosdb/src/aptosdb_test.rs @@ -13,7 +13,7 @@ use crate::{ }; use aptos_config::config::{ EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, RocksdbConfigs, - StateMerklePrunerConfig, BUFFERED_STATE_TARGET_ITEMS, + StateKvPrunerConfig, StateMerklePrunerConfig, BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, }; use aptos_crypto::{hash::CryptoHash, HashValue}; @@ -103,16 +103,13 @@ fn test_pruner_config() { assert_eq!(state_merkle_pruner.is_pruner_enabled(), enable); assert_eq!(state_merkle_pruner.get_prune_window(), 20); - let ledger_pruner = LedgerPrunerManager::new( - Arc::clone(&aptos_db.ledger_db), - Arc::clone(&aptos_db.state_store), - LedgerPrunerConfig { + let ledger_pruner = + LedgerPrunerManager::new(Arc::clone(&aptos_db.ledger_db), LedgerPrunerConfig { enable, prune_window: 100, batch_size: 1, user_pruning_window_offset: 0, - }, - ); + }); assert_eq!(ledger_pruner.is_pruner_enabled(), enable); assert_eq!(ledger_pruner.get_prune_window(), 100); } @@ -203,6 +200,11 @@ pub fn test_state_merkle_pruning_impl( prune_window: 10, batch_size: 1, }, + state_kv_pruner_config: StateKvPrunerConfig { + enable: true, + prune_window: 10, + batch_size: 1, + }, }, RocksdbConfigs::default(), false, /* enable_indexer */ diff --git a/storage/aptosdb/src/lib.rs b/storage/aptosdb/src/lib.rs index fc121c903ef77d..514d371b7a4b93 100644 --- a/storage/aptosdb/src/lib.rs +++ b/storage/aptosdb/src/lib.rs @@ -59,8 +59,9 @@ use crate::{ pruner::{ db_pruner::DBPruner, ledger_pruner_manager::LedgerPrunerManager, ledger_store::ledger_store_pruner::LedgerPruner, pruner_manager::PrunerManager, - pruner_utils, state_merkle_pruner_manager::StateMerklePrunerManager, - state_store::StateMerklePruner, + pruner_utils, state_kv_pruner::StateKvPruner, + state_kv_pruner_manager::StateKvPrunerManager, + state_merkle_pruner_manager::StateMerklePrunerManager, state_store::StateMerklePruner, }, schema::*, stale_node_index::StaleNodeIndexSchema, @@ -289,20 +290,24 @@ impl AptosDB { Arc::clone(&arc_state_merkle_rocksdb), pruner_config.epoch_snapshot_pruner_config.into(), ); + let state_kv_pruner = StateKvPrunerManager::new( + Arc::clone(&arc_state_kv_rocksdb), + pruner_config.state_kv_pruner_config, + ); let state_store = Arc::new(StateStore::new( Arc::clone(&arc_ledger_rocksdb), Arc::clone(&arc_state_merkle_rocksdb), Arc::clone(&arc_state_kv_rocksdb), state_merkle_pruner, epoch_snapshot_pruner, + state_kv_pruner, buffered_state_target_items, max_nodes_per_lru_cache_shard, hack_for_tests, )); - // TODO(grao): Handle state kv db pruning. + let ledger_pruner = LedgerPrunerManager::new( Arc::clone(&arc_ledger_rocksdb), - Arc::clone(&state_store), pruner_config.ledger_pruner_config, ); @@ -1003,6 +1008,18 @@ impl AptosDB { ) } } + + fn error_if_state_kv_pruned(&self, data_type: &str, version: Version) -> Result<()> { + let min_readable_version = self.state_store.state_kv_pruner.get_min_readable_version(); + ensure!( + version >= min_readable_version, + "{} at version {} is pruned, min available version is {}.", + data_type, + version, + min_readable_version + ); + Ok(()) + } } impl DbReader for AptosDB { @@ -1025,7 +1042,7 @@ impl DbReader for AptosDB { version: Version, ) -> Result> + '_>> { gauged_api("get_prefixed_state_value_iterator", || { - self.error_if_ledger_pruned("State", version)?; + self.error_if_state_kv_pruned("StateValue", version)?; Ok(Box::new( self.state_store @@ -1427,7 +1444,7 @@ impl DbReader for AptosDB { version: Version, ) -> Result> { gauged_api("get_state_value_by_version", || { - self.error_if_ledger_pruned("State", version)?; + self.error_if_state_kv_pruned("StateValue", version)?; self.state_store .get_state_value_by_version(state_store_key, version) @@ -1875,10 +1892,14 @@ impl DbWriter for AptosDB { let last_version = first_version + num_txns - 1; COMMITTED_TXNS.inc_by(num_txns); LATEST_TXN_VERSION.set(last_version as i64); - // Activate the ledger pruner. Note the state merkle pruner is activated when - // state snapshots are persisted in their async thread. + // Activate the ledger pruner and state kv pruner. + // Note the state merkle pruner is activated when state snapshots are persisted + // in their async thread. self.ledger_pruner .maybe_set_pruner_target_db_version(last_version); + self.state_store + .state_kv_pruner + .maybe_set_pruner_target_db_version(last_version); } // Note: this must happen after txns have been saved to db because types can be newly @@ -1993,11 +2014,7 @@ impl DbWriter for AptosDB { )?; // Delete the genesis transaction - LedgerPruner::prune_genesis( - self.ledger_db.clone(), - self.state_store.clone(), - &mut batch, - )?; + LedgerPruner::prune_genesis(self.ledger_db.clone(), &mut batch)?; self.ledger_pruner .pruner() @@ -2018,11 +2035,23 @@ impl DbWriter for AptosDB { .pruner() .save_min_readable_version(version, &state_merkle_batch)?; + let mut state_kv_batch = SchemaBatch::new(); + StateKvPruner::prune_genesis( + self.state_store.state_kv_db.clone(), + &mut state_kv_batch, + )?; + self.state_store + .state_kv_pruner + .pruner() + .save_min_readable_version(version, &state_kv_batch)?; + // Apply the change set writes to the database (atomically) and update in-memory state self.ledger_db.clone().write_schemas(batch)?; self.state_merkle_db .clone() .write_schemas(state_merkle_batch)?; + self.state_kv_db.clone().write_schemas(state_kv_batch)?; + restore_utils::update_latest_ledger_info(self.ledger_store.clone(), ledger_infos)?; self.state_store.reset(); diff --git a/storage/aptosdb/src/pruner/event_store/test.rs b/storage/aptosdb/src/pruner/event_store/test.rs index a7d1149a7eeb41..1fccbe9675723d 100644 --- a/storage/aptosdb/src/pruner/event_store/test.rs +++ b/storage/aptosdb/src/pruner/event_store/test.rs @@ -67,16 +67,12 @@ fn verify_event_store_pruner(events: Vec>) { } aptos_db.ledger_db.write_schemas(batch).unwrap(); - let pruner = LedgerPrunerManager::new( - Arc::clone(&aptos_db.ledger_db), - Arc::clone(&aptos_db.state_store), - LedgerPrunerConfig { - enable: true, - prune_window: 0, - batch_size: 1, - user_pruning_window_offset: 0, - }, - ); + let pruner = LedgerPrunerManager::new(Arc::clone(&aptos_db.ledger_db), LedgerPrunerConfig { + enable: true, + prune_window: 0, + batch_size: 1, + user_pruning_window_offset: 0, + }); // start pruning events batches of size 2 and verify transactions have been pruned from DB for i in (0..=num_versions).step_by(2) { pruner diff --git a/storage/aptosdb/src/pruner/ledger_pruner_manager.rs b/storage/aptosdb/src/pruner/ledger_pruner_manager.rs index 2ee544b4503dd4..f2918956fe208f 100644 --- a/storage/aptosdb/src/pruner/ledger_pruner_manager.rs +++ b/storage/aptosdb/src/pruner/ledger_pruner_manager.rs @@ -7,7 +7,7 @@ use crate::{ db_pruner::DBPruner, ledger_pruner_worker::LedgerPrunerWorker, ledger_store::ledger_store_pruner::LedgerPruner, pruner_manager::PrunerManager, }, - pruner_utils, StateStore, + pruner_utils, }; use aptos_config::config::LedgerPrunerConfig; use aptos_infallible::Mutex; @@ -99,12 +99,8 @@ impl PrunerManager for LedgerPrunerManager { impl LedgerPrunerManager { /// Creates a worker thread that waits on a channel for pruning commands. - pub fn new( - ledger_rocksdb: Arc, - state_store: Arc, - ledger_pruner_config: LedgerPrunerConfig, - ) -> Self { - let ledger_pruner = pruner_utils::create_ledger_pruner(ledger_rocksdb, state_store); + pub fn new(ledger_rocksdb: Arc, ledger_pruner_config: LedgerPrunerConfig) -> Self { + let ledger_pruner = pruner_utils::create_ledger_pruner(ledger_rocksdb); if ledger_pruner_config.enable { PRUNER_WINDOW diff --git a/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs b/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs index cca5450febfea8..333e4850aa0ca3 100644 --- a/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs +++ b/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs @@ -8,7 +8,7 @@ use crate::{ db_pruner::DBPruner, db_sub_pruner::DBSubPruner, event_store::event_store_pruner::EventStorePruner, - state_store::state_value_pruner::StateValuePruner, + ledger_store::version_data_pruner::VersionDataPruner, transaction_store::{ transaction_store_pruner::TransactionStorePruner, write_set_pruner::WriteSetPruner, }, @@ -18,7 +18,7 @@ use crate::{ db_metadata::{DbMetadataKey, DbMetadataValue}, transaction::TransactionSchema, }, - EventStore, StateStore, TransactionStore, + EventStore, TransactionStore, }; use aptos_logger::warn; use aptos_schemadb::{ReadOptions, SchemaBatch, DB}; @@ -34,7 +34,7 @@ pub(crate) struct LedgerPruner { target_version: AtomicVersion, min_readable_version: AtomicVersion, transaction_store_pruner: Arc, - state_value_pruner: Arc, + version_data_pruner: Arc, event_store_pruner: Arc, write_set_pruner: Arc, } @@ -137,7 +137,6 @@ impl LedgerPruner { db: Arc, transaction_store: Arc, event_store: Arc, - state_store: Arc, ) -> Self { let pruner = LedgerPruner { db, @@ -146,24 +145,20 @@ impl LedgerPruner { transaction_store_pruner: Arc::new(TransactionStorePruner::new( transaction_store.clone(), )), - state_value_pruner: Arc::new(StateValuePruner::new(state_store)), event_store_pruner: Arc::new(EventStorePruner::new(event_store)), write_set_pruner: Arc::new(WriteSetPruner::new(transaction_store)), + version_data_pruner: Arc::new(VersionDataPruner::new()), }; pruner.initialize(); pruner } /// Prunes the genesis transaction and saves the db alterations to the given change set - pub fn prune_genesis( - ledger_db: Arc, - state_store: Arc, - db_batch: &mut SchemaBatch, - ) -> anyhow::Result<()> { + pub fn prune_genesis(ledger_db: Arc, db_batch: &mut SchemaBatch) -> anyhow::Result<()> { let target_version = 1; // The genesis version is 0. Delete [0,1) (exclusive) let max_version = 1; // We should only be pruning a single version - let ledger_pruner = pruner_utils::create_ledger_pruner(ledger_db, state_store); + let ledger_pruner = pruner_utils::create_ledger_pruner(ledger_db); ledger_pruner.set_target_version(target_version); ledger_pruner.prune_inner(max_version, db_batch)?; @@ -192,7 +187,7 @@ impl LedgerPruner { )?; self.write_set_pruner .prune(db_batch, min_readable_version, current_target_version)?; - self.state_value_pruner + self.version_data_pruner .prune(db_batch, min_readable_version, current_target_version)?; self.event_store_pruner .prune(db_batch, min_readable_version, current_target_version)?; diff --git a/storage/aptosdb/src/pruner/ledger_store/mod.rs b/storage/aptosdb/src/pruner/ledger_store/mod.rs index 09394b5c4749ba..d0f57aa5727f56 100644 --- a/storage/aptosdb/src/pruner/ledger_store/mod.rs +++ b/storage/aptosdb/src/pruner/ledger_store/mod.rs @@ -2,3 +2,4 @@ // SPDX-License-Identifier: Apache-2.0 pub(crate) mod ledger_store_pruner; +pub(crate) mod version_data_pruner; diff --git a/storage/aptosdb/src/pruner/mod.rs b/storage/aptosdb/src/pruner/mod.rs index f05ea4004e4404..c09e264e6ab33c 100644 --- a/storage/aptosdb/src/pruner/mod.rs +++ b/storage/aptosdb/src/pruner/mod.rs @@ -9,6 +9,8 @@ pub(crate) mod ledger_pruner_worker; pub(crate) mod ledger_store; pub(crate) mod pruner_manager; pub mod pruner_utils; +pub(crate) mod state_kv_pruner; +pub(crate) mod state_kv_pruner_worker; pub(crate) mod state_merkle_pruner_worker; pub(crate) mod state_store; pub(crate) mod transaction_store; @@ -16,4 +18,5 @@ pub(crate) mod transaction_store; // This module provides `Pruner` which manages a thread pruning old data in the background and is // meant to be triggered by other threads as they commit new data to the DB. pub(crate) mod ledger_pruner_manager; +pub(crate) mod state_kv_pruner_manager; pub(crate) mod state_merkle_pruner_manager; diff --git a/storage/aptosdb/src/pruner/pruner_utils.rs b/storage/aptosdb/src/pruner/pruner_utils.rs index 4833cb39690f19..3352457d89e0a1 100644 --- a/storage/aptosdb/src/pruner/pruner_utils.rs +++ b/storage/aptosdb/src/pruner/pruner_utils.rs @@ -6,9 +6,10 @@ use crate::{ pruner::{ ledger_store::ledger_store_pruner::LedgerPruner, + state_kv_pruner::StateKvPruner, state_store::{generics::StaleNodeIndexSchemaTrait, StateMerklePruner}, }, - EventStore, StateStore, TransactionStore, + EventStore, TransactionStore, }; use aptos_jellyfish_merkle::StaleNodeIndex; use aptos_schemadb::{schema::KeyCodec, DB}; @@ -25,14 +26,15 @@ where } /// A utility function to instantiate the ledger pruner -pub(crate) fn create_ledger_pruner( - ledger_db: Arc, - state_store: Arc, -) -> Arc { +pub(crate) fn create_ledger_pruner(ledger_db: Arc) -> Arc { Arc::new(LedgerPruner::new( Arc::clone(&ledger_db), Arc::new(TransactionStore::new(Arc::clone(&ledger_db))), Arc::new(EventStore::new(Arc::clone(&ledger_db))), - state_store, )) } + +/// A utility function to instantiate the state kv pruner. +pub(crate) fn create_state_kv_pruner(state_kv_db: Arc) -> Arc { + Arc::new(StateKvPruner::new(state_kv_db)) +} diff --git a/storage/aptosdb/src/pruner/state_store/state_value_pruner.rs b/storage/aptosdb/src/pruner/state_store/state_value_pruner.rs index 98d7effefc033f..ef84ed96448bc4 100644 --- a/storage/aptosdb/src/pruner/state_store/state_value_pruner.rs +++ b/storage/aptosdb/src/pruner/state_store/state_value_pruner.rs @@ -1,11 +1,15 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{pruner::db_sub_pruner::DBSubPruner, StateStore}; -use aptos_schemadb::SchemaBatch; + +use crate::{ + pruner::db_sub_pruner::DBSubPruner, + schema::{stale_state_value_index::StaleStateValueIndexSchema, state_value::StateValueSchema}, +}; +use aptos_schemadb::{ReadOptions, SchemaBatch, DB}; use std::sync::Arc; pub struct StateValuePruner { - state_store: Arc, + state_kv_db: Arc, } impl DBSubPruner for StateValuePruner { @@ -15,14 +19,24 @@ impl DBSubPruner for StateValuePruner { min_readable_version: u64, target_version: u64, ) -> anyhow::Result<()> { - self.state_store - .prune_state_values(min_readable_version, target_version, db_batch)?; + let mut iter = self + .state_kv_db + .iter::(ReadOptions::default())?; + iter.seek(&min_readable_version)?; + for item in iter { + let (index, _) = item?; + if index.stale_since_version > target_version { + break; + } + db_batch.delete::(&index)?; + db_batch.delete::(&(index.state_key, index.version))?; + } Ok(()) } } impl StateValuePruner { - pub(in crate::pruner) fn new(state_store: Arc) -> Self { - StateValuePruner { state_store } + pub(in crate::pruner) fn new(state_kv_db: Arc) -> Self { + StateValuePruner { state_kv_db } } } diff --git a/storage/aptosdb/src/pruner/state_store/test.rs b/storage/aptosdb/src/pruner/state_store/test.rs index 551cfc58bb781e..b8eaca55081d42 100644 --- a/storage/aptosdb/src/pruner/state_store/test.rs +++ b/storage/aptosdb/src/pruner/state_store/test.rs @@ -7,9 +7,9 @@ use crate::{ stale_state_value_index::StaleStateValueIndexSchema, state_store::StateStore, test_helper::{arb_state_kv_sets, update_store}, - AptosDB, LedgerPrunerManager, PrunerManager, StateMerklePrunerManager, + AptosDB, PrunerManager, StateKvPrunerManager, StateMerklePrunerManager, }; -use aptos_config::config::{LedgerPrunerConfig, StateMerklePrunerConfig}; +use aptos_config::config::{StateKvPrunerConfig, StateMerklePrunerConfig}; use aptos_crypto::HashValue; use aptos_schemadb::{ReadOptions, SchemaBatch, DB}; use aptos_storage_interface::{jmt_update_refs, jmt_updates, DbReader}; @@ -378,16 +378,11 @@ fn verify_state_value_pruner(inputs: Vec)>>) { let mut version = 0; let mut current_state_values = HashMap::new(); - let pruner = LedgerPrunerManager::new( - Arc::clone(&db.ledger_db), - Arc::clone(store), - LedgerPrunerConfig { - enable: true, - prune_window: 0, - batch_size: 1, - user_pruning_window_offset: 0, - }, - ); + let pruner = StateKvPrunerManager::new(Arc::clone(&db.ledger_db), StateKvPrunerConfig { + enable: true, + prune_window: 0, + batch_size: 1, + }); for batch in inputs { update_store(store, batch.clone().into_iter(), version); for (k, v) in batch.iter() { @@ -423,7 +418,7 @@ fn verify_state_value<'a, I: Iterator(&StaleStateValueIndex { stale_since_version: version, version: *old_version, @@ -433,10 +428,4 @@ fn verify_state_value<'a, I: Iterator) { let pruner = LedgerPrunerManager::new( Arc::clone(&aptos_db.ledger_db), - Arc::clone(&aptos_db.state_store), LedgerPrunerConfig { enable: true, prune_window: 0, @@ -102,6 +105,18 @@ fn verify_txn_store_pruner( &txns, ); + let batch = SchemaBatch::new(); + for i in 0..=num_transaction as u64 { + let usage = StateStorageUsage::zero(); + batch + .put::(&i, &usage.into()) + .expect("Must succeed."); + } + aptos_db + .ledger_db + .write_schemas(batch) + .expect("Must succeed."); + // start pruning transactions batches of size step_size and verify transactions have been pruned // from DB for i in (0..=num_transaction).step_by(step_size) { @@ -109,7 +124,6 @@ fn verify_txn_store_pruner( // logic. let pruner = LedgerPrunerManager::new( Arc::clone(&aptos_db.ledger_db), - Arc::clone(&aptos_db.state_store), LedgerPrunerConfig { enable: true, prune_window: 0, @@ -137,6 +151,7 @@ fn verify_txn_store_pruner( .get_transaction_proof(j as u64, ledger_version) .is_err()); } + assert!(aptos_db.state_store.get_usage(Some(j as u64)).is_err()); } // ensure all other are valid in DB for j in i..num_transaction { @@ -148,6 +163,7 @@ fn verify_txn_store_pruner( ledger_version, ); aptos_db.get_accumulator_summary(j as Version).unwrap(); + assert!(aptos_db.state_store.get_usage(Some(j as u64)).is_ok()); } verify_transaction_accumulator_pruned(&ledger_store, i as u64); } diff --git a/storage/aptosdb/src/schema/db_metadata/mod.rs b/storage/aptosdb/src/schema/db_metadata/mod.rs index 25280b754eceab..0b3ea6e54579a1 100644 --- a/storage/aptosdb/src/schema/db_metadata/mod.rs +++ b/storage/aptosdb/src/schema/db_metadata/mod.rs @@ -47,6 +47,7 @@ pub enum DbMetadataKey { LedgerPrunerProgress, StateMerklePrunerProgress, EpochEndingStateMerklePrunerProgress, + StateKvPrunerProgress, StateSnapshotRestoreProgress(Version), LedgerCommitProgress, StateKVCommitProgress, diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index e5d6f73ea338ca..23edb6a426be74 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -19,7 +19,7 @@ use crate::{ }, version_data::VersionDataSchema, AptosDbError, LedgerStore, StaleNodeIndexCrossEpochSchema, StaleNodeIndexSchema, - StateMerklePrunerManager, TransactionStore, OTHER_TIMERS_SECONDS, + StateKvPrunerManager, StateMerklePrunerManager, TransactionStore, OTHER_TIMERS_SECONDS, }; use anyhow::{ensure, format_err, Result}; use aptos_crypto::{ @@ -86,6 +86,7 @@ pub(crate) struct StateDb { pub state_kv_db: Arc, pub state_merkle_pruner: StateMerklePrunerManager, pub epoch_snapshot_pruner: StateMerklePrunerManager, + pub state_kv_pruner: StateKvPrunerManager, } pub(crate) struct StateStore { @@ -278,6 +279,7 @@ impl StateStore { state_kv_db: Arc, state_merkle_pruner: StateMerklePrunerManager, epoch_snapshot_pruner: StateMerklePrunerManager, + state_kv_pruner: StateKvPrunerManager, buffered_state_target_items: usize, max_nodes_per_lru_cache_shard: usize, hack_for_tests: bool, @@ -348,6 +350,7 @@ impl StateStore { state_kv_db, state_merkle_pruner, epoch_snapshot_pruner, + state_kv_pruner, }); let buffered_state = Mutex::new( Self::create_buffered_state_from_latest_snapshot( @@ -383,12 +386,17 @@ impl StateStore { ); let state_merkle_db = Arc::new(StateMerkleDb::new(arc_state_merkle_rocksdb, 0)); let state_kv_db = Arc::clone(&ledger_db); + let state_kv_pruner = StateKvPrunerManager::new( + Arc::clone(&state_kv_db), + NO_OP_STORAGE_PRUNER_CONFIG.state_kv_pruner_config, + ); let state_db = Arc::new(StateDb { ledger_db, state_merkle_db, state_kv_db, state_merkle_pruner, epoch_snapshot_pruner, + state_kv_pruner, }); let buffered_state = Self::create_buffered_state_from_latest_snapshot( &state_db, 0, /*hack_for_tests=*/ false, @@ -809,34 +817,6 @@ impl StateStore { )?)) } - /// Prune the stale state value schema generated between a range of version in (begin, end] - pub fn prune_state_values( - &self, - begin: Version, - end: Version, - db_batch: &SchemaBatch, - ) -> Result<()> { - // TODO(grao): Replace ledger_db by state_kv_db. - let mut iter = self - .state_db - .ledger_db - .iter::(ReadOptions::default())?; - iter.seek(&begin)?; - for item in iter { - let (index, _) = item?; - if index.stale_since_version > end { - break; - } - // Prune the stale state value index itself first. - db_batch.delete::(&index)?; - db_batch.delete::(&(index.state_key, index.version))?; - } - for version in begin..end { - db_batch.delete::(&version)?; - } - Ok(()) - } - #[cfg(test)] pub fn get_all_jmt_nodes_referenced( &self,