From 759b85dd4e004c0ed22311130ef827b9f0d930c0 Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Tue, 28 Feb 2023 14:36:39 -0800 Subject: [PATCH 1/3] [Storage][Pruner] Rename state pruner to state merkle pruner. --- state-sync/storage-service/server/src/lib.rs | 2 +- .../storage-service/server/src/tests.rs | 6 ++-- storage/aptosdb/src/aptosdb_test.rs | 13 ++++---- storage/aptosdb/src/fake_aptosdb.rs | 4 +-- storage/aptosdb/src/lib.rs | 25 ++++++++------ storage/aptosdb/src/pruner/mod.rs | 4 +-- storage/aptosdb/src/pruner/pruner_utils.rs | 2 +- ...ager.rs => state_merkle_pruner_manager.rs} | 28 ++++++++-------- ...orker.rs => state_merkle_pruner_worker.rs} | 8 ++--- storage/aptosdb/src/pruner/state_store/mod.rs | 15 +++++---- .../aptosdb/src/pruner/state_store/test.rs | 33 +++++++++++-------- storage/aptosdb/src/state_store/mod.rs | 18 +++++----- .../state_merkle_batch_committer.rs | 2 +- storage/storage-interface/src/lib.rs | 2 +- 14 files changed, 87 insertions(+), 75 deletions(-) rename storage/aptosdb/src/pruner/{state_pruner_manager.rs => state_merkle_pruner_manager.rs} (81%) rename storage/aptosdb/src/pruner/{state_pruner_worker.rs => state_merkle_pruner_worker.rs} (93%) diff --git a/state-sync/storage-service/server/src/lib.rs b/state-sync/storage-service/server/src/lib.rs index b5671d50deaaf..3a1e0ce67a5a3 100644 --- a/state-sync/storage-service/server/src/lib.rs +++ b/state-sync/storage-service/server/src/lib.rs @@ -1092,7 +1092,7 @@ impl StorageReader { ) -> Result>, Error> { let pruner_enabled = self .storage - .is_state_pruner_enabled() + .is_state_merkle_pruner_enabled() .map_err(|error| Error::StorageErrorEncountered(error.to_string()))?; if !pruner_enabled { return Ok(*transactions_range); diff --git a/state-sync/storage-service/server/src/tests.rs b/state-sync/storage-service/server/src/tests.rs index 322c4e9c38ef4..ef73a039ba2a7 100644 --- a/state-sync/storage-service/server/src/tests.rs +++ b/state-sync/storage-service/server/src/tests.rs @@ -1658,7 +1658,7 @@ async fn test_get_storage_server_summary() { .times(1) .returning(move || Ok(state_prune_window)); db_reader - .expect_is_state_pruner_enabled() + .expect_is_state_merkle_pruner_enabled() .returning(move || Ok(true)); // Create the storage client and server @@ -3133,7 +3133,7 @@ fn create_mock_db_for_subscription( .expect_get_epoch_snapshot_prune_window() .returning(move || Ok(100)); db_reader - .expect_is_state_pruner_enabled() + .expect_is_state_merkle_pruner_enabled() .returning(move || Ok(true)); db_reader } @@ -3775,6 +3775,6 @@ mock! { fn get_epoch_snapshot_prune_window(&self) -> Result; - fn is_state_pruner_enabled(&self) -> Result; + fn is_state_merkle_pruner_enabled(&self) -> Result; } } diff --git a/storage/aptosdb/src/aptosdb_test.rs b/storage/aptosdb/src/aptosdb_test.rs index 848046b197ff8..e8d9515e67381 100644 --- a/storage/aptosdb/src/aptosdb_test.rs +++ b/storage/aptosdb/src/aptosdb_test.rs @@ -4,7 +4,8 @@ use crate::{ get_first_seq_num_and_limit, pruner::{ - ledger_pruner_manager::LedgerPrunerManager, state_pruner_manager::StatePrunerManager, + ledger_pruner_manager::LedgerPrunerManager, + state_merkle_pruner_manager::StateMerklePrunerManager, }, test_helper, test_helper::{arb_blocks_to_commit, put_as_state_root, put_transaction_info}, @@ -91,7 +92,7 @@ fn test_pruner_config() { let tmp_dir = TempPath::new(); let aptos_db = AptosDB::new_for_test(&tmp_dir); for enable in [false, true] { - let state_pruner = StatePrunerManager::::new( + let state_merkle_pruner = StateMerklePrunerManager::::new( Arc::clone(&aptos_db.state_merkle_db), StateMerklePrunerConfig { enable, @@ -99,8 +100,8 @@ fn test_pruner_config() { batch_size: 1, }, ); - assert_eq!(state_pruner.is_pruner_enabled(), enable); - assert_eq!(state_pruner.get_prune_window(), 20); + assert_eq!(state_merkle_pruner.is_pruner_enabled(), enable); + assert_eq!(state_merkle_pruner.get_prune_window(), 20); let ledger_pruner = LedgerPrunerManager::new( Arc::clone(&aptos_db.ledger_db), @@ -123,7 +124,7 @@ fn test_error_if_version_pruned() { let db = AptosDB::new_for_test(&tmp_dir); db.state_store .state_db - .state_pruner + .state_merkle_pruner .testonly_update_min_version(5); db.ledger_pruner.testonly_update_min_version(10); assert_eq!( @@ -252,7 +253,7 @@ pub fn test_state_merkle_pruning_impl( .collect(); // Prune till the oldest snapshot readable. - let pruner = &db.state_store.state_db.state_pruner; + let pruner = &db.state_store.state_db.state_merkle_pruner; let epoch_snapshot_pruner = &db.state_store.state_db.epoch_snapshot_pruner; pruner .pruner_worker diff --git a/storage/aptosdb/src/fake_aptosdb.rs b/storage/aptosdb/src/fake_aptosdb.rs index cdf0e40fce2fd..b340142ecd61c 100644 --- a/storage/aptosdb/src/fake_aptosdb.rs +++ b/storage/aptosdb/src/fake_aptosdb.rs @@ -811,8 +811,8 @@ impl DbReader for FakeAptosDB { .get_state_value_chunk_with_proof(version, start_idx, chunk_size) } - fn is_state_pruner_enabled(&self) -> Result { - self.inner.is_state_pruner_enabled() + fn is_state_merkle_pruner_enabled(&self) -> Result { + self.inner.is_state_merkle_pruner_enabled() } fn get_epoch_snapshot_prune_window(&self) -> Result { diff --git a/storage/aptosdb/src/lib.rs b/storage/aptosdb/src/lib.rs index 10136b2752bad..fc121c903ef77 100644 --- a/storage/aptosdb/src/lib.rs +++ b/storage/aptosdb/src/lib.rs @@ -59,7 +59,8 @@ use crate::{ pruner::{ db_pruner::DBPruner, ledger_pruner_manager::LedgerPrunerManager, ledger_store::ledger_store_pruner::LedgerPruner, pruner_manager::PrunerManager, - pruner_utils, state_pruner_manager::StatePrunerManager, state_store::StateMerklePruner, + pruner_utils, state_merkle_pruner_manager::StateMerklePrunerManager, + state_store::StateMerklePruner, }, schema::*, stale_node_index::StaleNodeIndexSchema, @@ -280,11 +281,11 @@ impl AptosDB { } else { Arc::clone(&arc_ledger_rocksdb) }; - let state_pruner = StatePrunerManager::new( + let state_merkle_pruner = StateMerklePrunerManager::new( Arc::clone(&arc_state_merkle_rocksdb), pruner_config.state_merkle_pruner_config, ); - let epoch_snapshot_pruner = StatePrunerManager::new( + let epoch_snapshot_pruner = StateMerklePrunerManager::new( Arc::clone(&arc_state_merkle_rocksdb), pruner_config.epoch_snapshot_pruner_config.into(), ); @@ -292,7 +293,7 @@ impl AptosDB { Arc::clone(&arc_ledger_rocksdb), Arc::clone(&arc_state_merkle_rocksdb), Arc::clone(&arc_state_kv_rocksdb), - state_pruner, + state_merkle_pruner, epoch_snapshot_pruner, buffered_state_target_items, max_nodes_per_lru_cache_shard, @@ -979,7 +980,7 @@ impl AptosDB { let min_readable_version = self .state_store .state_db - .state_pruner + .state_merkle_pruner .get_min_readable_version(); if version >= min_readable_version { return Ok(()); @@ -1655,9 +1656,13 @@ impl DbReader for AptosDB { }) } - fn is_state_pruner_enabled(&self) -> Result { - gauged_api("is_state_pruner_enabled", || { - Ok(self.state_store.state_db.state_pruner.is_pruner_enabled()) + fn is_state_merkle_pruner_enabled(&self) -> Result { + gauged_api("is_state_merkle_pruner_enabled", || { + Ok(self + .state_store + .state_db + .state_merkle_pruner + .is_pruner_enabled()) }) } @@ -2005,7 +2010,7 @@ impl DbWriter for AptosDB { )?; self.state_store - .state_pruner + .state_merkle_pruner .pruner() .save_min_readable_version(version, &state_merkle_batch)?; self.state_store @@ -2023,7 +2028,7 @@ impl DbWriter for AptosDB { self.ledger_pruner.pruner().record_progress(version); self.state_store - .state_pruner + .state_merkle_pruner .pruner() .record_progress(version); self.state_store diff --git a/storage/aptosdb/src/pruner/mod.rs b/storage/aptosdb/src/pruner/mod.rs index b642ab4fb73aa..f05ea4004e440 100644 --- a/storage/aptosdb/src/pruner/mod.rs +++ b/storage/aptosdb/src/pruner/mod.rs @@ -9,11 +9,11 @@ pub(crate) mod ledger_pruner_worker; pub(crate) mod ledger_store; pub(crate) mod pruner_manager; pub mod pruner_utils; -pub(crate) mod state_pruner_worker; +pub(crate) mod state_merkle_pruner_worker; pub(crate) mod state_store; pub(crate) mod transaction_store; // This module provides `Pruner` which manages a thread pruning old data in the background and is // meant to be triggered by other threads as they commit new data to the DB. pub(crate) mod ledger_pruner_manager; -pub(crate) mod state_pruner_manager; +pub(crate) mod state_merkle_pruner_manager; diff --git a/storage/aptosdb/src/pruner/pruner_utils.rs b/storage/aptosdb/src/pruner/pruner_utils.rs index ad963bd83925a..4833cb39690f1 100644 --- a/storage/aptosdb/src/pruner/pruner_utils.rs +++ b/storage/aptosdb/src/pruner/pruner_utils.rs @@ -15,7 +15,7 @@ use aptos_schemadb::{schema::KeyCodec, DB}; use std::sync::Arc; /// A utility function to instantiate the state pruner -pub fn create_state_pruner( +pub fn create_state_merkle_pruner( state_merkle_db: Arc, ) -> Arc> where diff --git a/storage/aptosdb/src/pruner/state_pruner_manager.rs b/storage/aptosdb/src/pruner/state_merkle_pruner_manager.rs similarity index 81% rename from storage/aptosdb/src/pruner/state_pruner_manager.rs rename to storage/aptosdb/src/pruner/state_merkle_pruner_manager.rs index e094e29061b1a..016fe002e021b 100644 --- a/storage/aptosdb/src/pruner/state_pruner_manager.rs +++ b/storage/aptosdb/src/pruner/state_merkle_pruner_manager.rs @@ -9,7 +9,7 @@ use crate::{ pruner::{ db_pruner::DBPruner, pruner_manager::PrunerManager, - state_pruner_worker::StatePrunerWorker, + state_merkle_pruner_worker::StateMerklePrunerWorker, state_store::{generics::StaleNodeIndexSchemaTrait, StateMerklePruner}, }, pruner_utils, @@ -28,7 +28,7 @@ use std::{sync::Arc, thread::JoinHandle}; /// destruction. When destructed, it quits the worker thread eagerly without waiting for all /// pending work to be done. #[derive(Debug)] -pub struct StatePrunerManager +pub struct StateMerklePrunerManager where StaleNodeIndex: KeyCodec, { @@ -40,8 +40,8 @@ where /// of the min_readable_version. pruner: Arc>, /// Wrapper class of the state pruner. - pub(crate) pruner_worker: Arc>, - /// The worker thread handle for state_pruner, created upon Pruner instance construction and + pub(crate) pruner_worker: Arc>, + /// The worker thread handle for state_merkle_pruner, created upon Pruner instance construction and /// joined upon its destruction. It is `None` when state pruner is not enabled or it only /// becomes `None` after joined in `drop()`. worker_thread: Option>, @@ -52,7 +52,7 @@ where latest_version: Arc>, } -impl PrunerManager for StatePrunerManager +impl PrunerManager for StateMerklePrunerManager where StaleNodeIndex: KeyCodec, { @@ -97,14 +97,14 @@ where } } -impl StatePrunerManager +impl StateMerklePrunerManager where StaleNodeIndex: KeyCodec, { /// Creates a worker thread that waits on a channel for pruning commands. pub fn new(state_merkle_rocksdb: Arc, config: StateMerklePrunerConfig) -> Self { let state_db_clone = Arc::clone(&state_merkle_rocksdb); - let pruner = pruner_utils::create_state_pruner(state_db_clone); + let pruner = pruner_utils::create_state_merkle_pruner(state_db_clone); if config.enable { PRUNER_WINDOW @@ -116,14 +116,14 @@ where .set(config.batch_size as i64); } - let pruner_worker = Arc::new(StatePrunerWorker::new(Arc::clone(&pruner), config)); - let state_pruner_worker_clone = Arc::clone(&pruner_worker); + let pruner_worker = Arc::new(StateMerklePrunerWorker::new(Arc::clone(&pruner), config)); + let state_merkle_pruner_worker_clone = Arc::clone(&pruner_worker); let worker_thread = if config.enable { Some( std::thread::Builder::new() - .name("aptosdb_state_pruner".into()) - .spawn(move || state_pruner_worker_clone.as_ref().work()) + .name("aptosdb_state_merkle_pruner".into()) + .spawn(move || state_merkle_pruner_worker_clone.as_ref().work()) .expect("Creating state pruner thread should succeed."), ) } else { @@ -148,7 +148,7 @@ where } } -impl Drop for StatePrunerManager +impl Drop for StateMerklePrunerManager where StaleNodeIndex: KeyCodec, { @@ -158,9 +158,9 @@ where assert!(self.worker_thread.is_some()); self.worker_thread .take() - .expect("Ledger pruner worker thread must exist.") + .expect("State merkle pruner worker thread must exist.") .join() - .expect("Ledger pruner worker thread should join peacefully."); + .expect("State merkle pruner worker thread should join peacefully."); } } } diff --git a/storage/aptosdb/src/pruner/state_pruner_worker.rs b/storage/aptosdb/src/pruner/state_merkle_pruner_worker.rs similarity index 93% rename from storage/aptosdb/src/pruner/state_pruner_worker.rs rename to storage/aptosdb/src/pruner/state_merkle_pruner_worker.rs index ee81a69707340..5d03a6c827dc3 100644 --- a/storage/aptosdb/src/pruner/state_pruner_worker.rs +++ b/storage/aptosdb/src/pruner/state_merkle_pruner_worker.rs @@ -24,7 +24,7 @@ use std::{ /// Maintains the state store pruner and periodically calls the db_pruner's prune method to prune /// the DB. This also exposes API to report the progress to the parent thread. #[derive(Debug)] -pub struct StatePrunerWorker { +pub struct StateMerklePrunerWorker { /// The worker will sleep for this period of time after pruning each batch. pruning_time_interval_in_ms: u64, /// State store pruner. @@ -37,17 +37,17 @@ pub struct StatePrunerWorker { _phantom: std::marker::PhantomData, } -impl StatePrunerWorker +impl StateMerklePrunerWorker where StaleNodeIndex: KeyCodec, { pub(crate) fn new( - state_pruner: Arc>, + state_merkle_pruner: Arc>, state_merkle_pruner_config: StateMerklePrunerConfig, ) -> Self { Self { pruning_time_interval_in_ms: if cfg!(test) { 100 } else { 1 }, - pruner: state_pruner, + pruner: state_merkle_pruner, max_node_to_prune_per_batch: state_merkle_pruner_config.batch_size as u64, quit_worker: AtomicBool::new(false), _phantom: std::marker::PhantomData, diff --git a/storage/aptosdb/src/pruner/state_store/mod.rs b/storage/aptosdb/src/pruner/state_store/mod.rs index 1dc94a9502a39..6224756a19863 100644 --- a/storage/aptosdb/src/pruner/state_store/mod.rs +++ b/storage/aptosdb/src/pruner/state_store/mod.rs @@ -147,7 +147,7 @@ where Ok(target_version) } else { let _timer = OTHER_TIMERS_SECONDS - .with_label_values(&["state_pruner_commit"]) + .with_label_values(&["state_merkle_pruner_commit"]) .start_timer(); let new_min_readable_version = indices.last().expect("Should exist.").stale_since_version; @@ -226,13 +226,14 @@ impl StateMerklePruner { let target_version = 1; // The genesis version is 0. Delete [0,1) (exclusive) let max_version = 1; // We should only be pruning a single version - let state_pruner = - pruner_utils::create_state_pruner::(state_merkle_db); - state_pruner.set_target_version(target_version); + let state_merkle_pruner = pruner_utils::create_state_merkle_pruner::< + StaleNodeIndexCrossEpochSchema, + >(state_merkle_db); + state_merkle_pruner.set_target_version(target_version); - let min_readable_version = state_pruner.min_readable_version(); - let target_version = state_pruner.target_version(); - state_pruner.prune_state_merkle( + let min_readable_version = state_merkle_pruner.min_readable_version(); + let target_version = state_merkle_pruner.target_version(); + state_merkle_pruner.prune_state_merkle( min_readable_version, target_version, max_version, diff --git a/storage/aptosdb/src/pruner/state_store/test.rs b/storage/aptosdb/src/pruner/state_store/test.rs index c2a4351bc5d11..551cfc58bb781 100644 --- a/storage/aptosdb/src/pruner/state_store/test.rs +++ b/storage/aptosdb/src/pruner/state_store/test.rs @@ -2,12 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - pruner::{state_pruner_worker::StatePrunerWorker, *}, + pruner::{state_merkle_pruner_worker::StateMerklePrunerWorker, *}, stale_node_index::StaleNodeIndexSchema, stale_state_value_index::StaleStateValueIndexSchema, state_store::StateStore, test_helper::{arb_state_kv_sets, update_store}, - AptosDB, LedgerPrunerManager, PrunerManager, StatePrunerManager, + AptosDB, LedgerPrunerManager, PrunerManager, StateMerklePrunerManager, }; use aptos_config::config::{LedgerPrunerConfig, StateMerklePrunerConfig}; use aptos_crypto::HashValue; @@ -78,11 +78,11 @@ fn verify_state_in_store( assert_eq!(value.as_ref(), expected_value); } -fn create_state_pruner_manager( +fn create_state_merkle_pruner_manager( state_merkle_db: &Arc, prune_batch_size: usize, -) -> StatePrunerManager { - StatePrunerManager::new(Arc::clone(state_merkle_db), StateMerklePrunerConfig { +) -> StateMerklePrunerManager { + StateMerklePrunerManager::new(Arc::clone(state_merkle_db), StateMerklePrunerConfig { enable: true, prune_window: 0, batch_size: prune_batch_size, @@ -113,7 +113,8 @@ fn test_state_store_pruner() { // Prune till version=0. This should basically be a no-op. Create a new pruner everytime to // test the min_readable_version initialization logic. { - let pruner = create_state_pruner_manager(&aptos_db.state_merkle_db, prune_batch_size); + let pruner = + create_state_merkle_pruner_manager(&aptos_db.state_merkle_db, prune_batch_size); pruner.wake_and_wait_pruner(0 /* latest_version */).unwrap(); for i in 0..num_versions { verify_state_in_store( @@ -129,7 +130,8 @@ fn test_state_store_pruner() { // we expect versions 0 to 9 to be pruned. Create a new pruner everytime to test the // min_readable_version initialization logic. { - let pruner = create_state_pruner_manager(&aptos_db.state_merkle_db, prune_batch_size); + let pruner = + create_state_merkle_pruner_manager(&aptos_db.state_merkle_db, prune_batch_size); pruner .wake_and_wait_pruner(prune_batch_size as u64 /* latest_version */) .unwrap(); @@ -203,7 +205,8 @@ fn test_state_store_pruner_partial_version() { // Prune till version=0. This should basically be a no-op. Create a new pruner every time // to test the min_readable_version initialization logic. { - let pruner = create_state_pruner_manager(&aptos_db.state_merkle_db, prune_batch_size); + let pruner = + create_state_merkle_pruner_manager(&aptos_db.state_merkle_db, prune_batch_size); pruner.wake_and_wait_pruner(0 /* latest_version */).unwrap(); verify_state_in_store(state_store, key1.clone(), Some(&value1), 1); verify_state_in_store(state_store, key2.clone(), Some(&value2_update), 1); @@ -214,7 +217,8 @@ fn test_state_store_pruner_partial_version() { // should prune 1 stale node with the version 0. Create a new pruner everytime to test the // min_readable_version initialization logic. { - let pruner = create_state_pruner_manager(&aptos_db.state_merkle_db, prune_batch_size); + let pruner = + create_state_merkle_pruner_manager(&aptos_db.state_merkle_db, prune_batch_size); assert!(pruner.wake_and_wait_pruner(1 /* latest_version */,).is_ok()); assert!(state_store .get_state_value_with_proof_by_version(&key1, 0_u64) @@ -227,7 +231,8 @@ fn test_state_store_pruner_partial_version() { // Prune 3 more times. All version 0 and 1 stale nodes should be gone. Create a new pruner // everytime to test the min_readable_version initialization logic. { - let pruner = create_state_pruner_manager(&aptos_db.state_merkle_db, prune_batch_size); + let pruner = + create_state_merkle_pruner_manager(&aptos_db.state_merkle_db, prune_batch_size); assert!(pruner.wake_and_wait_pruner(2 /* latest_version */,).is_ok()); assert!(pruner.wake_and_wait_pruner(2 /* latest_version */,).is_ok()); @@ -336,10 +341,10 @@ fn test_worker_quit_eagerly() { ); { - let state_pruner = pruner_utils::create_state_pruner::(Arc::clone( - &aptos_db.state_merkle_db, - )); - let worker = StatePrunerWorker::new(state_pruner, StateMerklePrunerConfig { + let state_merkle_pruner = pruner_utils::create_state_merkle_pruner::( + Arc::clone(&aptos_db.state_merkle_db), + ); + let worker = StateMerklePrunerWorker::new(state_merkle_pruner, StateMerklePrunerConfig { enable: true, prune_window: 1, batch_size: 100, diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index 98582a43b887e..e5d6f73ea338c 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -19,7 +19,7 @@ use crate::{ }, version_data::VersionDataSchema, AptosDbError, LedgerStore, StaleNodeIndexCrossEpochSchema, StaleNodeIndexSchema, - StatePrunerManager, TransactionStore, OTHER_TIMERS_SECONDS, + StateMerklePrunerManager, TransactionStore, OTHER_TIMERS_SECONDS, }; use anyhow::{ensure, format_err, Result}; use aptos_crypto::{ @@ -84,8 +84,8 @@ pub(crate) struct StateDb { pub ledger_db: Arc, pub state_merkle_db: Arc, pub state_kv_db: Arc, - pub state_pruner: StatePrunerManager, - pub epoch_snapshot_pruner: StatePrunerManager, + pub state_merkle_pruner: StateMerklePrunerManager, + pub epoch_snapshot_pruner: StateMerklePrunerManager, } pub(crate) struct StateStore { @@ -276,8 +276,8 @@ impl StateStore { ledger_db: Arc, state_merkle_db: Arc, state_kv_db: Arc, - state_pruner: StatePrunerManager, - epoch_snapshot_pruner: StatePrunerManager, + state_merkle_pruner: StateMerklePrunerManager, + epoch_snapshot_pruner: StateMerklePrunerManager, buffered_state_target_items: usize, max_nodes_per_lru_cache_shard: usize, hack_for_tests: bool, @@ -346,7 +346,7 @@ impl StateStore { ledger_db, state_merkle_db, state_kv_db, - state_pruner, + state_merkle_pruner, epoch_snapshot_pruner, }); let buffered_state = Mutex::new( @@ -373,11 +373,11 @@ impl StateStore { use aptos_config::config::NO_OP_STORAGE_PRUNER_CONFIG; let arc_state_merkle_rocksdb = Arc::new(state_merkle_db); - let state_pruner = StatePrunerManager::new( + let state_merkle_pruner = StateMerklePrunerManager::new( Arc::clone(&arc_state_merkle_rocksdb), NO_OP_STORAGE_PRUNER_CONFIG.state_merkle_pruner_config, ); - let epoch_snapshot_pruner = StatePrunerManager::new( + let epoch_snapshot_pruner = StateMerklePrunerManager::new( Arc::clone(&arc_state_merkle_rocksdb), NO_OP_STORAGE_PRUNER_CONFIG.state_merkle_pruner_config, ); @@ -387,7 +387,7 @@ impl StateStore { ledger_db, state_merkle_db, state_kv_db, - state_pruner, + state_merkle_pruner, epoch_snapshot_pruner, }); let buffered_state = Self::create_buffered_state_from_latest_snapshot( diff --git a/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs b/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs index 66403b3366b49..16fd8b15d75e1 100644 --- a/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs +++ b/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs @@ -75,7 +75,7 @@ impl StateMerkleBatchCommitter { .expect("Current version should not be None"); LATEST_SNAPSHOT_VERSION.set(current_version as i64); self.state_db - .state_pruner + .state_merkle_pruner .maybe_set_pruner_target_db_version(current_version); self.state_db .epoch_snapshot_pruner diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index dd195a928ced0..5e9adfd9327d6 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -501,7 +501,7 @@ pub trait DbReader: Send + Sync { } /// Returns if the state store pruner is enabled. - fn is_state_pruner_enabled(&self) -> Result { + fn is_state_merkle_pruner_enabled(&self) -> Result { unimplemented!() } From ffa28c82aa19bd65ce01e0ee2f217c1c13229480 Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Fri, 3 Mar 2023 15:38:26 -0800 Subject: [PATCH 2/3] [Storage][Pruner] Split state k/v pruning to a separate pruner. --- config/src/config/storage_config.rs | 39 ++++- execution/executor-benchmark/src/main.rs | 17 +- storage/aptosdb/src/aptosdb_test.rs | 16 +- storage/aptosdb/src/lib.rs | 55 +++++-- .../aptosdb/src/pruner/event_store/test.rs | 16 +- .../src/pruner/ledger_pruner_manager.rs | 10 +- .../ledger_store/ledger_store_pruner.rs | 19 +-- .../aptosdb/src/pruner/ledger_store/mod.rs | 1 + .../ledger_store/version_data_pruner.rs | 28 ++++ storage/aptosdb/src/pruner/mod.rs | 3 + storage/aptosdb/src/pruner/pruner_utils.rs | 14 +- storage/aptosdb/src/pruner/state_kv_pruner.rs | 133 ++++++++++++++++ .../src/pruner/state_kv_pruner_manager.rs | 147 ++++++++++++++++++ .../src/pruner/state_kv_pruner_worker.rs | 76 +++++++++ .../pruner/state_store/state_value_pruner.rs | 28 +++- .../aptosdb/src/pruner/state_store/test.rs | 27 +--- .../src/pruner/transaction_store/test.rs | 40 +++-- storage/aptosdb/src/schema/db_metadata/mod.rs | 1 + storage/aptosdb/src/state_store/mod.rs | 38 ++--- 19 files changed, 575 insertions(+), 133 deletions(-) create mode 100644 storage/aptosdb/src/pruner/ledger_store/version_data_pruner.rs create mode 100644 storage/aptosdb/src/pruner/state_kv_pruner.rs create mode 100644 storage/aptosdb/src/pruner/state_kv_pruner_manager.rs create mode 100644 storage/aptosdb/src/pruner/state_kv_pruner_worker.rs diff --git a/config/src/config/storage_config.rs b/config/src/config/storage_config.rs index 833afff167f8c..d6251fc2866d3 100644 --- a/config/src/config/storage_config.rs +++ b/config/src/config/storage_config.rs @@ -117,6 +117,11 @@ pub const NO_OP_STORAGE_PRUNER_CONFIG: PrunerConfig = PrunerConfig { prune_window: 0, batch_size: 0, }, + state_kv_pruner_config: StateKvPrunerConfig { + enable: false, + prune_window: 0, + batch_size: 0, + }, }; #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -140,13 +145,12 @@ pub struct LedgerPrunerConfig { #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] pub struct StateMerklePrunerConfig { - /// Boolean to enable/disable the state store pruner. The state pruner is responsible for - /// pruning state tree nodes. + /// Boolean to enable/disable the state merkle pruner. The state merkle pruner is responsible + /// for pruning state tree nodes. pub enable: bool, - /// The size of the window should be calculated based on disk space availability and system TPS. + /// Window size in versions. pub prune_window: u64, - /// Similar to the variable above but for state store pruner. It means the number of stale - /// nodes to prune a time. + /// Number of stale nodes to prune a time. pub batch_size: usize, } @@ -161,6 +165,19 @@ pub struct EpochSnapshotPrunerConfig { pub batch_size: usize, } +#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(default, deny_unknown_fields)] +pub struct StateKvPrunerConfig { + /// Boolean to enable/disable the state kv pruner. The state pruner is responsible for + /// pruning state tree nodes. + pub enable: bool, + /// Window size in versions. + pub prune_window: u64, + /// Similar to the variable above but for state kv pruner. It means the number of versions to + /// prune a time. + pub batch_size: usize, +} + // Config for the epoch ending state pruner is actually in the same format as the state merkle // pruner, but it has it's own type hence separate default values. This converts it to the same // type, to use the same pruner implementation (but parameterized on the stale node index DB schema). @@ -180,6 +197,7 @@ pub struct PrunerConfig { pub ledger_pruner_config: LedgerPrunerConfig, pub state_merkle_pruner_config: StateMerklePrunerConfig, pub epoch_snapshot_pruner_config: EpochSnapshotPrunerConfig, + pub state_kv_pruner_config: StateKvPrunerConfig, } impl Default for LedgerPrunerConfig { @@ -230,6 +248,17 @@ impl Default for EpochSnapshotPrunerConfig { } } +impl Default for StateKvPrunerConfig { + fn default() -> Self { + Self { + // TODO(grao): Keep it the same as ledger pruner config for now, will revisit later. + enable: true, + prune_window: 150_000_000, + batch_size: 500, + } + } +} + impl Default for StorageConfig { fn default() -> StorageConfig { StorageConfig { diff --git a/execution/executor-benchmark/src/main.rs b/execution/executor-benchmark/src/main.rs index 544c316a2a017..b2aebde23f059 100644 --- a/execution/executor-benchmark/src/main.rs +++ b/execution/executor-benchmark/src/main.rs @@ -3,7 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_config::config::{ - EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, StateMerklePrunerConfig, + EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, StateKvPrunerConfig, + StateMerklePrunerConfig, }; use aptos_executor::block_executor::TransactionBlockExecutor; use aptos_executor_benchmark::{ @@ -29,6 +30,9 @@ struct PrunerOpt { #[structopt(long)] enable_ledger_pruner: bool, + #[structopt(long)] + enable_state_kv_pruner: bool, + #[structopt(long, default_value = "100000")] state_prune_window: u64, @@ -38,6 +42,9 @@ struct PrunerOpt { #[structopt(long, default_value = "100000")] ledger_prune_window: u64, + #[structopt(long, default_value = "100000")] + state_kv_prune_window: u64, + #[structopt(long, default_value = "500")] ledger_pruning_batch_size: usize, @@ -46,6 +53,9 @@ struct PrunerOpt { #[structopt(long, default_value = "500")] epoch_snapshot_pruning_batch_size: usize, + + #[structopt(long, default_value = "500")] + state_kv_pruning_batch_size: usize, } impl PrunerOpt { @@ -67,6 +77,11 @@ impl PrunerOpt { batch_size: self.ledger_pruning_batch_size, user_pruning_window_offset: 0, }, + state_kv_pruner_config: StateKvPrunerConfig { + enable: self.enable_state_kv_pruner, + prune_window: self.state_kv_prune_window, + batch_size: self.state_kv_pruning_batch_size, + }, } } } diff --git a/storage/aptosdb/src/aptosdb_test.rs b/storage/aptosdb/src/aptosdb_test.rs index e8d9515e67381..c632cc8c6b312 100644 --- a/storage/aptosdb/src/aptosdb_test.rs +++ b/storage/aptosdb/src/aptosdb_test.rs @@ -13,7 +13,7 @@ use crate::{ }; use aptos_config::config::{ EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, RocksdbConfigs, - StateMerklePrunerConfig, BUFFERED_STATE_TARGET_ITEMS, + StateKvPrunerConfig, StateMerklePrunerConfig, BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, }; use aptos_crypto::{hash::CryptoHash, HashValue}; @@ -103,16 +103,13 @@ fn test_pruner_config() { assert_eq!(state_merkle_pruner.is_pruner_enabled(), enable); assert_eq!(state_merkle_pruner.get_prune_window(), 20); - let ledger_pruner = LedgerPrunerManager::new( - Arc::clone(&aptos_db.ledger_db), - Arc::clone(&aptos_db.state_store), - LedgerPrunerConfig { + let ledger_pruner = + LedgerPrunerManager::new(Arc::clone(&aptos_db.ledger_db), LedgerPrunerConfig { enable, prune_window: 100, batch_size: 1, user_pruning_window_offset: 0, - }, - ); + }); assert_eq!(ledger_pruner.is_pruner_enabled(), enable); assert_eq!(ledger_pruner.get_prune_window(), 100); } @@ -203,6 +200,11 @@ pub fn test_state_merkle_pruning_impl( prune_window: 10, batch_size: 1, }, + state_kv_pruner_config: StateKvPrunerConfig { + enable: true, + prune_window: 10, + batch_size: 1, + }, }, RocksdbConfigs::default(), false, /* enable_indexer */ diff --git a/storage/aptosdb/src/lib.rs b/storage/aptosdb/src/lib.rs index fc121c903ef77..514d371b7a4b9 100644 --- a/storage/aptosdb/src/lib.rs +++ b/storage/aptosdb/src/lib.rs @@ -59,8 +59,9 @@ use crate::{ pruner::{ db_pruner::DBPruner, ledger_pruner_manager::LedgerPrunerManager, ledger_store::ledger_store_pruner::LedgerPruner, pruner_manager::PrunerManager, - pruner_utils, state_merkle_pruner_manager::StateMerklePrunerManager, - state_store::StateMerklePruner, + pruner_utils, state_kv_pruner::StateKvPruner, + state_kv_pruner_manager::StateKvPrunerManager, + state_merkle_pruner_manager::StateMerklePrunerManager, state_store::StateMerklePruner, }, schema::*, stale_node_index::StaleNodeIndexSchema, @@ -289,20 +290,24 @@ impl AptosDB { Arc::clone(&arc_state_merkle_rocksdb), pruner_config.epoch_snapshot_pruner_config.into(), ); + let state_kv_pruner = StateKvPrunerManager::new( + Arc::clone(&arc_state_kv_rocksdb), + pruner_config.state_kv_pruner_config, + ); let state_store = Arc::new(StateStore::new( Arc::clone(&arc_ledger_rocksdb), Arc::clone(&arc_state_merkle_rocksdb), Arc::clone(&arc_state_kv_rocksdb), state_merkle_pruner, epoch_snapshot_pruner, + state_kv_pruner, buffered_state_target_items, max_nodes_per_lru_cache_shard, hack_for_tests, )); - // TODO(grao): Handle state kv db pruning. + let ledger_pruner = LedgerPrunerManager::new( Arc::clone(&arc_ledger_rocksdb), - Arc::clone(&state_store), pruner_config.ledger_pruner_config, ); @@ -1003,6 +1008,18 @@ impl AptosDB { ) } } + + fn error_if_state_kv_pruned(&self, data_type: &str, version: Version) -> Result<()> { + let min_readable_version = self.state_store.state_kv_pruner.get_min_readable_version(); + ensure!( + version >= min_readable_version, + "{} at version {} is pruned, min available version is {}.", + data_type, + version, + min_readable_version + ); + Ok(()) + } } impl DbReader for AptosDB { @@ -1025,7 +1042,7 @@ impl DbReader for AptosDB { version: Version, ) -> Result> + '_>> { gauged_api("get_prefixed_state_value_iterator", || { - self.error_if_ledger_pruned("State", version)?; + self.error_if_state_kv_pruned("StateValue", version)?; Ok(Box::new( self.state_store @@ -1427,7 +1444,7 @@ impl DbReader for AptosDB { version: Version, ) -> Result> { gauged_api("get_state_value_by_version", || { - self.error_if_ledger_pruned("State", version)?; + self.error_if_state_kv_pruned("StateValue", version)?; self.state_store .get_state_value_by_version(state_store_key, version) @@ -1875,10 +1892,14 @@ impl DbWriter for AptosDB { let last_version = first_version + num_txns - 1; COMMITTED_TXNS.inc_by(num_txns); LATEST_TXN_VERSION.set(last_version as i64); - // Activate the ledger pruner. Note the state merkle pruner is activated when - // state snapshots are persisted in their async thread. + // Activate the ledger pruner and state kv pruner. + // Note the state merkle pruner is activated when state snapshots are persisted + // in their async thread. self.ledger_pruner .maybe_set_pruner_target_db_version(last_version); + self.state_store + .state_kv_pruner + .maybe_set_pruner_target_db_version(last_version); } // Note: this must happen after txns have been saved to db because types can be newly @@ -1993,11 +2014,7 @@ impl DbWriter for AptosDB { )?; // Delete the genesis transaction - LedgerPruner::prune_genesis( - self.ledger_db.clone(), - self.state_store.clone(), - &mut batch, - )?; + LedgerPruner::prune_genesis(self.ledger_db.clone(), &mut batch)?; self.ledger_pruner .pruner() @@ -2018,11 +2035,23 @@ impl DbWriter for AptosDB { .pruner() .save_min_readable_version(version, &state_merkle_batch)?; + let mut state_kv_batch = SchemaBatch::new(); + StateKvPruner::prune_genesis( + self.state_store.state_kv_db.clone(), + &mut state_kv_batch, + )?; + self.state_store + .state_kv_pruner + .pruner() + .save_min_readable_version(version, &state_kv_batch)?; + // Apply the change set writes to the database (atomically) and update in-memory state self.ledger_db.clone().write_schemas(batch)?; self.state_merkle_db .clone() .write_schemas(state_merkle_batch)?; + self.state_kv_db.clone().write_schemas(state_kv_batch)?; + restore_utils::update_latest_ledger_info(self.ledger_store.clone(), ledger_infos)?; self.state_store.reset(); diff --git a/storage/aptosdb/src/pruner/event_store/test.rs b/storage/aptosdb/src/pruner/event_store/test.rs index a7d1149a7eeb4..1fccbe9675723 100644 --- a/storage/aptosdb/src/pruner/event_store/test.rs +++ b/storage/aptosdb/src/pruner/event_store/test.rs @@ -67,16 +67,12 @@ fn verify_event_store_pruner(events: Vec>) { } aptos_db.ledger_db.write_schemas(batch).unwrap(); - let pruner = LedgerPrunerManager::new( - Arc::clone(&aptos_db.ledger_db), - Arc::clone(&aptos_db.state_store), - LedgerPrunerConfig { - enable: true, - prune_window: 0, - batch_size: 1, - user_pruning_window_offset: 0, - }, - ); + let pruner = LedgerPrunerManager::new(Arc::clone(&aptos_db.ledger_db), LedgerPrunerConfig { + enable: true, + prune_window: 0, + batch_size: 1, + user_pruning_window_offset: 0, + }); // start pruning events batches of size 2 and verify transactions have been pruned from DB for i in (0..=num_versions).step_by(2) { pruner diff --git a/storage/aptosdb/src/pruner/ledger_pruner_manager.rs b/storage/aptosdb/src/pruner/ledger_pruner_manager.rs index 2ee544b4503dd..f2918956fe208 100644 --- a/storage/aptosdb/src/pruner/ledger_pruner_manager.rs +++ b/storage/aptosdb/src/pruner/ledger_pruner_manager.rs @@ -7,7 +7,7 @@ use crate::{ db_pruner::DBPruner, ledger_pruner_worker::LedgerPrunerWorker, ledger_store::ledger_store_pruner::LedgerPruner, pruner_manager::PrunerManager, }, - pruner_utils, StateStore, + pruner_utils, }; use aptos_config::config::LedgerPrunerConfig; use aptos_infallible::Mutex; @@ -99,12 +99,8 @@ impl PrunerManager for LedgerPrunerManager { impl LedgerPrunerManager { /// Creates a worker thread that waits on a channel for pruning commands. - pub fn new( - ledger_rocksdb: Arc, - state_store: Arc, - ledger_pruner_config: LedgerPrunerConfig, - ) -> Self { - let ledger_pruner = pruner_utils::create_ledger_pruner(ledger_rocksdb, state_store); + pub fn new(ledger_rocksdb: Arc, ledger_pruner_config: LedgerPrunerConfig) -> Self { + let ledger_pruner = pruner_utils::create_ledger_pruner(ledger_rocksdb); if ledger_pruner_config.enable { PRUNER_WINDOW diff --git a/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs b/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs index cca5450febfea..333e4850aa0ca 100644 --- a/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs +++ b/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs @@ -8,7 +8,7 @@ use crate::{ db_pruner::DBPruner, db_sub_pruner::DBSubPruner, event_store::event_store_pruner::EventStorePruner, - state_store::state_value_pruner::StateValuePruner, + ledger_store::version_data_pruner::VersionDataPruner, transaction_store::{ transaction_store_pruner::TransactionStorePruner, write_set_pruner::WriteSetPruner, }, @@ -18,7 +18,7 @@ use crate::{ db_metadata::{DbMetadataKey, DbMetadataValue}, transaction::TransactionSchema, }, - EventStore, StateStore, TransactionStore, + EventStore, TransactionStore, }; use aptos_logger::warn; use aptos_schemadb::{ReadOptions, SchemaBatch, DB}; @@ -34,7 +34,7 @@ pub(crate) struct LedgerPruner { target_version: AtomicVersion, min_readable_version: AtomicVersion, transaction_store_pruner: Arc, - state_value_pruner: Arc, + version_data_pruner: Arc, event_store_pruner: Arc, write_set_pruner: Arc, } @@ -137,7 +137,6 @@ impl LedgerPruner { db: Arc, transaction_store: Arc, event_store: Arc, - state_store: Arc, ) -> Self { let pruner = LedgerPruner { db, @@ -146,24 +145,20 @@ impl LedgerPruner { transaction_store_pruner: Arc::new(TransactionStorePruner::new( transaction_store.clone(), )), - state_value_pruner: Arc::new(StateValuePruner::new(state_store)), event_store_pruner: Arc::new(EventStorePruner::new(event_store)), write_set_pruner: Arc::new(WriteSetPruner::new(transaction_store)), + version_data_pruner: Arc::new(VersionDataPruner::new()), }; pruner.initialize(); pruner } /// Prunes the genesis transaction and saves the db alterations to the given change set - pub fn prune_genesis( - ledger_db: Arc, - state_store: Arc, - db_batch: &mut SchemaBatch, - ) -> anyhow::Result<()> { + pub fn prune_genesis(ledger_db: Arc, db_batch: &mut SchemaBatch) -> anyhow::Result<()> { let target_version = 1; // The genesis version is 0. Delete [0,1) (exclusive) let max_version = 1; // We should only be pruning a single version - let ledger_pruner = pruner_utils::create_ledger_pruner(ledger_db, state_store); + let ledger_pruner = pruner_utils::create_ledger_pruner(ledger_db); ledger_pruner.set_target_version(target_version); ledger_pruner.prune_inner(max_version, db_batch)?; @@ -192,7 +187,7 @@ impl LedgerPruner { )?; self.write_set_pruner .prune(db_batch, min_readable_version, current_target_version)?; - self.state_value_pruner + self.version_data_pruner .prune(db_batch, min_readable_version, current_target_version)?; self.event_store_pruner .prune(db_batch, min_readable_version, current_target_version)?; diff --git a/storage/aptosdb/src/pruner/ledger_store/mod.rs b/storage/aptosdb/src/pruner/ledger_store/mod.rs index 09394b5c4749b..d0f57aa5727f5 100644 --- a/storage/aptosdb/src/pruner/ledger_store/mod.rs +++ b/storage/aptosdb/src/pruner/ledger_store/mod.rs @@ -2,3 +2,4 @@ // SPDX-License-Identifier: Apache-2.0 pub(crate) mod ledger_store_pruner; +pub(crate) mod version_data_pruner; diff --git a/storage/aptosdb/src/pruner/ledger_store/version_data_pruner.rs b/storage/aptosdb/src/pruner/ledger_store/version_data_pruner.rs new file mode 100644 index 0000000000000..672f9bd84f88e --- /dev/null +++ b/storage/aptosdb/src/pruner/ledger_store/version_data_pruner.rs @@ -0,0 +1,28 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{pruner::db_sub_pruner::DBSubPruner, schema::version_data::VersionDataSchema}; +use aptos_schemadb::SchemaBatch; + +#[derive(Debug)] +pub struct VersionDataPruner {} + +impl DBSubPruner for VersionDataPruner { + fn prune( + &self, + db_batch: &mut SchemaBatch, + min_readable_version: u64, + target_version: u64, + ) -> anyhow::Result<()> { + for version in min_readable_version..target_version { + db_batch.delete::(&version)?; + } + Ok(()) + } +} + +impl VersionDataPruner { + pub(in crate::pruner) fn new() -> Self { + VersionDataPruner {} + } +} diff --git a/storage/aptosdb/src/pruner/mod.rs b/storage/aptosdb/src/pruner/mod.rs index f05ea4004e440..c09e264e6ab33 100644 --- a/storage/aptosdb/src/pruner/mod.rs +++ b/storage/aptosdb/src/pruner/mod.rs @@ -9,6 +9,8 @@ pub(crate) mod ledger_pruner_worker; pub(crate) mod ledger_store; pub(crate) mod pruner_manager; pub mod pruner_utils; +pub(crate) mod state_kv_pruner; +pub(crate) mod state_kv_pruner_worker; pub(crate) mod state_merkle_pruner_worker; pub(crate) mod state_store; pub(crate) mod transaction_store; @@ -16,4 +18,5 @@ pub(crate) mod transaction_store; // This module provides `Pruner` which manages a thread pruning old data in the background and is // meant to be triggered by other threads as they commit new data to the DB. pub(crate) mod ledger_pruner_manager; +pub(crate) mod state_kv_pruner_manager; pub(crate) mod state_merkle_pruner_manager; diff --git a/storage/aptosdb/src/pruner/pruner_utils.rs b/storage/aptosdb/src/pruner/pruner_utils.rs index 4833cb39690f1..3352457d89e0a 100644 --- a/storage/aptosdb/src/pruner/pruner_utils.rs +++ b/storage/aptosdb/src/pruner/pruner_utils.rs @@ -6,9 +6,10 @@ use crate::{ pruner::{ ledger_store::ledger_store_pruner::LedgerPruner, + state_kv_pruner::StateKvPruner, state_store::{generics::StaleNodeIndexSchemaTrait, StateMerklePruner}, }, - EventStore, StateStore, TransactionStore, + EventStore, TransactionStore, }; use aptos_jellyfish_merkle::StaleNodeIndex; use aptos_schemadb::{schema::KeyCodec, DB}; @@ -25,14 +26,15 @@ where } /// A utility function to instantiate the ledger pruner -pub(crate) fn create_ledger_pruner( - ledger_db: Arc, - state_store: Arc, -) -> Arc { +pub(crate) fn create_ledger_pruner(ledger_db: Arc) -> Arc { Arc::new(LedgerPruner::new( Arc::clone(&ledger_db), Arc::new(TransactionStore::new(Arc::clone(&ledger_db))), Arc::new(EventStore::new(Arc::clone(&ledger_db))), - state_store, )) } + +/// A utility function to instantiate the state kv pruner. +pub(crate) fn create_state_kv_pruner(state_kv_db: Arc) -> Arc { + Arc::new(StateKvPruner::new(state_kv_db)) +} diff --git a/storage/aptosdb/src/pruner/state_kv_pruner.rs b/storage/aptosdb/src/pruner/state_kv_pruner.rs new file mode 100644 index 0000000000000..05a023fb6b5b9 --- /dev/null +++ b/storage/aptosdb/src/pruner/state_kv_pruner.rs @@ -0,0 +1,133 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + db_metadata::DbMetadataSchema, + metrics::PRUNER_LEAST_READABLE_VERSION, + pruner::{ + db_pruner::DBPruner, db_sub_pruner::DBSubPruner, + state_store::state_value_pruner::StateValuePruner, + }, + pruner_utils, + schema::db_metadata::{DbMetadataKey, DbMetadataValue}, +}; +use aptos_schemadb::{SchemaBatch, DB}; +use aptos_types::transaction::{AtomicVersion, Version}; +use std::sync::{atomic::Ordering, Arc}; + +pub const STATE_KV_PRUNER_NAME: &str = "state_kv_pruner"; + +/// Responsible for pruning state kv db. +pub(crate) struct StateKvPruner { + state_kv_db: Arc, + /// Keeps track of the target version that the pruner needs to achieve. + target_version: AtomicVersion, + min_readable_version: AtomicVersion, + state_value_pruner: Arc, +} + +impl DBPruner for StateKvPruner { + fn name(&self) -> &'static str { + STATE_KV_PRUNER_NAME + } + + fn prune(&self, max_versions: usize) -> anyhow::Result { + if !self.is_pruning_pending() { + return Ok(self.min_readable_version()); + } + + let mut db_batch = SchemaBatch::new(); + let current_target_version = self.prune_inner(max_versions, &mut db_batch)?; + self.save_min_readable_version(current_target_version, &db_batch)?; + self.state_kv_db.write_schemas(db_batch)?; + self.record_progress(current_target_version); + + Ok(current_target_version) + } + + fn save_min_readable_version( + &self, + version: Version, + batch: &SchemaBatch, + ) -> anyhow::Result<()> { + batch.put::( + &DbMetadataKey::StateKvPrunerProgress, + &DbMetadataValue::Version(version), + ) + } + + fn initialize_min_readable_version(&self) -> anyhow::Result { + Ok(self + .state_kv_db + .get::(&DbMetadataKey::StateKvPrunerProgress)? + .map_or(0, |v| v.expect_version())) + } + + fn min_readable_version(&self) -> Version { + self.min_readable_version.load(Ordering::Relaxed) + } + + fn set_target_version(&self, target_version: Version) { + self.target_version.store(target_version, Ordering::Relaxed) + } + + fn target_version(&self) -> Version { + self.target_version.load(Ordering::Relaxed) + } + + fn record_progress(&self, min_readable_version: Version) { + self.min_readable_version + .store(min_readable_version, Ordering::Relaxed); + PRUNER_LEAST_READABLE_VERSION + .with_label_values(&["state_kv_pruner"]) + .set(min_readable_version as i64); + } + + /// (For tests only.) Updates the minimal readable version kept by pruner. + fn testonly_update_min_version(&self, version: Version) { + self.min_readable_version.store(version, Ordering::Relaxed) + } +} + +impl StateKvPruner { + pub fn new(state_kv_db: Arc) -> Self { + let pruner = StateKvPruner { + state_kv_db: Arc::clone(&state_kv_db), + target_version: AtomicVersion::new(0), + min_readable_version: AtomicVersion::new(0), + state_value_pruner: Arc::new(StateValuePruner::new(state_kv_db)), + }; + pruner.initialize(); + pruner + } + + /// Prunes the genesis transaction and saves the db alterations to the given change set + pub fn prune_genesis(state_kv_db: Arc, db_batch: &mut SchemaBatch) -> anyhow::Result<()> { + let target_version = 1; // The genesis version is 0. Delete [0,1) (exclusive) + let max_version = 1; // We should only be pruning a single version + + let state_kv_pruner = pruner_utils::create_state_kv_pruner(state_kv_db); + state_kv_pruner.set_target_version(target_version); + state_kv_pruner.prune_inner(max_version, db_batch)?; + + Ok(()) + } + + fn prune_inner( + &self, + max_versions: usize, + db_batch: &mut SchemaBatch, + ) -> anyhow::Result { + let min_readable_version = self.min_readable_version(); + + let current_target_version = self.get_current_batch_target(max_versions as Version); + if current_target_version < min_readable_version { + return Ok(min_readable_version); + } + + self.state_value_pruner + .prune(db_batch, min_readable_version, current_target_version)?; + + Ok(current_target_version) + } +} diff --git a/storage/aptosdb/src/pruner/state_kv_pruner_manager.rs b/storage/aptosdb/src/pruner/state_kv_pruner_manager.rs new file mode 100644 index 0000000000000..627f219ec690b --- /dev/null +++ b/storage/aptosdb/src/pruner/state_kv_pruner_manager.rs @@ -0,0 +1,147 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + metrics::{PRUNER_BATCH_SIZE, PRUNER_WINDOW}, + pruner::{ + db_pruner::DBPruner, pruner_manager::PrunerManager, state_kv_pruner::StateKvPruner, + state_kv_pruner_worker::StateKvPrunerWorker, + }, + pruner_utils, +}; +use aptos_config::config::StateKvPrunerConfig; +use aptos_infallible::Mutex; +use aptos_schemadb::DB; +use aptos_types::transaction::Version; +use std::{sync::Arc, thread::JoinHandle}; + +/// The `PrunerManager` for `StateKvPruner`. +pub(crate) struct StateKvPrunerManager { + pruner_enabled: bool, + /// DB version window, which dictates how many version of state values to keep. + prune_window: Version, + /// State kv pruner. Is always initialized regardless if the pruner is enabled to keep tracks + /// of the min_readable_version. + pruner: Arc, + /// Wrapper class of the state kv pruner. + pruner_worker: Arc, + /// The worker thread handle for state_kv_pruner, created upon Pruner instance construction and + /// joined upon its destruction. It is `None` when the state kv pruner is not enabled or it only + /// becomes `None` after joined in `drop()`. + worker_thread: Option>, + /// We send a batch of version to the underlying pruners for performance reason. This tracks the + /// last version we sent to the pruners. Will only be set if the pruner is enabled. + pub(crate) last_version_sent_to_pruner: Arc>, + /// Ideal batch size of the versions to be sent to the state kv pruner. + pruning_batch_size: usize, + /// latest version + latest_version: Arc>, +} + +impl PrunerManager for StateKvPrunerManager { + type Pruner = StateKvPruner; + + fn pruner(&self) -> &Self::Pruner { + &self.pruner + } + + fn is_pruner_enabled(&self) -> bool { + self.pruner_enabled + } + + fn get_prune_window(&self) -> Version { + self.prune_window + } + + fn get_min_readable_version(&self) -> Version { + self.pruner.as_ref().min_readable_version() + } + + fn get_min_viable_version(&self) -> Version { + unimplemented!() + } + + /// Sets pruner target version when necessary. + fn maybe_set_pruner_target_db_version(&self, latest_version: Version) { + *self.latest_version.lock() = latest_version; + + if self.pruner_enabled + && latest_version + >= *self.last_version_sent_to_pruner.as_ref().lock() + + self.pruning_batch_size as u64 + { + self.set_pruner_target_db_version(latest_version); + *self.last_version_sent_to_pruner.as_ref().lock() = latest_version; + } + } + + fn set_pruner_target_db_version(&self, latest_version: Version) { + assert!(self.pruner_enabled); + self.pruner_worker + .as_ref() + .set_target_db_version(latest_version.saturating_sub(self.prune_window)); + } +} + +impl StateKvPrunerManager { + /// Creates a worker thread that waits on a channel for pruning commands. + pub fn new(state_kv_db: Arc, state_kv_pruner_config: StateKvPrunerConfig) -> Self { + let state_kv_pruner = pruner_utils::create_state_kv_pruner(state_kv_db); + + if state_kv_pruner_config.enable { + PRUNER_WINDOW + .with_label_values(&["state_kv_pruner"]) + .set(state_kv_pruner_config.prune_window as i64); + + PRUNER_BATCH_SIZE + .with_label_values(&["state_kv_pruner"]) + .set(state_kv_pruner_config.batch_size as i64); + } + + let state_kv_pruner_worker = Arc::new(StateKvPrunerWorker::new( + Arc::clone(&state_kv_pruner), + state_kv_pruner_config, + )); + + let state_kv_pruner_worker_clone = Arc::clone(&state_kv_pruner_worker); + + let state_kv_pruner_worker_thread = if state_kv_pruner_config.enable { + Some( + std::thread::Builder::new() + .name("aptosdb_state_kv_pruner".into()) + .spawn(move || state_kv_pruner_worker_clone.as_ref().work()) + .expect("Creating state kv pruner thread should succeed."), + ) + } else { + None + }; + + let min_readable_version = state_kv_pruner.min_readable_version(); + + Self { + pruner_enabled: state_kv_pruner_config.enable, + prune_window: state_kv_pruner_config.prune_window, + pruner: state_kv_pruner, + pruner_worker: state_kv_pruner_worker, + worker_thread: state_kv_pruner_worker_thread, + last_version_sent_to_pruner: Arc::new(Mutex::new(min_readable_version)), + pruning_batch_size: state_kv_pruner_config.batch_size, + latest_version: Arc::new(Mutex::new(min_readable_version)), + } + } +} + +impl Drop for StateKvPrunerManager { + fn drop(&mut self) { + if self.pruner_enabled { + self.pruner_worker.stop_pruning(); + + assert!(self.worker_thread.is_some()); + self.worker_thread + .take() + .expect("State kv pruner worker thread must exist.") + .join() + .expect("State kv pruner worker thread should join peacefully."); + } + } +} diff --git a/storage/aptosdb/src/pruner/state_kv_pruner_worker.rs b/storage/aptosdb/src/pruner/state_kv_pruner_worker.rs new file mode 100644 index 0000000000000..0725d51e9983c --- /dev/null +++ b/storage/aptosdb/src/pruner/state_kv_pruner_worker.rs @@ -0,0 +1,76 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::pruner::{db_pruner::DBPruner, state_kv_pruner::StateKvPruner}; +use aptos_config::config::StateKvPrunerConfig; +use aptos_logger::{ + error, + prelude::{sample, SampleRate}, +}; +use aptos_types::transaction::Version; +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::sleep, + time::Duration, +}; + +/// Maintains the state kv pruner and periodically calls the db_pruner's prune method to prune the DB. +/// This also exposes API to report the progress to the parent thread. +pub struct StateKvPrunerWorker { + /// The worker will sleep for this period of time after pruning each batch. + pruning_time_interval_in_ms: u64, + /// State kv pruner. + pruner: Arc, + /// Max number of versions to prune per batch. + max_versions_to_prune_per_batch: u64, + /// Indicates whether the pruning loop should be running. Will only be set to true on pruner + /// destruction. + quit_worker: AtomicBool, +} + +impl StateKvPrunerWorker { + pub(crate) fn new( + state_kv_pruner: Arc, + state_kv_pruner_config: StateKvPrunerConfig, + ) -> Self { + Self { + pruning_time_interval_in_ms: if cfg!(test) { 100 } else { 1 }, + pruner: state_kv_pruner, + max_versions_to_prune_per_batch: state_kv_pruner_config.batch_size as u64, + quit_worker: AtomicBool::new(false), + } + } + + // Loop that does the real pruning job. + pub(crate) fn work(&self) { + while !self.quit_worker.load(Ordering::Relaxed) { + let pruner_result = self + .pruner + .prune(self.max_versions_to_prune_per_batch as usize); + if pruner_result.is_err() { + sample!( + SampleRate::Duration(Duration::from_secs(1)), + error!(error = ?pruner_result.err().unwrap(), + "State kv pruner has error.") + ); + sleep(Duration::from_millis(self.pruning_time_interval_in_ms)); + return; + } + if !self.pruner.is_pruning_pending() { + sleep(Duration::from_millis(self.pruning_time_interval_in_ms)); + } + } + } + + pub fn set_target_db_version(&self, target_db_version: Version) { + assert!(target_db_version >= self.pruner.target_version()); + self.pruner.set_target_version(target_db_version); + } + + pub fn stop_pruning(&self) { + self.quit_worker.store(true, Ordering::Relaxed); + } +} diff --git a/storage/aptosdb/src/pruner/state_store/state_value_pruner.rs b/storage/aptosdb/src/pruner/state_store/state_value_pruner.rs index 98d7effefc033..ef84ed96448bc 100644 --- a/storage/aptosdb/src/pruner/state_store/state_value_pruner.rs +++ b/storage/aptosdb/src/pruner/state_store/state_value_pruner.rs @@ -1,11 +1,15 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{pruner::db_sub_pruner::DBSubPruner, StateStore}; -use aptos_schemadb::SchemaBatch; + +use crate::{ + pruner::db_sub_pruner::DBSubPruner, + schema::{stale_state_value_index::StaleStateValueIndexSchema, state_value::StateValueSchema}, +}; +use aptos_schemadb::{ReadOptions, SchemaBatch, DB}; use std::sync::Arc; pub struct StateValuePruner { - state_store: Arc, + state_kv_db: Arc, } impl DBSubPruner for StateValuePruner { @@ -15,14 +19,24 @@ impl DBSubPruner for StateValuePruner { min_readable_version: u64, target_version: u64, ) -> anyhow::Result<()> { - self.state_store - .prune_state_values(min_readable_version, target_version, db_batch)?; + let mut iter = self + .state_kv_db + .iter::(ReadOptions::default())?; + iter.seek(&min_readable_version)?; + for item in iter { + let (index, _) = item?; + if index.stale_since_version > target_version { + break; + } + db_batch.delete::(&index)?; + db_batch.delete::(&(index.state_key, index.version))?; + } Ok(()) } } impl StateValuePruner { - pub(in crate::pruner) fn new(state_store: Arc) -> Self { - StateValuePruner { state_store } + pub(in crate::pruner) fn new(state_kv_db: Arc) -> Self { + StateValuePruner { state_kv_db } } } diff --git a/storage/aptosdb/src/pruner/state_store/test.rs b/storage/aptosdb/src/pruner/state_store/test.rs index 551cfc58bb781..b8eaca55081d4 100644 --- a/storage/aptosdb/src/pruner/state_store/test.rs +++ b/storage/aptosdb/src/pruner/state_store/test.rs @@ -7,9 +7,9 @@ use crate::{ stale_state_value_index::StaleStateValueIndexSchema, state_store::StateStore, test_helper::{arb_state_kv_sets, update_store}, - AptosDB, LedgerPrunerManager, PrunerManager, StateMerklePrunerManager, + AptosDB, PrunerManager, StateKvPrunerManager, StateMerklePrunerManager, }; -use aptos_config::config::{LedgerPrunerConfig, StateMerklePrunerConfig}; +use aptos_config::config::{StateKvPrunerConfig, StateMerklePrunerConfig}; use aptos_crypto::HashValue; use aptos_schemadb::{ReadOptions, SchemaBatch, DB}; use aptos_storage_interface::{jmt_update_refs, jmt_updates, DbReader}; @@ -378,16 +378,11 @@ fn verify_state_value_pruner(inputs: Vec)>>) { let mut version = 0; let mut current_state_values = HashMap::new(); - let pruner = LedgerPrunerManager::new( - Arc::clone(&db.ledger_db), - Arc::clone(store), - LedgerPrunerConfig { - enable: true, - prune_window: 0, - batch_size: 1, - user_pruning_window_offset: 0, - }, - ); + let pruner = StateKvPrunerManager::new(Arc::clone(&db.ledger_db), StateKvPrunerConfig { + enable: true, + prune_window: 0, + batch_size: 1, + }); for batch in inputs { update_store(store, batch.clone().into_iter(), version); for (k, v) in batch.iter() { @@ -423,7 +418,7 @@ fn verify_state_value<'a, I: Iterator(&StaleStateValueIndex { stale_since_version: version, version: *old_version, @@ -433,10 +428,4 @@ fn verify_state_value<'a, I: Iterator) { let transaction_store = &aptos_db.transaction_store; let num_write_sets = write_sets.len(); - let pruner = LedgerPrunerManager::new( - Arc::clone(&aptos_db.ledger_db), - Arc::clone(&aptos_db.state_store), - LedgerPrunerConfig { - enable: true, - prune_window: 0, - batch_size: 1, - user_pruning_window_offset: 0, - }, - ); + let pruner = LedgerPrunerManager::new(Arc::clone(&aptos_db.ledger_db), LedgerPrunerConfig { + enable: true, + prune_window: 0, + batch_size: 1, + user_pruning_window_offset: 0, + }); // write sets let batch = SchemaBatch::new(); @@ -102,21 +102,25 @@ fn verify_txn_store_pruner( &txns, ); + let batch = SchemaBatch::new(); + for i in 0..=num_transaction as u64 { + let usage = StateStorageUsage::zero(); + batch.put::(&i, &usage.into()).unwrap(); + } + aptos_db.ledger_db.write_schemas(batch).unwrap(); + // start pruning transactions batches of size step_size and verify transactions have been pruned // from DB for i in (0..=num_transaction).step_by(step_size) { // Initialize a pruner in every iteration to test the min_readable_version initialization // logic. - let pruner = LedgerPrunerManager::new( - Arc::clone(&aptos_db.ledger_db), - Arc::clone(&aptos_db.state_store), - LedgerPrunerConfig { + let pruner = + LedgerPrunerManager::new(Arc::clone(&aptos_db.ledger_db), LedgerPrunerConfig { enable: true, prune_window: 0, batch_size: 1, user_pruning_window_offset: 0, - }, - ); + }); pruner .wake_and_wait_pruner(i as u64 /* latest_version */) .unwrap(); @@ -137,6 +141,7 @@ fn verify_txn_store_pruner( .get_transaction_proof(j as u64, ledger_version) .is_err()); } + assert!(aptos_db.state_store.get_usage(Some(j as u64)).is_err()); } // ensure all other are valid in DB for j in i..num_transaction { @@ -148,6 +153,7 @@ fn verify_txn_store_pruner( ledger_version, ); aptos_db.get_accumulator_summary(j as Version).unwrap(); + assert!(aptos_db.state_store.get_usage(Some(j as u64)).is_ok()); } verify_transaction_accumulator_pruned(&ledger_store, i as u64); } diff --git a/storage/aptosdb/src/schema/db_metadata/mod.rs b/storage/aptosdb/src/schema/db_metadata/mod.rs index 25280b754ecea..0b3ea6e54579a 100644 --- a/storage/aptosdb/src/schema/db_metadata/mod.rs +++ b/storage/aptosdb/src/schema/db_metadata/mod.rs @@ -47,6 +47,7 @@ pub enum DbMetadataKey { LedgerPrunerProgress, StateMerklePrunerProgress, EpochEndingStateMerklePrunerProgress, + StateKvPrunerProgress, StateSnapshotRestoreProgress(Version), LedgerCommitProgress, StateKVCommitProgress, diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index e5d6f73ea338c..23edb6a426be7 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -19,7 +19,7 @@ use crate::{ }, version_data::VersionDataSchema, AptosDbError, LedgerStore, StaleNodeIndexCrossEpochSchema, StaleNodeIndexSchema, - StateMerklePrunerManager, TransactionStore, OTHER_TIMERS_SECONDS, + StateKvPrunerManager, StateMerklePrunerManager, TransactionStore, OTHER_TIMERS_SECONDS, }; use anyhow::{ensure, format_err, Result}; use aptos_crypto::{ @@ -86,6 +86,7 @@ pub(crate) struct StateDb { pub state_kv_db: Arc, pub state_merkle_pruner: StateMerklePrunerManager, pub epoch_snapshot_pruner: StateMerklePrunerManager, + pub state_kv_pruner: StateKvPrunerManager, } pub(crate) struct StateStore { @@ -278,6 +279,7 @@ impl StateStore { state_kv_db: Arc, state_merkle_pruner: StateMerklePrunerManager, epoch_snapshot_pruner: StateMerklePrunerManager, + state_kv_pruner: StateKvPrunerManager, buffered_state_target_items: usize, max_nodes_per_lru_cache_shard: usize, hack_for_tests: bool, @@ -348,6 +350,7 @@ impl StateStore { state_kv_db, state_merkle_pruner, epoch_snapshot_pruner, + state_kv_pruner, }); let buffered_state = Mutex::new( Self::create_buffered_state_from_latest_snapshot( @@ -383,12 +386,17 @@ impl StateStore { ); let state_merkle_db = Arc::new(StateMerkleDb::new(arc_state_merkle_rocksdb, 0)); let state_kv_db = Arc::clone(&ledger_db); + let state_kv_pruner = StateKvPrunerManager::new( + Arc::clone(&state_kv_db), + NO_OP_STORAGE_PRUNER_CONFIG.state_kv_pruner_config, + ); let state_db = Arc::new(StateDb { ledger_db, state_merkle_db, state_kv_db, state_merkle_pruner, epoch_snapshot_pruner, + state_kv_pruner, }); let buffered_state = Self::create_buffered_state_from_latest_snapshot( &state_db, 0, /*hack_for_tests=*/ false, @@ -809,34 +817,6 @@ impl StateStore { )?)) } - /// Prune the stale state value schema generated between a range of version in (begin, end] - pub fn prune_state_values( - &self, - begin: Version, - end: Version, - db_batch: &SchemaBatch, - ) -> Result<()> { - // TODO(grao): Replace ledger_db by state_kv_db. - let mut iter = self - .state_db - .ledger_db - .iter::(ReadOptions::default())?; - iter.seek(&begin)?; - for item in iter { - let (index, _) = item?; - if index.stale_since_version > end { - break; - } - // Prune the stale state value index itself first. - db_batch.delete::(&index)?; - db_batch.delete::(&(index.state_key, index.version))?; - } - for version in begin..end { - db_batch.delete::(&version)?; - } - Ok(()) - } - #[cfg(test)] pub fn get_all_jmt_nodes_referenced( &self, From ddec83654581bf351bc472a92a3be6d3a3db6338 Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Mon, 6 Mar 2023 14:42:53 -0800 Subject: [PATCH 3/3] [Storage] Change StateValueWriter to use state_kv_db. --- storage/aptosdb/src/state_store/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index 23edb6a426be7..9a07693d2381d 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -851,7 +851,6 @@ impl StateValueWriter for StateStore { let _timer = OTHER_TIMERS_SECONDS .with_label_values(&["state_value_writer_write_chunk"]) .start_timer(); - // TODO(grao): Support state kv db here. let batch = SchemaBatch::new(); node_batch .par_iter() @@ -861,7 +860,7 @@ impl StateValueWriter for StateStore { &DbMetadataKey::StateSnapshotRestoreProgress(version), &DbMetadataValue::StateSnapshotProgress(progress), )?; - self.ledger_db.write_schemas(batch) + self.state_kv_db.write_schemas(batch) } fn write_usage(&self, version: Version, usage: StateStorageUsage) -> Result<()> { @@ -871,7 +870,7 @@ impl StateValueWriter for StateStore { fn get_progress(&self, version: Version) -> Result> { Ok(self - .ledger_db + .state_kv_db .get::(&DbMetadataKey::StateSnapshotRestoreProgress(version))? .map(|v| v.expect_state_snapshot_progress())) }