Skip to content

Commit

Permalink
[Storage][Pruner] Split state k/v pruning to a separate pruner.
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 committed Mar 4, 2023
1 parent 759b85d commit 3bd0238
Show file tree
Hide file tree
Showing 15 changed files with 187 additions and 119 deletions.
39 changes: 34 additions & 5 deletions config/src/config/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ pub const NO_OP_STORAGE_PRUNER_CONFIG: PrunerConfig = PrunerConfig {
prune_window: 0,
batch_size: 0,
},
state_kv_pruner_config: StateKvPrunerConfig {
enable: false,
prune_window: 0,
batch_size: 0,
},
};

#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
Expand All @@ -140,13 +145,12 @@ pub struct LedgerPrunerConfig {
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct StateMerklePrunerConfig {
/// Boolean to enable/disable the state store pruner. The state pruner is responsible for
/// pruning state tree nodes.
/// Boolean to enable/disable the state merkle pruner. The state merkle pruner is responsible
/// for pruning state tree nodes.
pub enable: bool,
/// The size of the window should be calculated based on disk space availability and system TPS.
/// Window size in versions.
pub prune_window: u64,
/// Similar to the variable above but for state store pruner. It means the number of stale
/// nodes to prune a time.
/// Number of stale nodes to prune at a time.
pub batch_size: usize,
}

Expand All @@ -161,6 +165,19 @@ pub struct EpochSnapshotPrunerConfig {
pub batch_size: usize,
}

/// Configuration for the state key/value (kv) pruner.
///
/// Because of `#[serde(default, deny_unknown_fields)]`, fields missing from a serialized config
/// fall back to this type's `Default` values, and unknown fields are rejected.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct StateKvPrunerConfig {
/// Boolean to enable/disable the state kv pruner. The state kv pruner is responsible for
/// pruning state key/value data from the state kv DB.
pub enable: bool,
/// Window size in versions.
pub prune_window: u64,
/// Batch size for the state kv pruner: the number of versions to prune at a time.
pub batch_size: usize,
}

// Config for the epoch ending state pruner is actually in the same format as the state merkle
// pruner, but it has its own type, hence separate default values. This converts it to the same
// type, to use the same pruner implementation (but parameterized on the stale node index DB schema).
Expand All @@ -180,6 +197,7 @@ pub struct PrunerConfig {
pub ledger_pruner_config: LedgerPrunerConfig,
pub state_merkle_pruner_config: StateMerklePrunerConfig,
pub epoch_snapshot_pruner_config: EpochSnapshotPrunerConfig,
pub state_kv_pruner_config: StateKvPrunerConfig,
}

impl Default for LedgerPrunerConfig {
Expand Down Expand Up @@ -230,6 +248,17 @@ impl Default for EpochSnapshotPrunerConfig {
}
}

impl Default for StateKvPrunerConfig {
/// Returns the default state kv pruner config: pruner enabled, a prune window of
/// 150,000,000 versions, and a batch size of 500.
fn default() -> Self {
Self {
// TODO(grao): Keep it the same as ledger pruner config for now, will revisit later.
enable: true,
prune_window: 150_000_000,
batch_size: 500,
}
}
}

impl Default for StorageConfig {
fn default() -> StorageConfig {
StorageConfig {
Expand Down
17 changes: 16 additions & 1 deletion execution/executor-benchmark/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// SPDX-License-Identifier: Apache-2.0

use aptos_config::config::{
EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, StateMerklePrunerConfig,
EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, StateKvPrunerConfig,
StateMerklePrunerConfig,
};
use aptos_executor::block_executor::TransactionBlockExecutor;
use aptos_executor_benchmark::{
Expand All @@ -29,6 +30,9 @@ struct PrunerOpt {
#[structopt(long)]
enable_ledger_pruner: bool,

#[structopt(long)]
enable_state_kv_pruner: bool,

#[structopt(long, default_value = "100000")]
state_prune_window: u64,

Expand All @@ -38,6 +42,9 @@ struct PrunerOpt {
#[structopt(long, default_value = "100000")]
ledger_prune_window: u64,

#[structopt(long, default_value = "100000")]
state_kv_prune_window: u64,

#[structopt(long, default_value = "500")]
ledger_pruning_batch_size: usize,

Expand All @@ -46,6 +53,9 @@ struct PrunerOpt {

#[structopt(long, default_value = "500")]
epoch_snapshot_pruning_batch_size: usize,

#[structopt(long, default_value = "500")]
state_kv_pruning_batch_size: usize,
}

impl PrunerOpt {
Expand All @@ -67,6 +77,11 @@ impl PrunerOpt {
batch_size: self.ledger_pruning_batch_size,
user_pruning_window_offset: 0,
},
state_kv_pruner_config: StateKvPrunerConfig {
enable: self.enable_state_kv_pruner,
prune_window: self.state_kv_prune_window,
batch_size: self.state_kv_pruning_batch_size,
},
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions storage/aptosdb/src/aptosdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
};
use aptos_config::config::{
EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, RocksdbConfigs,
StateMerklePrunerConfig, BUFFERED_STATE_TARGET_ITEMS,
StateKvPrunerConfig, StateMerklePrunerConfig, BUFFERED_STATE_TARGET_ITEMS,
DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD,
};
use aptos_crypto::{hash::CryptoHash, HashValue};
Expand Down Expand Up @@ -103,16 +103,13 @@ fn test_pruner_config() {
assert_eq!(state_merkle_pruner.is_pruner_enabled(), enable);
assert_eq!(state_merkle_pruner.get_prune_window(), 20);

let ledger_pruner = LedgerPrunerManager::new(
Arc::clone(&aptos_db.ledger_db),
Arc::clone(&aptos_db.state_store),
LedgerPrunerConfig {
let ledger_pruner =
LedgerPrunerManager::new(Arc::clone(&aptos_db.ledger_db), LedgerPrunerConfig {
enable,
prune_window: 100,
batch_size: 1,
user_pruning_window_offset: 0,
},
);
});
assert_eq!(ledger_pruner.is_pruner_enabled(), enable);
assert_eq!(ledger_pruner.get_prune_window(), 100);
}
Expand Down Expand Up @@ -203,6 +200,11 @@ pub fn test_state_merkle_pruning_impl(
prune_window: 10,
batch_size: 1,
},
state_kv_pruner_config: StateKvPrunerConfig {
enable: true,
prune_window: 10,
batch_size: 1,
},
},
RocksdbConfigs::default(),
false, /* enable_indexer */
Expand Down
55 changes: 42 additions & 13 deletions storage/aptosdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ use crate::{
pruner::{
db_pruner::DBPruner, ledger_pruner_manager::LedgerPrunerManager,
ledger_store::ledger_store_pruner::LedgerPruner, pruner_manager::PrunerManager,
pruner_utils, state_merkle_pruner_manager::StateMerklePrunerManager,
state_store::StateMerklePruner,
pruner_utils, state_kv_pruner::StateKvPruner,
state_kv_pruner_manager::StateKvPrunerManager,
state_merkle_pruner_manager::StateMerklePrunerManager, state_store::StateMerklePruner,
},
schema::*,
stale_node_index::StaleNodeIndexSchema,
Expand Down Expand Up @@ -289,20 +290,24 @@ impl AptosDB {
Arc::clone(&arc_state_merkle_rocksdb),
pruner_config.epoch_snapshot_pruner_config.into(),
);
let state_kv_pruner = StateKvPrunerManager::new(
Arc::clone(&arc_state_kv_rocksdb),
pruner_config.state_kv_pruner_config,
);
let state_store = Arc::new(StateStore::new(
Arc::clone(&arc_ledger_rocksdb),
Arc::clone(&arc_state_merkle_rocksdb),
Arc::clone(&arc_state_kv_rocksdb),
state_merkle_pruner,
epoch_snapshot_pruner,
state_kv_pruner,
buffered_state_target_items,
max_nodes_per_lru_cache_shard,
hack_for_tests,
));
// TODO(grao): Handle state kv db pruning.

let ledger_pruner = LedgerPrunerManager::new(
Arc::clone(&arc_ledger_rocksdb),
Arc::clone(&state_store),
pruner_config.ledger_pruner_config,
);

Expand Down Expand Up @@ -1003,6 +1008,18 @@ impl AptosDB {
)
}
}

/// Returns an error if `version` is below the state kv pruner's min readable version,
/// i.e. the requested data has already been pruned; otherwise returns `Ok(())`.
///
/// `data_type` is only used to label the data in the error message.
fn error_if_state_kv_pruned(&self, data_type: &str, version: Version) -> Result<()> {
let min_readable_version = self.state_store.state_kv_pruner.get_min_readable_version();
ensure!(
version >= min_readable_version,
"{} at version {} is pruned, min available version is {}.",
data_type,
version,
min_readable_version
);
Ok(())
}
}

impl DbReader for AptosDB {
Expand All @@ -1025,7 +1042,7 @@ impl DbReader for AptosDB {
version: Version,
) -> Result<Box<dyn Iterator<Item = Result<(StateKey, StateValue)>> + '_>> {
gauged_api("get_prefixed_state_value_iterator", || {
self.error_if_ledger_pruned("State", version)?;
self.error_if_state_kv_pruned("StateValue", version)?;

Ok(Box::new(
self.state_store
Expand Down Expand Up @@ -1427,7 +1444,7 @@ impl DbReader for AptosDB {
version: Version,
) -> Result<Option<StateValue>> {
gauged_api("get_state_value_by_version", || {
self.error_if_ledger_pruned("State", version)?;
self.error_if_state_kv_pruned("StateValue", version)?;

self.state_store
.get_state_value_by_version(state_store_key, version)
Expand Down Expand Up @@ -1875,10 +1892,14 @@ impl DbWriter for AptosDB {
let last_version = first_version + num_txns - 1;
COMMITTED_TXNS.inc_by(num_txns);
LATEST_TXN_VERSION.set(last_version as i64);
// Activate the ledger pruner. Note the state merkle pruner is activated when
// state snapshots are persisted in their async thread.
// Activate the ledger pruner and state kv pruner.
// Note the state merkle pruner is activated when state snapshots are persisted
// in their async thread.
self.ledger_pruner
.maybe_set_pruner_target_db_version(last_version);
self.state_store
.state_kv_pruner
.maybe_set_pruner_target_db_version(last_version);
}

// Note: this must happen after txns have been saved to db because types can be newly
Expand Down Expand Up @@ -1993,11 +2014,7 @@ impl DbWriter for AptosDB {
)?;

// Delete the genesis transaction
LedgerPruner::prune_genesis(
self.ledger_db.clone(),
self.state_store.clone(),
&mut batch,
)?;
LedgerPruner::prune_genesis(self.ledger_db.clone(), &mut batch)?;

self.ledger_pruner
.pruner()
Expand All @@ -2018,11 +2035,23 @@ impl DbWriter for AptosDB {
.pruner()
.save_min_readable_version(version, &state_merkle_batch)?;

let mut state_kv_batch = SchemaBatch::new();
StateKvPruner::prune_genesis(
self.state_store.state_kv_db.clone(),
&mut state_kv_batch,
)?;
self.state_store
.state_kv_pruner
.pruner()
.save_min_readable_version(version, &state_kv_batch)?;

// Apply the change set writes to the database (atomically) and update in-memory state
self.ledger_db.clone().write_schemas(batch)?;
self.state_merkle_db
.clone()
.write_schemas(state_merkle_batch)?;
self.state_kv_db.clone().write_schemas(state_kv_batch)?;

restore_utils::update_latest_ledger_info(self.ledger_store.clone(), ledger_infos)?;
self.state_store.reset();

Expand Down
16 changes: 6 additions & 10 deletions storage/aptosdb/src/pruner/event_store/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,12 @@ fn verify_event_store_pruner(events: Vec<Vec<ContractEvent>>) {
}
aptos_db.ledger_db.write_schemas(batch).unwrap();

let pruner = LedgerPrunerManager::new(
Arc::clone(&aptos_db.ledger_db),
Arc::clone(&aptos_db.state_store),
LedgerPrunerConfig {
enable: true,
prune_window: 0,
batch_size: 1,
user_pruning_window_offset: 0,
},
);
let pruner = LedgerPrunerManager::new(Arc::clone(&aptos_db.ledger_db), LedgerPrunerConfig {
enable: true,
prune_window: 0,
batch_size: 1,
user_pruning_window_offset: 0,
});
// start pruning events batches of size 2 and verify transactions have been pruned from DB
for i in (0..=num_versions).step_by(2) {
pruner
Expand Down
10 changes: 3 additions & 7 deletions storage/aptosdb/src/pruner/ledger_pruner_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
db_pruner::DBPruner, ledger_pruner_worker::LedgerPrunerWorker,
ledger_store::ledger_store_pruner::LedgerPruner, pruner_manager::PrunerManager,
},
pruner_utils, StateStore,
pruner_utils,
};
use aptos_config::config::LedgerPrunerConfig;
use aptos_infallible::Mutex;
Expand Down Expand Up @@ -99,12 +99,8 @@ impl PrunerManager for LedgerPrunerManager {

impl LedgerPrunerManager {
/// Creates a worker thread that waits on a channel for pruning commands.
pub fn new(
ledger_rocksdb: Arc<DB>,
state_store: Arc<StateStore>,
ledger_pruner_config: LedgerPrunerConfig,
) -> Self {
let ledger_pruner = pruner_utils::create_ledger_pruner(ledger_rocksdb, state_store);
pub fn new(ledger_rocksdb: Arc<DB>, ledger_pruner_config: LedgerPrunerConfig) -> Self {
let ledger_pruner = pruner_utils::create_ledger_pruner(ledger_rocksdb);

if ledger_pruner_config.enable {
PRUNER_WINDOW
Expand Down
Loading

0 comments on commit 3bd0238

Please sign in to comment.