Skip to content

Commit

Permalink
[Storage][Sharding] Sharded state merkle pruner. (#7857)
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 authored Jun 21, 2023
1 parent 1b436b2 commit 63722ca
Show file tree
Hide file tree
Showing 18 changed files with 401 additions and 220 deletions.
1 change: 1 addition & 0 deletions storage/aptosdb/src/aptosdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ proptest! {

#[test]
fn test_state_merkle_pruning(input in arb_blocks_to_commit()) {
aptos_logger::Logger::new().init();
test_state_merkle_pruning_impl(input);
}
}
20 changes: 1 addition & 19 deletions storage/aptosdb/src/pruner/db_pruner.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,18 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::{Context, Result};
use aptos_logger::info;
use anyhow::Result;
use aptos_types::transaction::Version;
use std::cmp::min;

/// Defines the trait for pruner for different DB
pub trait DBPruner: Send + Sync {
/// Find out the first undeleted item in the stale node index.
fn initialize(&self) {
let min_readable_version = self
.initialize_min_readable_version()
.context(self.name())
.expect("Pruner failed to initialize.");
info!(
min_readable_version = min_readable_version,
"{} initialized.",
self.name()
);
self.record_progress(min_readable_version);
}

fn name(&self) -> &'static str;

/// Performs the actual pruning, a target version is passed, which is the target the pruner
/// tries to prune.
fn prune(&self, batch_size: usize) -> Result<Version>;

/// Initializes the least readable version stored in underlying DB storage
fn initialize_min_readable_version(&self) -> Result<Version>;

/// Returns the progress of the pruner.
fn progress(&self) -> Version;

Expand Down
6 changes: 2 additions & 4 deletions storage/aptosdb/src/pruner/event_store/event_store_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
pruner::{
db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_ledger_subpruner_progress,
},
pruner::{db_sub_pruner::DBSubPruner, pruner_utils::get_or_initialize_subpruner_progress},
schema::db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue},
EventStore,
};
Expand Down Expand Up @@ -38,7 +36,7 @@ impl EventStorePruner {
event_db: Arc<DB>,
metadata_progress: Version,
) -> Result<Self> {
let progress = get_or_initialize_ledger_subpruner_progress(
let progress = get_or_initialize_subpruner_progress(
&event_db,
&DbMetadataKey::EventPrunerProgress,
metadata_progress,
Expand Down
19 changes: 11 additions & 8 deletions storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
EventStore, TransactionStore,
};
use anyhow::Result;
use aptos_logger::info;
use aptos_types::transaction::{AtomicVersion, Version};
use std::{
cmp::min,
Expand Down Expand Up @@ -66,10 +67,6 @@ impl DBPruner for LedgerPruner {
Ok(target_version)
}

fn initialize_min_readable_version(&self) -> Result<Version> {
self.ledger_metadata_pruner.progress()
}

fn progress(&self) -> Version {
self.progress.load(Ordering::SeqCst)
}
Expand All @@ -85,16 +82,18 @@ impl DBPruner for LedgerPruner {
self.target_version.load(Ordering::SeqCst)
}

fn record_progress(&self, min_readable_version: Version) {
self.progress.store(min_readable_version, Ordering::SeqCst);
fn record_progress(&self, progress: Version) {
self.progress.store(progress, Ordering::SeqCst);
PRUNER_VERSIONS
.with_label_values(&["ledger_pruner", "progress"])
.set(min_readable_version as i64);
.set(progress as i64);
}
}

impl LedgerPruner {
pub fn new(ledger_db: Arc<LedgerDb>) -> Result<Self> {
info!(name = LEDGER_PRUNER_NAME, "Initializing...");

let ledger_metadata_pruner = Box::new(
LedgerMetadataPruner::new(ledger_db.metadata_db_arc())
.expect("Failed to initialize ledger_metadata_pruner."),
Expand Down Expand Up @@ -143,7 +142,11 @@ impl LedgerPruner {
],
};

pruner.initialize();
info!(
name = pruner.name(),
progress = metadata_progress,
"Initialized."
);

Ok(pruner)
}
Expand Down
15 changes: 11 additions & 4 deletions storage/aptosdb/src/pruner/pruner_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ pub fn create_state_merkle_pruner<S: StaleNodeIndexSchemaTrait>(
where
StaleNodeIndex: KeyCodec<S>,
{
Arc::new(StateMerklePruner::<S>::new(Arc::clone(&state_merkle_db)))
Arc::new(
StateMerklePruner::<S>::new(Arc::clone(&state_merkle_db))
.expect("Failed to create state merkle pruner."),
)
}

/// A utility function to instantiate the ledger pruner
Expand All @@ -41,7 +44,7 @@ pub(crate) fn create_ledger_pruner(ledger_db: Arc<LedgerDb>) -> Arc<LedgerPruner

/// A utility function to instantiate the state kv pruner.
pub(crate) fn create_state_kv_pruner(state_kv_db: Arc<StateKvDb>) -> Arc<StateKvPruner> {
Arc::new(StateKvPruner::new(state_kv_db))
Arc::new(StateKvPruner::new(state_kv_db).expect("Failed to create state kv pruner."))
}

pub(crate) fn get_ledger_pruner_progress(ledger_db: &LedgerDb) -> Result<Version> {
Expand Down Expand Up @@ -78,10 +81,14 @@ pub(crate) fn get_state_merkle_pruner_progress<S: StaleNodeIndexSchemaTrait>(
where
StaleNodeIndex: KeyCodec<S>,
{
Ok(get_progress(state_merkle_db.metadata_db(), &S::tag())?.unwrap_or(0))
Ok(get_progress(
state_merkle_db.metadata_db(),
&S::progress_metadata_key(None),
)?
.unwrap_or(0))
}

pub(crate) fn get_or_initialize_ledger_subpruner_progress(
pub(crate) fn get_or_initialize_subpruner_progress(
sub_db: &DB,
progress_key: &DbMetadataKey,
metadata_progress: Version,
Expand Down
38 changes: 20 additions & 18 deletions storage/aptosdb/src/pruner/state_kv_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
state_kv_db::StateKvDb,
};
use anyhow::Result;
use aptos_logger::info;
use aptos_schemadb::SchemaBatch;
use aptos_types::transaction::{AtomicVersion, Version};
use std::sync::{atomic::Ordering, Arc};
Expand Down Expand Up @@ -43,47 +44,48 @@ impl DBPruner for StateKvPruner {
Ok(current_target_version)
}

fn initialize_min_readable_version(&self) -> anyhow::Result<Version> {
Ok(self
.state_kv_db
.metadata_db()
.get::<DbMetadataSchema>(&DbMetadataKey::StateKvPrunerProgress)?
.map_or(0, |v| v.expect_version()))
}

fn progress(&self) -> Version {
self.progress.load(Ordering::SeqCst)
}

fn set_target_version(&self, target_version: Version) {
self.target_version.store(target_version, Ordering::Relaxed);
self.target_version.store(target_version, Ordering::SeqCst);
PRUNER_VERSIONS
.with_label_values(&["state_kv_pruner", "target"])
.set(target_version as i64);
}

fn target_version(&self) -> Version {
self.target_version.load(Ordering::Relaxed)
self.target_version.load(Ordering::SeqCst)
}

fn record_progress(&self, min_readable_version: Version) {
self.progress.store(min_readable_version, Ordering::Relaxed);
fn record_progress(&self, progress: Version) {
self.progress.store(progress, Ordering::SeqCst);
PRUNER_VERSIONS
.with_label_values(&["state_kv_pruner", "progress"])
.set(min_readable_version as i64);
.set(progress as i64);
}
}

impl StateKvPruner {
pub fn new(state_kv_db: Arc<StateKvDb>) -> Self {
pub fn new(state_kv_db: Arc<StateKvDb>) -> Result<Self> {
info!(name = STATE_KV_PRUNER_NAME, "Initializing...");

let progress = state_kv_db
.metadata_db()
.get::<DbMetadataSchema>(&DbMetadataKey::StateKvPrunerProgress)?
.map_or(0, |v| v.expect_version());

let pruner = StateKvPruner {
state_kv_db: Arc::clone(&state_kv_db),
target_version: AtomicVersion::new(0),
progress: AtomicVersion::new(0),
target_version: AtomicVersion::new(progress),
progress: AtomicVersion::new(progress),
state_value_pruner: Arc::new(StateValuePruner::new(state_kv_db)),
};
pruner.initialize();
pruner

info!(name = pruner.name(), progress = progress, "Initialized.");

Ok(pruner)
}

fn prune_inner(
Expand Down
5 changes: 2 additions & 3 deletions storage/aptosdb/src/pruner/state_merkle_pruner_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ where
StaleNodeIndex: KeyCodec<S>,
{
state_merkle_db: Arc<StateMerkleDb>,
/// DB version window, which dictates how many versions of state store
/// to keep.
/// DB version window, which dictates how many versions of state merkle data to keep.
prune_window: Version,
/// It is None iff the pruner is not enabled.
pruner_worker: Option<PrunerWorker>,
/// The minimal readable version for the ledger data.
/// The minimal readable version for the state merkle data.
min_readable_version: AtomicVersion,

_phantom: PhantomData<S>,
Expand Down
18 changes: 13 additions & 5 deletions storage/aptosdb/src/pruner/state_store/generics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ pub trait StaleNodeIndexSchemaTrait: Schema<Key = StaleNodeIndex>
where
StaleNodeIndex: KeyCodec<Self>,
{
fn tag() -> DbMetadataKey;
fn progress_metadata_key(shard_id: Option<u8>) -> DbMetadataKey;
fn name() -> &'static str;
}

impl StaleNodeIndexSchemaTrait for StaleNodeIndexSchema {
fn tag() -> DbMetadataKey {
DbMetadataKey::StateMerklePrunerProgress
fn progress_metadata_key(shard_id: Option<u8>) -> DbMetadataKey {
if let Some(shard_id) = shard_id {
DbMetadataKey::StateMerkleShardPrunerProgress(shard_id as usize)
} else {
DbMetadataKey::StateMerklePrunerProgress
}
}

fn name() -> &'static str {
Expand All @@ -27,8 +31,12 @@ impl StaleNodeIndexSchemaTrait for StaleNodeIndexSchema {
}

impl StaleNodeIndexSchemaTrait for StaleNodeIndexCrossEpochSchema {
fn tag() -> DbMetadataKey {
DbMetadataKey::EpochEndingStateMerklePrunerProgress
fn progress_metadata_key(shard_id: Option<u8>) -> DbMetadataKey {
if let Some(shard_id) = shard_id {
DbMetadataKey::EpochEndingStateMerkleShardPrunerProgress(shard_id as usize)
} else {
DbMetadataKey::EpochEndingStateMerklePrunerProgress
}
}

fn name() -> &'static str {
Expand Down
Loading

0 comments on commit 63722ca

Please sign in to comment.