Skip to content

Commit

Permalink
fix(merkle-tree): Fix tree truncation (#3178)
Browse files Browse the repository at this point in the history
## What ❔

Removes all stale keys for the "future" tree versions during truncation.

## Why ❔

Otherwise, we may get bogus stale keys and non-stale nodes removed from
the tree during pruning.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev
lint`.
  • Loading branch information
slowli authored Oct 31, 2024
1 parent 8ae06b2 commit 9654097
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 25 deletions.
5 changes: 4 additions & 1 deletion core/lib/merkle_tree/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,10 @@ impl ZkSyncTree {
pub fn roll_back_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> {
self.tree.db.reset();
let retained_version_count = u64::from(last_l1_batch_to_keep.0 + 1);
self.tree.truncate_recent_versions(retained_version_count)
// Since `Patched<_>` doesn't implement `PruneDatabase`, we borrow the underlying DB, which is safe
// because the in-memory patch was reset above.
MerkleTree::new_unchecked(self.tree.db.inner_mut())
.truncate_recent_versions(retained_version_count)
}

/// Saves the accumulated changes in the tree to RocksDB.
Expand Down
38 changes: 20 additions & 18 deletions core/lib/merkle_tree/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,24 +200,6 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
root.unwrap_or(Root::Empty)
}

/// Removes the most recent versions from the database.
///
/// The current implementation does not actually remove node data for the removed versions
/// since it's likely to be reused in the future (especially upper-level internal nodes).
///
/// # Errors
///
/// Proxies database I/O errors.
pub fn truncate_recent_versions(&mut self, retained_version_count: u64) -> anyhow::Result<()> {
let mut manifest = self.db.manifest().unwrap_or_default();
if manifest.version_count > retained_version_count {
manifest.version_count = retained_version_count;
let patch = PatchSet::from_manifest(manifest);
self.db.apply_patch(patch)?;
}
Ok(())
}

/// Extends this tree by creating its new version.
///
/// # Return value
Expand Down Expand Up @@ -259,6 +241,26 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
}

impl<DB: PruneDatabase> MerkleTree<DB> {
/// Removes the most recent versions from the database.
///
/// The current implementation does not actually remove node data for the removed versions
/// since it's likely to be reused in the future (especially upper-level internal nodes).
///
/// # Errors
///
/// Proxies database I/O errors.
pub fn truncate_recent_versions(&mut self, retained_version_count: u64) -> anyhow::Result<()> {
let mut manifest = self.db.manifest().unwrap_or_default();
let current_version_count = manifest.version_count;
if current_version_count > retained_version_count {
// It is necessary to remove "future" stale keys since otherwise they may be used in future pruning and lead
// to non-obsolete tree nodes getting removed.
manifest.version_count = retained_version_count;
self.db.truncate(manifest, ..current_version_count)?;
}
Ok(())
}

/// Returns the first retained version of the tree.
pub fn first_retained_version(&self) -> Option<u64> {
match self.db.min_stale_key_version() {
Expand Down
67 changes: 66 additions & 1 deletion core/lib/merkle_tree/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ mod tests {
use super::*;
use crate::{
types::{Node, NodeKey},
Database, Key, MerkleTree, PatchSet, TreeEntry, ValueHash,
Database, Key, MerkleTree, PatchSet, RocksDBWrapper, TreeEntry, ValueHash,
};

fn create_db() -> PatchSet {
Expand Down Expand Up @@ -506,4 +506,69 @@ mod tests {
println!("Keys are pruned after each update");
test_keys_are_removed_by_pruning_when_overwritten_in_multiple_batches(true);
}

fn test_pruning_with_truncation(db: impl PruneDatabase) {
let mut tree = MerkleTree::new(db).unwrap();
let kvs: Vec<_> = (0_u64..100)
.map(|i| TreeEntry::new(Key::from(i), i + 1, ValueHash::zero()))
.collect();
tree.extend(kvs).unwrap();

let overridden_kvs = vec![TreeEntry::new(
Key::from(0),
1,
ValueHash::repeat_byte(0xaa),
)];
tree.extend(overridden_kvs).unwrap();

let stale_keys = tree.db.stale_keys(1);
assert!(
stale_keys.iter().any(|key| !key.is_empty()),
"{stale_keys:?}"
);

// Revert `overridden_kvs`.
tree.truncate_recent_versions(1).unwrap();
assert_eq!(tree.latest_version(), Some(0));
let future_stale_keys = tree.db.stale_keys(1);
assert!(future_stale_keys.is_empty());

// Add a new version without the key. To make the matter more egregious, the inserted key
// differs from all existing keys, starting from the first nibble.
let new_key = Key::from_big_endian(&[0xaa; 32]);
let new_kvs = vec![TreeEntry::new(new_key, 101, ValueHash::repeat_byte(0xaa))];
tree.extend(new_kvs).unwrap();
assert_eq!(tree.latest_version(), Some(1));

let stale_keys = tree.db.stale_keys(1);
assert_eq!(stale_keys.len(), 1);
assert!(
stale_keys[0].is_empty() && stale_keys[0].version == 0,
"{stale_keys:?}"
);

let (mut pruner, _) = MerkleTreePruner::new(tree.db);
let prunable_version = pruner.last_prunable_version().unwrap();
assert_eq!(prunable_version, 1);
let stats = pruner
.prune_up_to(prunable_version)
.unwrap()
.expect("tree was not pruned");
assert_eq!(stats.target_retained_version, 1);
assert_eq!(stats.pruned_key_count, 1); // only the root node should have been pruned

let tree = MerkleTree::new(pruner.db).unwrap();
tree.verify_consistency(1, false).unwrap();
}

#[test]
fn pruning_with_truncation() {
test_pruning_with_truncation(PatchSet::default());
}

#[test]
fn pruning_with_truncation_on_rocksdb() {
let temp_dir = tempfile::TempDir::new().unwrap();
test_pruning_with_truncation(RocksDBWrapper::new(temp_dir.path()).unwrap());
}
}
30 changes: 30 additions & 0 deletions core/lib/merkle_tree/src/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,17 @@ pub trait PruneDatabase: Database {
///
/// Propagates database I/O errors.
fn prune(&mut self, patch: PrunePatchSet) -> anyhow::Result<()>;

/// Atomically truncates the specified range of versions and stale keys.
///
/// # Errors
///
/// Propagates database I/O errors.
fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()>;
}

impl<T: PruneDatabase + ?Sized> PruneDatabase for &mut T {
Expand All @@ -414,6 +425,14 @@ impl<T: PruneDatabase + ?Sized> PruneDatabase for &mut T {
fn prune(&mut self, patch: PrunePatchSet) -> anyhow::Result<()> {
(**self).prune(patch)
}

fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()> {
(**self).truncate(manifest, truncated_versions)
}
}

impl PruneDatabase for PatchSet {
Expand Down Expand Up @@ -447,6 +466,17 @@ impl PruneDatabase for PatchSet {
.retain(|version, _| !patch.deleted_stale_key_versions.contains(version));
Ok(())
}

fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()> {
self.manifest = manifest;
self.stale_keys_by_version
.retain(|version, _| !truncated_versions.contains(version));
Ok(())
}
}

#[cfg(test)]
Expand Down
24 changes: 23 additions & 1 deletion core/lib/merkle_tree/src/storage/parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
any::Any,
collections::{HashMap, VecDeque},
error::Error as StdError,
mem,
mem, ops,
sync::{mpsc, Arc},
thread,
time::Duration,
Expand Down Expand Up @@ -375,6 +375,17 @@ impl<DB: PruneDatabase> PruneDatabase for ParallelDatabase<DB> {
.context("failed synchronizing database before pruning")?;
self.inner.prune(patch)
}

fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()> {
// Require the underlying database to be fully synced.
self.wait_sync()
.context("failed synchronizing database before truncation")?;
self.inner.truncate(manifest, truncated_versions)
}
}

/// Database with either sequential or parallel persistence.
Expand Down Expand Up @@ -479,6 +490,17 @@ impl<DB: PruneDatabase> PruneDatabase for MaybeParallel<DB> {
Self::Parallel(db) => db.prune(patch),
}
}

fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()> {
match self {
Self::Sequential(db) => db.truncate(manifest, truncated_versions),
Self::Parallel(db) => db.truncate(manifest, truncated_versions),
}
}
}

#[cfg(test)]
Expand Down
28 changes: 27 additions & 1 deletion core/lib/merkle_tree/src/storage/rocksdb.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! RocksDB implementation of [`Database`].
use std::{any::Any, cell::RefCell, path::Path, sync::Arc};
use std::{any::Any, cell::RefCell, ops, path::Path, sync::Arc};

use anyhow::Context as _;
use rayon::prelude::*;
Expand Down Expand Up @@ -351,6 +351,32 @@ impl PruneDatabase for RocksDBWrapper {
.write(write_batch)
.context("Failed writing a batch to RocksDB")
}

fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()> {
anyhow::ensure!(
manifest.version_count <= truncated_versions.end,
"Invalid truncate call: manifest={manifest:?}, truncated_versions={truncated_versions:?}"
);
let mut write_batch = self.db.new_write_batch();

let tree_cf = MerkleTreeColumnFamily::Tree;
let mut node_bytes = Vec::with_capacity(128);
manifest.serialize(&mut node_bytes);
write_batch.put_cf(tree_cf, Self::MANIFEST_KEY, &node_bytes);

let stale_keys_cf = MerkleTreeColumnFamily::StaleKeys;
let first_version = &manifest.version_count.to_be_bytes() as &[_];
let last_version = &truncated_versions.end.to_be_bytes();
write_batch.delete_range_cf(stale_keys_cf, first_version..last_version);

self.db
.write(write_batch)
.context("Failed writing a batch to RocksDB")
}
}

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions core/lib/merkle_tree/tests/integration/merkle_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
use test_casing::test_casing;
use zksync_crypto_primitives::hasher::blake2::Blake2Hasher;
use zksync_merkle_tree::{
Database, HashTree, MerkleTree, PatchSet, Patched, TreeEntry, TreeInstruction, TreeLogEntry,
TreeRangeDigest,
Database, HashTree, MerkleTree, PatchSet, Patched, PruneDatabase, TreeEntry, TreeInstruction,
TreeLogEntry, TreeRangeDigest,
};
use zksync_types::{AccountTreeId, Address, StorageKey, H256, U256};

Expand Down Expand Up @@ -270,7 +270,7 @@ fn accumulating_commits(chunk_size: usize) {
test_accumulated_commits(PatchSet::default(), chunk_size);
}

fn test_root_hash_computing_with_reverts(db: &mut impl Database) {
fn test_root_hash_computing_with_reverts(db: &mut impl PruneDatabase) {
let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
let (initial_update, final_update) = kvs.split_at(75);
let key_updates: Vec<_> = kvs
Expand Down

0 comments on commit 9654097

Please sign in to comment.