Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(merkle-tree): Propagate errors for mutable tree operations #2056

Merged
merged 9 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions core/bin/merkle_tree_consistency_checker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,31 @@ struct Cli {
}

impl Cli {
fn run(self, config: &DBConfig) {
fn run(self, config: &DBConfig) -> anyhow::Result<()> {
let db_path = &config.merkle_tree.path;
tracing::info!("Verifying consistency of Merkle tree at {db_path}");
let start = Instant::now();
let db = RocksDB::new(Path::new(db_path)).unwrap();
let tree = ZkSyncTree::new_lightweight(db.into());
let db =
RocksDB::new(Path::new(db_path)).context("failed initializing Merkle tree RocksDB")?;
let tree =
ZkSyncTree::new_lightweight(db.into()).context("cannot initialize Merkle tree")?;

let l1_batch_number = if let Some(number) = self.l1_batch {
L1BatchNumber(number)
} else {
let next_number = tree.next_l1_batch_number();
if next_number == L1BatchNumber(0) {
tracing::info!("Merkle tree is empty, skipping");
return;
return Ok(());
}
next_number - 1
};

tracing::info!("L1 batch number to check: {l1_batch_number}");
tree.verify_consistency(l1_batch_number);
tree.verify_consistency(l1_batch_number)
.context("Merkle tree is inconsistent")?;
tracing::info!("Merkle tree verified in {:?}", start.elapsed());
Ok(())
}
}

Expand All @@ -64,6 +68,5 @@ fn main() -> anyhow::Result<()> {
let _guard = builder.build();

let db_config = DBConfig::from_env().context("DBConfig::from_env()")?;
Cli::parse().run(&db_config);
Ok(())
Cli::parse().run(&db_config)
}
7 changes: 4 additions & 3 deletions core/lib/merkle_tree/examples/loadtest/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ impl Database for WithBatching<'_> {
self.inner.start_profiling(operation)
}

fn apply_patch(&mut self, patch: PatchSet) {
self.inner.apply_patch(patch);
fn apply_patch(&mut self, patch: PatchSet) -> anyhow::Result<()> {
self.inner.apply_patch(patch)?;

self.in_memory_batch_size += 1;
if self.in_memory_batch_size >= self.batch_size {
println!("Flushing changes to underlying DB");
self.inner.flush();
self.inner.flush()?;
self.in_memory_batch_size = 0;
}
Ok(())
}
}
52 changes: 38 additions & 14 deletions core/lib/merkle_tree/examples/loadtest/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
//! prohibitively slow.

use std::{
thread,
any, thread,
time::{Duration, Instant},
};

use anyhow::Context as _;
use clap::Parser;
use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng};
use tempfile::TempDir;
Expand All @@ -24,6 +25,17 @@ use crate::batch::WithBatching;

mod batch;

fn panic_to_error(panic: Box<dyn any::Any + Send>) -> anyhow::Error {
let panic_message = if let Some(&panic_string) = panic.downcast_ref::<&'static str>() {
panic_string.to_string()
} else if let Some(panic_string) = panic.downcast_ref::<String>() {
panic_string.to_string()
} else {
"(unknown panic)".to_string()
};
anyhow::Error::msg(panic_message)
}

/// CLI for load-testing for the Merkle tree implementation.
#[derive(Debug, Parser)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -78,7 +90,7 @@ impl Cli {
.init();
}

fn run(self) {
fn run(self) -> anyhow::Result<()> {
Self::init_logging();
tracing::info!("Launched with options: {self:?}");

Expand All @@ -89,7 +101,7 @@ impl Cli {
mock_db = PatchSet::default();
&mut mock_db
} else {
let dir = TempDir::new().expect("failed creating temp dir for RocksDB");
let dir = TempDir::new().context("failed creating temp dir for RocksDB")?;
tracing::info!(
"Created temp dir for RocksDB: {}",
dir.path().to_string_lossy()
Expand All @@ -99,7 +111,8 @@ impl Cli {
include_indices_and_filters_in_block_cache: self.cache_indices,
..RocksDBOptions::default()
};
let db = RocksDB::with_options(dir.path(), db_options).unwrap();
let db =
RocksDB::with_options(dir.path(), db_options).context("failed creating RocksDB")?;
rocksdb = RocksDBWrapper::from(db);

if let Some(chunk_size) = self.chunk_size {
Expand All @@ -125,7 +138,7 @@ impl Cli {
let hasher: &dyn HashTree = if self.no_hashing { &() } else { &Blake2Hasher };
let mut rng = StdRng::seed_from_u64(self.rng_seed);

let mut tree = MerkleTree::with_hasher(db, hasher);
let mut tree = MerkleTree::with_hasher(db, hasher).context("cannot create tree")?;
let mut next_key_idx = 0_u64;
let mut next_value_idx = 0_u64;
for version in 0..self.commit_count {
Expand Down Expand Up @@ -154,19 +167,26 @@ impl Cli {
let reads =
Self::generate_keys(read_indices.into_iter()).map(TreeInstruction::Read);
let instructions = kvs.map(TreeInstruction::Write).chain(reads).collect();
let output = tree.extend_with_proofs(instructions);
output.root_hash().unwrap()
let output = tree
.extend_with_proofs(instructions)
.context("failed extending tree")?;
output.root_hash().context("tree update is empty")?
} else {
let output = tree.extend(kvs.collect());
let output = tree
.extend(kvs.collect())
.context("failed extending tree")?;
output.root_hash
};

if let Some((pruner_handle, _)) = &pruner_handles {
if pruner_handle.set_target_retained_version(version).is_err() {
tracing::error!("Pruner unexpectedly stopped");
let (_, pruner_thread) = pruner_handles.unwrap();
pruner_thread.join().expect("Pruner panicked");
return; // unreachable
pruner_thread
.join()
.map_err(panic_to_error)
.context("pruner thread panicked")??;
return Ok(()); // unreachable
}
}

Expand All @@ -177,14 +197,18 @@ impl Cli {
tracing::info!("Verifying tree consistency...");
let start = Instant::now();
tree.verify_consistency(self.commit_count - 1, false)
.expect("tree consistency check failed");
.context("tree consistency check failed")?;
let elapsed = start.elapsed();
tracing::info!("Verified tree consistency in {elapsed:?}");

if let Some((pruner_handle, pruner_thread)) = pruner_handles {
drop(pruner_handle);
pruner_thread.join().expect("Pruner panicked");
pruner_thread
.join()
.map_err(panic_to_error)
.context("pruner thread panicked")??;
}
Ok(())
}

fn generate_keys(key_indexes: impl Iterator<Item = u64>) -> impl Iterator<Item = U256> {
Expand All @@ -197,6 +221,6 @@ impl Cli {
}
}

fn main() {
Cli::parse().run();
fn main() -> anyhow::Result<()> {
Cli::parse().run()
}
32 changes: 22 additions & 10 deletions core/lib/merkle_tree/examples/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::time::Instant;

use anyhow::Context as _;
use clap::Parser;
use rand::{rngs::StdRng, Rng, SeedableRng};
use tempfile::TempDir;
Expand Down Expand Up @@ -47,7 +48,7 @@ impl Cli {
.init();
}

fn run(self) {
fn run(self) -> anyhow::Result<()> {
Self::init_logging();
tracing::info!("Launched with options: {self:?}");

Expand All @@ -57,7 +58,7 @@ impl Cli {
mock_db = PatchSet::default();
&mut mock_db
} else {
let dir = TempDir::new().expect("failed creating temp dir for RocksDB");
let dir = TempDir::new().context("failed creating temp dir for RocksDB")?;
tracing::info!(
"Created temp dir for RocksDB: {}",
dir.path().to_string_lossy()
Expand All @@ -66,7 +67,8 @@ impl Cli {
block_cache_capacity: self.block_cache,
..RocksDBOptions::default()
};
let db = RocksDB::with_options(dir.path(), db_options).unwrap();
let db =
RocksDB::with_options(dir.path(), db_options).context("failed creating RocksDB")?;
rocksdb = RocksDBWrapper::from(db);
_temp_dir = Some(dir);
&mut rocksdb
Expand All @@ -83,7 +85,8 @@ impl Cli {

let mut last_key = Key::zero();
let mut last_leaf_index = 0;
let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher);
let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher)
.context("cannot create tree")?;
let recovery_started_at = Instant::now();
for updated_idx in 0..self.update_count {
let started_at = Instant::now();
Expand All @@ -108,27 +111,36 @@ impl Cli {
})
.collect();
if self.random {
recovery.extend_random(recovery_entries);
recovery
.extend_random(recovery_entries)
.context("failed extending tree during recovery")?;
} else {
recovery.extend_linear(recovery_entries);
recovery
.extend_linear(recovery_entries)
.context("failed extending tree during recovery")?;
}
tracing::info!(
"Updated tree with recovery chunk #{updated_idx} in {:?}",
started_at.elapsed()
);
}

let tree = MerkleTree::new(recovery.finalize());
let db = recovery
.finalize()
.context("failed finalizing tree recovery")?;
let tree = MerkleTree::new(db).context("tree has invalid metadata after recovery")?;
tracing::info!(
"Recovery finished in {:?}; verifying consistency...",
recovery_started_at.elapsed()
);
let started_at = Instant::now();
tree.verify_consistency(recovered_version, true).unwrap();
tree.verify_consistency(recovered_version, true)
.context("tree is inconsistent")?;
tracing::info!("Verified consistency in {:?}", started_at.elapsed());
Ok(())
}
}

fn main() {
Cli::parse().run();
fn main() -> anyhow::Result<()> {
Cli::parse().run()
}
7 changes: 4 additions & 3 deletions core/lib/merkle_tree/src/consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,12 @@ mod tests {
const SECOND_KEY: Key = U256([0, 0, 0, 0x_dead_beef_0100_0000]);

fn prepare_database() -> PatchSet {
let mut tree = MerkleTree::new(PatchSet::default());
let mut tree = MerkleTree::new(PatchSet::default()).unwrap();
tree.extend(vec![
TreeEntry::new(FIRST_KEY, 1, H256([1; 32])),
TreeEntry::new(SECOND_KEY, 2, H256([2; 32])),
]);
])
.unwrap();
tree.db
}

Expand Down Expand Up @@ -315,7 +316,7 @@ mod tests {
.num_threads(1)
.build()
.expect("failed initializing `rayon` thread pool");
thread_pool.install(|| MerkleTree::new(db).verify_consistency(0, true))
thread_pool.install(|| MerkleTree::new(db).unwrap().verify_consistency(0, true))
}

#[test]
Expand Down
Loading
Loading