Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tree): Improved tree pruning #1532

Merged
merged 23 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ async fn run_tree(
.await
.context("failed creating DB pool for Merkle tree recovery")?;

let metadata_calculator =
MetadataCalculator::new(metadata_calculator_config, None, tree_pool, recovery_pool)
.await
.context("failed initializing metadata calculator")?;
let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None, tree_pool)
.await
.context("failed initializing metadata calculator")?
.with_recovery_pool(recovery_pool);

let tree_reader = Arc::new(metadata_calculator.tree_reader());
app_health.insert_component(metadata_calculator.tree_health_check());
Expand Down
7 changes: 6 additions & 1 deletion core/lib/merkle_tree/examples/loadtest/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Cli {
}

if self.prune {
let (mut pruner, pruner_handle) = MerkleTreePruner::new(rocksdb.clone(), 0);
let (mut pruner, pruner_handle) = MerkleTreePruner::new(rocksdb.clone());
pruner.set_poll_interval(Duration::from_secs(10));
let pruner_thread = thread::spawn(|| pruner.run());
pruner_handles = Some((pruner_handle, pruner_thread));
Expand Down Expand Up @@ -160,6 +160,11 @@ impl Cli {
let output = tree.extend(kvs.collect());
output.root_hash
};

if let Some((pruner_handle, _)) = &pruner_handles {
pruner_handle.set_target_retained_version(version);
}

let elapsed = start.elapsed();
tracing::info!("Processed block #{version} in {elapsed:?}, root hash = {root_hash:?}");
}
Expand Down
42 changes: 41 additions & 1 deletion core/lib/merkle_tree/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use zksync_prover_interface::inputs::{PrepareBasicCircuitsJob, StorageLogMetadat
use zksync_types::{L1BatchNumber, StorageKey};

use crate::{
consistency::ConsistencyError,
storage::{PatchSet, Patched, RocksDBWrapper},
types::{
Key, Root, TreeEntry, TreeEntryWithProof, TreeInstruction, TreeLogEntry, ValueHash,
TREE_DEPTH,
},
BlockOutput, HashTree, MerkleTree, NoVersionError,
BlockOutput, HashTree, MerkleTree, MerkleTreePruner, MerkleTreePrunerHandle, NoVersionError,
};

/// Metadata for the current tree state.
Expand Down Expand Up @@ -42,6 +43,7 @@ pub struct ZkSyncTree {
tree: MerkleTree<Patched<RocksDBWrapper>>,
thread_pool: Option<ThreadPool>,
mode: TreeMode,
pruning_enabled: bool,
}

impl ZkSyncTree {
Expand Down Expand Up @@ -93,9 +95,26 @@ impl ZkSyncTree {
tree: MerkleTree::new(Patched::new(db)),
thread_pool: None,
mode,
pruning_enabled: false,
}
}

/// Returns tree pruner and a handle to stop it.
///
/// # Panics
///
/// Panics if this method was already called for the tree instance; it's logically unsound to run
/// multiple pruners for the same tree concurrently.
pub fn pruner(&mut self) -> (MerkleTreePruner<RocksDBWrapper>, MerkleTreePrunerHandle) {
assert!(
!self.pruning_enabled,
"pruner was already obtained for the tree"
);
self.pruning_enabled = true;
let db = self.tree.db.inner().clone();
MerkleTreePruner::new(db)
}

/// Returns a readonly handle to the tree. The handle **does not** see uncommitted changes to the tree,
/// only ones flushed to RocksDB.
pub fn reader(&self) -> ZkSyncTreeReader {
Expand Down Expand Up @@ -360,6 +379,14 @@ impl ZkSyncTreeReader {
L1BatchNumber(number)
}

/// Returns the minimum L1 batch number retained by the tree.
#[allow(clippy::missing_panics_doc)]
pub fn min_l1_batch_number(&self) -> Option<L1BatchNumber> {
self.0.first_retained_version().map(|version| {
L1BatchNumber(u32::try_from(version).expect("integer overflow for L1 batch number"))
})
}

/// Returns the number of leaves in the tree.
pub fn leaf_count(&self) -> u64 {
self.0.latest_root().leaf_count()
Expand All @@ -379,4 +406,17 @@ impl ZkSyncTreeReader {
let version = u64::from(l1_batch_number.0);
self.0.entries_with_proofs(version, keys)
}

/// Verifies consistency of the tree at the specified L1 batch number.
///
/// # Errors
///
/// Returns the first encountered verification error, should one occur.
pub fn verify_consistency(
&self,
l1_batch_number: L1BatchNumber,
) -> Result<(), ConsistencyError> {
let version = l1_batch_number.0.into();
self.0.verify_consistency(version, true)
}
}
13 changes: 13 additions & 0 deletions core/lib/merkle_tree/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,19 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
}
}

impl<DB: PruneDatabase> MerkleTree<DB> {
/// Returns the first retained version of the tree.
pub fn first_retained_version(&self) -> Option<u64> {
match self.db.min_stale_key_version() {
// Min stale key version is next after the first retained version since at least
// the root is updated on each version.
Some(version) => version.checked_sub(1),
// No stale keys means all past versions of the tree have been pruned
None => self.latest_version(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion core/lib/merkle_tree/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ pub struct PruningStats {
}

impl PruningStats {
pub fn report(self) {
pub fn report(&self) {
PRUNING_METRICS
.target_retained_version
.set(self.target_retained_version);
Expand Down
Loading
Loading