From b08a667c819f8b3d222c237fc4447be6b75d334e Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Thu, 6 Jun 2024 09:58:36 +0300
Subject: [PATCH] perf(en): Parallelize persistence and chunk processing during tree recovery (#2050)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

Persists chunks during tree recovery in parallel with processing subsequent chunks.

## Why ❔

- Could speed up tree recovery ~2x on the mainnet (both persistence and processing of a single chunk of 200k entries take ~3s).
- Significantly easier to implement and reason about than alternatives.
- May be used together with alternatives.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
---
 core/bin/external_node/src/config/mod.rs      |   7 +
 core/bin/external_node/src/main.rs            |   3 +
 core/lib/merkle_tree/examples/recovery.rs     |  36 +-
 core/lib/merkle_tree/src/metrics.rs           |   3 +
 .../src/{recovery.rs => recovery/mod.rs}      | 119 ++--
 core/lib/merkle_tree/src/recovery/tests.rs    |  56 ++
 core/lib/merkle_tree/src/storage/mod.rs       |   7 +-
 core/lib/merkle_tree/src/storage/parallel.rs  | 625 ++++++++++++++++++
 core/lib/merkle_tree/src/storage/patch.rs     |  15 +
 .../merkle_tree/tests/integration/recovery.rs |  40 ++
 core/node/metadata_calculator/src/helpers.rs  |  44 +-
 core/node/metadata_calculator/src/lib.rs      |  10 +-
 .../metadata_calculator/src/recovery/mod.rs   |   7 +-
 .../metadata_calculator/src/recovery/tests.rs | 127 ++--
 .../tests/snapshot-recovery.test.ts           |   4 +-
 15 files changed, 956 insertions(+), 147 deletions(-)
 rename core/lib/merkle_tree/src/{recovery.rs => recovery/mod.rs} (81%)
 create mode 100644 core/lib/merkle_tree/src/recovery/tests.rs
 create mode 100644 core/lib/merkle_tree/src/storage/parallel.rs

diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs
index 3d94e833217a..e329150721c0 100644
--- a/core/bin/external_node/src/config/mod.rs
+++ b/core/bin/external_node/src/config/mod.rs
@@ -755,6 +755,12 @@ pub(crate) struct ExperimentalENConfig {
     /// of recovery and then restarted with a different config).
     #[serde(default = "ExperimentalENConfig::default_snapshots_recovery_tree_chunk_size")]
     pub snapshots_recovery_tree_chunk_size: u64,
+    /// Buffer capacity for parallel persistence operations. Should be reasonably small, since a larger buffer means more RAM usage;
+    /// buffer elements are persisted tree chunks. On the other hand, a buffer that is too small can make persistence parallelization inefficient.
+    ///
+    /// If not set, parallel persistence will be disabled.
+    #[serde(default)] // Temporarily use a conservative option (sequential recovery) as default
+    pub snapshots_recovery_tree_parallel_persistence_buffer: Option<NonZeroUsize>,
 
     // Commitment generator
     /// Maximum degree of parallelism during commitment generation, i.e., the maximum number of L1 batches being processed in parallel.
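Editorial note: the buffer configured above maps onto a bounded channel between chunk processing and a background persistence thread (see `storage/parallel.rs` below). With both stages taking ~3 s per 200k-entry chunk, pipelining them is what yields the ~2x speedup claimed in the PR body. A minimal, self-contained sketch of this back-pressure pattern — `Chunk` and the sleep-based "write" are hypothetical stand-ins, not types from this PR:

```rust
use std::{sync::mpsc, thread, time::Duration};

// Hypothetical stand-in for a processed tree chunk.
#[derive(Debug)]
struct Chunk(u64);

fn main() {
    // Bounded channel playing the role of the persistence buffer: once 4 chunks
    // are queued, `send` blocks, applying back-pressure to chunk processing.
    let (sender, receiver) = mpsc::sync_channel::<Chunk>(4);
    let persistence = thread::spawn(move || {
        // Chunks are persisted strictly in submission order.
        for chunk in receiver {
            thread::sleep(Duration::from_millis(10)); // stand-in for a DB write
            println!("persisted {chunk:?}");
        }
    });

    for i in 0..10 {
        // Processing the next chunk overlaps with persisting the previous ones.
        sender.send(Chunk(i)).unwrap();
    }
    drop(sender); // close the channel so the persistence thread exits
    persistence.join().unwrap();
}
```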
@@ -779,6 +785,7 @@ impl ExperimentalENConfig {
             state_keeper_db_max_open_files: None,
             snapshots_recovery_l1_batch: None,
             snapshots_recovery_tree_chunk_size: Self::default_snapshots_recovery_tree_chunk_size(),
+            snapshots_recovery_tree_parallel_persistence_buffer: None,
             commitment_generator_max_parallelism: None,
         }
     }
diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index 05f4b2ba9d43..a80d652ba200 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -140,6 +140,9 @@ async fn run_tree(
         stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
         recovery: MetadataCalculatorRecoveryConfig {
             desired_chunk_size: config.experimental.snapshots_recovery_tree_chunk_size,
+            parallel_persistence_buffer: config
+                .experimental
+                .snapshots_recovery_tree_parallel_persistence_buffer,
         },
     };
diff --git a/core/lib/merkle_tree/examples/recovery.rs b/core/lib/merkle_tree/examples/recovery.rs
index c9367c48b360..882bfe9d9823 100644
--- a/core/lib/merkle_tree/examples/recovery.rs
+++ b/core/lib/merkle_tree/examples/recovery.rs
@@ -32,6 +32,9 @@ struct Cli {
     /// Perform testing on in-memory DB rather than RocksDB (i.e., with focus on hashing logic).
     #[arg(long = "in-memory", short = 'M')]
     in_memory: bool,
+    /// Parallelize DB persistence with processing.
+    #[arg(long = "parallelize", conflicts_with = "in_memory")]
+    parallelize: bool,
     /// Block cache capacity for RocksDB in bytes.
     #[arg(long = "block-cache", conflicts_with = "in_memory")]
     block_cache: Option<usize>,
@@ -52,11 +55,13 @@ impl Cli {
         Self::init_logging();
         tracing::info!("Launched with options: {self:?}");
 
-        let (mut mock_db, mut rocksdb);
-        let mut _temp_dir = None;
-        let db: &mut dyn PruneDatabase = if self.in_memory {
-            mock_db = PatchSet::default();
-            &mut mock_db
+        let hasher: &dyn HashTree = if self.no_hashing { &() } else { &Blake2Hasher };
+        let recovered_version = 123;
+
+        if self.in_memory {
+            let recovery =
+                MerkleTreeRecovery::with_hasher(PatchSet::default(), recovered_version, hasher)?;
+            self.recover_tree(recovery, recovered_version)
         } else {
             let dir = TempDir::new().context("failed creating temp dir for RocksDB")?;
             tracing::info!(
@@ -69,15 +74,22 @@ impl Cli {
             };
             let db =
                 RocksDB::with_options(dir.path(), db_options).context("failed creating RocksDB")?;
-            rocksdb = RocksDBWrapper::from(db);
-            _temp_dir = Some(dir);
-            &mut rocksdb
-        };
+            let db = RocksDBWrapper::from(db);
+            let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher)?;
+            if self.parallelize {
+                recovery.parallelize_persistence(4)?;
+            }
+            self.recover_tree(recovery, recovered_version)
+        }
+    }
 
-        let hasher: &dyn HashTree = if self.no_hashing { &() } else { &Blake2Hasher };
+    fn recover_tree<DB: PruneDatabase>(
+        self,
+        mut recovery: MerkleTreeRecovery<DB, &'static dyn HashTree>,
+        recovered_version: u64,
+    ) -> anyhow::Result<()> {
         let mut rng = StdRng::seed_from_u64(self.rng_seed);
 
-        let recovered_version = 123;
         let key_step =
             Key::MAX / (Key::from(self.update_count) * Key::from(self.writes_per_update));
         assert!(key_step > Key::from(u64::MAX));
@@ -85,8 +97,6 @@ impl Cli {
         let mut last_key = Key::zero();
         let mut last_leaf_index = 0;
 
-        let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher)
-            .context("cannot create tree")?;
         let recovery_started_at = Instant::now();
         for updated_idx in 0..self.update_count {
             let started_at = Instant::now();
diff --git a/core/lib/merkle_tree/src/metrics.rs b/core/lib/merkle_tree/src/metrics.rs
index 2190b9acaa07..84769482527a 100644
--- a/core/lib/merkle_tree/src/metrics.rs
+++ b/core/lib/merkle_tree/src/metrics.rs
@@ -365,6 +365,7 @@ pub(crate) static PRUNING_TIMINGS: Global<PruningTimings> = Global::new();
 pub(crate) enum RecoveryStage {
     Extend,
     ApplyPatch,
+    ParallelPersistence,
 }
 
 const CHUNK_SIZE_BUCKETS: Buckets = Buckets::values(&[
@@ -391,6 +392,8 @@ pub(crate) struct RecoveryMetrics {
     /// Latency of a specific stage of recovery for a single chunk.
     #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
     pub stage_latency: Family<RecoveryStage, Histogram<Duration>>,
+    /// Number of buffered commands if parallel persistence is used.
+    pub parallel_persistence_buffer_size: Gauge<usize>,
 }
 
 #[vise::register]
diff --git a/core/lib/merkle_tree/src/recovery.rs b/core/lib/merkle_tree/src/recovery/mod.rs
similarity index 81%
rename from core/lib/merkle_tree/src/recovery.rs
rename to core/lib/merkle_tree/src/recovery/mod.rs
index 8f3cf35558f6..87a601f32f97 100644
--- a/core/lib/merkle_tree/src/recovery.rs
+++ b/core/lib/merkle_tree/src/recovery/mod.rs
@@ -40,17 +40,21 @@ use std::{collections::HashMap, time::Instant};
 
 use anyhow::Context as _;
 use zksync_crypto::hasher::blake2::Blake2Hasher;
 
+pub use crate::storage::PersistenceThreadHandle;
 use crate::{
     hasher::{HashTree, HasherWithStats},
     metrics::{RecoveryStage, RECOVERY_METRICS},
-    storage::{PatchSet, PruneDatabase, PrunePatchSet, Storage},
+    storage::{Database, MaybeParallel, PatchSet, PruneDatabase, PrunePatchSet, Storage},
     types::{Key, Manifest, Root, TreeEntry, TreeTags, ValueHash},
 };
 
+#[cfg(test)]
+mod tests;
+
 /// Handle to a Merkle tree during its recovery.
 #[derive(Debug)]
 pub struct MerkleTreeRecovery<DB, H = Blake2Hasher> {
-    pub(crate) db: DB,
+    pub(crate) db: MaybeParallel<DB>,
     hasher: H,
     recovered_version: u64,
 }
@@ -105,7 +109,7 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
         db.apply_patch(PatchSet::from_manifest(manifest))?;
 
         Ok(Self {
-            db,
+            db: MaybeParallel::Sequential(db),
             hasher,
             recovered_version,
         })
@@ -257,7 +261,54 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
         self.db.apply_patch(PatchSet::from_manifest(manifest))?;
         tracing::debug!("Updated tree manifest to mark recovery as complete");
 
-        Ok(self.db)
+        self.db.join()
+    }
+}
+
+impl<DB: Clone + PruneDatabase + 'static, H: HashTree> MerkleTreeRecovery<DB, H> {
+    /// Offloads database persistence to a background thread, so that it can run concurrently
+    /// with processing of subsequent chunks. Chunks are still guaranteed to be persisted atomically and in order.
+    ///
+    /// # Arguments
+    ///
+    /// - `buffer_capacity` determines how many chunks can be buffered before persistence blocks (i.e., back-pressure).
+    ///   It also controls memory usage, since each chunk translates into a non-trivial database patch (on the order of 1 kB / entry;
+    ///   i.e., a chunk with 200,000 entries would translate to a 200 MB patch).
+    ///
+    /// # Return value
+    ///
+    /// On success, returns a handle that allows controlling the background persistence thread. For now, it can only be used
+    /// to emulate persistence crashes; the handle can be dropped otherwise.
+    ///
+    /// # Safety
+    ///
+    /// If recovery is interrupted (e.g., its process crashes), then some of the latest chunks may not be persisted,
+    /// and will need to be processed again. It is **unsound** to restart recovery while a persistence thread may be active;
+    /// this may lead to a corrupted database state.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `buffer_capacity` is 0, or if persistence was already parallelized.
+    pub fn parallelize_persistence(
+        &mut self,
+        buffer_capacity: usize,
+    ) -> anyhow::Result<PersistenceThreadHandle> {
+        anyhow::ensure!(buffer_capacity > 0, "Buffer capacity must be positive");
+        self.db
+            .parallelize(self.recovered_version, buffer_capacity)
+            .context("persistence is already parallelized")
+    }
+
+    /// Waits until all changes in the underlying database are persisted, i.e., all chunks are flushed into it.
+    /// This is only relevant if [persistence was parallelized](Self::parallelize_persistence()) earlier;
+    /// otherwise, this method will return immediately.
+    ///
+    /// # Errors
+    ///
+    /// Propagates database I/O errors, should they occur during persistence.
+    pub fn wait_for_persistence(self) -> anyhow::Result<()> {
+        self.db.join()?;
+        Ok(())
+    }
 }
@@ -267,63 +318,3 @@ fn entries_key_range(entries: &[TreeEntry]) -> String {
     };
     format!("{:0>64x}..={:0>64x}", first.key, last.key)
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::{hasher::HasherWithStats, types::LeafNode, MerkleTree};
-
-    #[test]
-    fn recovery_for_initialized_tree() {
-        let mut db = PatchSet::default();
-        MerkleTreeRecovery::new(&mut db, 123)
-            .unwrap()
-            .finalize()
-            .unwrap();
-        let err = MerkleTreeRecovery::new(db, 123).unwrap_err().to_string();
-        assert!(
-            err.contains("Tree is expected to be in the process of recovery"),
-            "{err}"
-        );
-    }
-
-    #[test]
-    fn recovery_for_different_version() {
-        let mut db = PatchSet::default();
-        MerkleTreeRecovery::new(&mut db, 123).unwrap();
-        let err = MerkleTreeRecovery::new(&mut db, 42)
-            .unwrap_err()
-            .to_string();
-        assert!(
-            err.contains("Requested to recover tree version 42"),
-            "{err}"
-        );
-    }
-
-    #[test]
-    fn recovering_empty_tree() {
-        let db = MerkleTreeRecovery::new(PatchSet::default(), 42)
-            .unwrap()
-            .finalize()
-            .unwrap();
-        let tree = MerkleTree::new(db).unwrap();
-        assert_eq!(tree.latest_version(), Some(42));
-        assert_eq!(tree.root(42), Some(Root::Empty));
-    }
-
-    #[test]
-    fn recovering_tree_with_single_node() {
-        let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), 42).unwrap();
-        let recovery_entry = TreeEntry::new(Key::from(123), 1, ValueHash::repeat_byte(1));
-        recovery.extend_linear(vec![recovery_entry]).unwrap();
-        let tree = MerkleTree::new(recovery.finalize().unwrap()).unwrap();
-
-        assert_eq!(tree.latest_version(), Some(42));
-        let mut hasher = HasherWithStats::new(&Blake2Hasher);
-        assert_eq!(
-            tree.latest_root_hash(),
-            LeafNode::new(recovery_entry).hash(&mut hasher, 0)
-        );
-        tree.verify_consistency(42, true).unwrap();
-    }
-}
diff --git a/core/lib/merkle_tree/src/recovery/tests.rs b/core/lib/merkle_tree/src/recovery/tests.rs
new file mode 100644
index 000000000000..601b56269b65
--- /dev/null
+++ b/core/lib/merkle_tree/src/recovery/tests.rs
@@ -0,0 +1,56 @@
+use super::*;
+use crate::{hasher::HasherWithStats, types::LeafNode, MerkleTree};
+
+#[test]
+fn recovery_for_initialized_tree() {
+    let mut db = PatchSet::default();
+    MerkleTreeRecovery::new(&mut db, 123)
+        .unwrap()
+        .finalize()
+        .unwrap();
+    let err = MerkleTreeRecovery::new(db, 123).unwrap_err().to_string();
+    assert!(
+        err.contains("Tree is expected to be in the process of recovery"),
+        "{err}"
+    );
+}
+
+#[test]
+fn recovery_for_different_version() {
+    let mut db = PatchSet::default();
+    MerkleTreeRecovery::new(&mut db, 123).unwrap();
+    let err = MerkleTreeRecovery::new(&mut db, 42)
+        .unwrap_err()
+        .to_string();
+    assert!(
+        err.contains("Requested to recover tree version 42"),
+        "{err}"
+    );
+}
+
+#[test]
+fn recovering_empty_tree() {
+    let db = MerkleTreeRecovery::new(PatchSet::default(), 42)
+        .unwrap()
+        .finalize()
+        .unwrap();
+    let tree = MerkleTree::new(db).unwrap();
+    assert_eq!(tree.latest_version(), Some(42));
+    assert_eq!(tree.root(42), Some(Root::Empty));
+}
+
+#[test]
+fn recovering_tree_with_single_node() {
+    let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), 42).unwrap();
+    let recovery_entry = TreeEntry::new(Key::from(123), 1, ValueHash::repeat_byte(1));
+    recovery.extend_linear(vec![recovery_entry]).unwrap();
+    let tree = MerkleTree::new(recovery.finalize().unwrap()).unwrap();
+
+    assert_eq!(tree.latest_version(), Some(42));
+    let mut hasher = HasherWithStats::new(&Blake2Hasher);
+    assert_eq!(
+        tree.latest_root_hash(),
+        LeafNode::new(recovery_entry).hash(&mut hasher, 0)
+    );
+    tree.verify_consistency(42, true).unwrap();
+}
diff --git a/core/lib/merkle_tree/src/storage/mod.rs b/core/lib/merkle_tree/src/storage/mod.rs
index 9728f99d57bb..b70485b93188 100644
--- a/core/lib/merkle_tree/src/storage/mod.rs
+++ b/core/lib/merkle_tree/src/storage/mod.rs
@@ -1,11 +1,15 @@
 //! Storage-related logic.
 
-pub(crate) use self::patch::{LoadAncestorsResult, WorkingPatchSet};
 pub use self::{
     database::{Database, NodeKeys, Patched, PruneDatabase, PrunePatchSet},
+    parallel::PersistenceThreadHandle,
     patch::PatchSet,
     rocksdb::{MerkleTreeColumnFamily, RocksDBWrapper},
 };
+pub(crate) use self::{
+    parallel::MaybeParallel,
+    patch::{LoadAncestorsResult, WorkingPatchSet},
+};
 
 use crate::{
     hasher::HashTree,
     metrics::{TreeUpdaterStats, BLOCK_TIMINGS, GENERAL_METRICS},
@@ -16,6 +20,7 @@ use crate::{
 };
 
 mod database;
+mod parallel;
 mod patch;
 mod proofs;
 mod rocksdb;
diff --git a/core/lib/merkle_tree/src/storage/parallel.rs b/core/lib/merkle_tree/src/storage/parallel.rs
new file mode 100644
index 000000000000..c5368c4561d2
--- /dev/null
+++ b/core/lib/merkle_tree/src/storage/parallel.rs
@@ -0,0 +1,625 @@
+//! Parallel storage implementation.
+
+use std::{
+    any::Any,
+    collections::{HashMap, VecDeque},
+    error::Error as StdError,
+    mem,
+    sync::{mpsc, Arc},
+    thread,
+    time::Duration,
+};
+
+use anyhow::Context as _;
+
+use super::{patch::PartialPatchSet, Database, NodeKeys, PatchSet};
+use crate::{
+    errors::DeserializeError,
+    metrics::{RecoveryStage, RECOVERY_METRICS},
+    types::{Manifest, Node, NodeKey, ProfiledTreeOperation, Root},
+    PruneDatabase, PrunePatchSet,
+};
+
+/// Persistence command passed to a persistence thread over a bounded channel.
+#[derive(Debug, Clone)]
+struct PersistenceCommand {
+    manifest: Manifest,
+    patch: Arc<PartialPatchSet>,
+    stale_keys: Vec<NodeKey>,
+}
+
+/// Command to a background persistence thread.
+#[derive(Debug)]
+enum Command {
+    Persist(PersistenceCommand),
+    Stop(mpsc::SyncSender<()>),
+}
+
+/// Handle that allows controlling background persistence for the Merkle tree.
+#[derive(Debug)]
+pub struct PersistenceThreadHandle {
+    command_sender: mpsc::SyncSender<Command>,
+}
+
+impl PersistenceThreadHandle {
+    /// Emulates the persistence thread stopping; any updates sent afterwards will not actually be persisted.
+    ///
+    /// This method should only be used in tests. It is blocking (waits until all previous persistence commands are processed).
+    pub fn test_stop_processing(self) {
+        let (stop_sender, stop_receiver) = mpsc::sync_channel(0);
+        self.command_sender.send(Command::Stop(stop_sender)).ok();
+        stop_receiver.recv().ok();
+    }
+}
+
+/// Thread join handle, or an error produced by the thread.
+#[derive(Debug, Default)]
+enum HandleOrError {
+    #[default]
+    Nothing,
+    Handle(thread::JoinHandle<anyhow::Result<()>>),
+    Err(Arc<dyn StdError + Send + Sync>),
+}
+
+impl HandleOrError {
+    /// Checks whether the thread handle has exited, and returns an error if it exited with an error.
+    /// If `join` is set, waits for the thread handle to exit.
+    fn check(&mut self, join: bool) -> anyhow::Result<()> {
+        let err_arc = match self {
+            Self::Handle(handle) if join || handle.is_finished() => {
+                let Self::Handle(handle) = mem::take(self) else {
+                    unreachable!("just checked variant earlier");
+                };
+                let err = match handle.join() {
+                    Err(_) => anyhow::anyhow!("persistence thread panicked"),
+                    // Handling normal exits depends on whether we expect the thread to exit.
+                    Ok(Ok(())) if join => return Ok(()),
+                    Ok(Ok(())) => anyhow::anyhow!("persistence thread unexpectedly stopped"),
+                    Ok(Err(err)) => err,
+                };
+                let err: Box<dyn StdError + Send + Sync> = err.into();
+                let err: Arc<dyn StdError + Send + Sync> = err.into();
+                *self = Self::Err(err.clone());
+                err
+            }
+            Self::Handle(_) => return Ok(()),
+            Self::Err(err) => err.clone(),
+            Self::Nothing => unreachable!("only used temporarily to take out `JoinHandle`"),
+        };
+        Err(anyhow::Error::new(err_arc))
+    }
+
+    fn join(mut self) -> anyhow::Result<()> {
+        self.check(true)
+    }
+}
+
+/// Database implementation that persists changes in a background thread. Changes that are not yet applied
+/// are queued up and taken into account by `Database` methods. The queue can sometimes be stale (i.e., changes
+/// at its head may already have been applied), but this is fine as long as changes are applied atomically and sequentially.
+///
+/// Right now, tree recovery is the only use case for this struct. Correspondingly, some reported metrics
+/// are specific to recovery and would need to be reworked if the struct is eventually used elsewhere.
+///
+/// # Assumptions
+///
+/// - This is the only mutable database instance.
+/// - All database updates target the same tree version.
+/// - The application tolerates the latest changes being dropped.
+#[derive(Debug)]
+pub(crate) struct ParallelDatabase<DB> {
+    inner: DB,
+    updated_version: u64,
+    command_sender: mpsc::SyncSender<Command>,
+    persistence_handle: HandleOrError,
+    commands: VecDeque<PersistenceCommand>,
+}
+
+impl<DB: Database + Clone + 'static> ParallelDatabase<DB> {
+    fn new(inner: DB, updated_version: u64, buffer_capacity: usize) -> Self {
+        let (command_sender, command_receiver) = mpsc::sync_channel(buffer_capacity);
+        let persistence_database = inner.clone();
+        Self {
+            inner,
+            updated_version,
+            command_sender,
+            persistence_handle: HandleOrError::Handle(thread::spawn(move || {
+                Self::run_persistence(persistence_database, updated_version, command_receiver)
+            })),
+            commands: VecDeque::with_capacity(buffer_capacity),
+        }
+    }
+
+    fn persistence_thread_handle(&self) -> PersistenceThreadHandle {
+        PersistenceThreadHandle {
+            command_sender: self.command_sender.clone(),
+        }
+    }
+
+    fn run_persistence(
+        mut database: DB,
+        updated_version: u64,
+        command_receiver: mpsc::Receiver<Command>,
+    ) -> anyhow::Result<()> {
+        let mut persisted_count = 0;
+        while let Ok(command) = command_receiver.recv() {
+            let command = match command {
+                Command::Persist(command) => command,
+                Command::Stop(_sender) => {
+                    // Ensure that `PersistenceThreadHandle::test_stop_processing()` returns after the processing loop terminates.
+                    drop(command_receiver);
+                    anyhow::bail!("emulated persistence crash");
+                }
+            };
+
+            tracing::debug!(
+                "Persisting patch #{persisted_count} with {} nodes and {} stale keys",
+                command.patch.nodes.len(),
+                command.stale_keys.len()
+            );
+            // Reconstitute a `PatchSet` and apply it to the underlying database.
+            let patch = PatchSet {
+                manifest: command.manifest,
+                patches_by_version: HashMap::from([(updated_version, command.patch.cloned())]),
+                updated_version: Some(updated_version),
+                stale_keys_by_version: HashMap::from([(updated_version, command.stale_keys)]),
+            };
+            let stage_latency =
+                RECOVERY_METRICS.stage_latency[&RecoveryStage::ParallelPersistence].start();
+            database.apply_patch(patch)?;
+            let stage_latency = stage_latency.observe();
+            tracing::debug!("Persisted patch #{persisted_count} in {stage_latency:?}");
+            persisted_count += 1;
+        }
+        Ok(())
+    }
+}
+
+impl<DB: Database> ParallelDatabase<DB> {
+    fn wait_sync(&mut self) -> anyhow::Result<()> {
+        while !self.commands.is_empty() {
+            self.commands
+                .retain(|command| Arc::strong_count(&command.patch) > 1);
+            thread::sleep(Duration::from_millis(50)); // TODO: more intelligent approach
+        }
+        RECOVERY_METRICS.parallel_persistence_buffer_size.set(0);
+
+        // Check that the persistence thread hasn't panicked
+        self.persistence_handle.check(false)
+    }
+
+    fn join(self) -> anyhow::Result<DB> {
+        drop(self.command_sender);
+        drop(self.commands);
+        RECOVERY_METRICS.parallel_persistence_buffer_size.set(0);
+        self.persistence_handle.join()?;
+        Ok(self.inner)
+    }
+}
+
+impl<DB: Database> Database for ParallelDatabase<DB> {
+    fn try_manifest(&self) -> Result<Option<Manifest>, DeserializeError> {
+        let latest_command = self.commands.iter().next_back();
+        if let Some(command) = latest_command {
+            Ok(Some(command.manifest.clone()))
+        } else {
+            self.inner.try_manifest()
+        }
+    }
+
+    fn try_root(&self, version: u64) -> Result<Option<Root>, DeserializeError> {
+        if version != self.updated_version {
+            return self.inner.try_root(version);
+        }
+        let root = self
+            .commands
+            .iter()
+            .rev()
+            .find_map(|command| command.patch.root.clone());
+        if let Some(root) = root {
+            Ok(Some(root))
+        } else {
+            self.inner.try_root(version)
+        }
+    }
+
+    fn try_tree_node(
+        &self,
+        key: &NodeKey,
+        is_leaf: bool,
+    ) -> Result<Option<Node>, DeserializeError> {
+        if key.version != self.updated_version {
+            return self.inner.try_tree_node(key, is_leaf);
+        }
+
+        let node = self
+            .commands
+            .iter()
+            .rev()
+            .find_map(|command| command.patch.nodes.get(key).cloned());
+        if let Some(node) = node {
+            debug_assert_eq!(matches!(node, Node::Leaf(_)), is_leaf);
+            Ok(Some(node))
+        } else {
+            self.inner.try_tree_node(key, is_leaf)
+        }
+    }
+
+    fn tree_nodes(&self, keys: &NodeKeys) -> Vec<Option<Node>> {
+        let mut nodes = vec![None; keys.len()];
+        for command in self.commands.iter().rev() {
+            for (key_idx, (key, is_leaf)) in keys.iter().enumerate() {
+                if nodes[key_idx].is_some() {
+                    continue;
+                }
+                if let Some(node) = command.patch.nodes.get(key) {
+                    debug_assert_eq!(matches!(node, Node::Leaf(_)), *is_leaf);
+                    nodes[key_idx] = Some(node.clone());
+                }
+            }
+        }
+
+        // Load missing nodes from the underlying database
+        let (key_indexes, missing_keys): (Vec<_>, Vec<_>) = keys
+            .iter()
+            .copied()
+            .enumerate()
+            .filter(|(i, _)| nodes[*i].is_none())
+            .unzip();
+        let inner_nodes = self.inner.tree_nodes(&missing_keys);
+        for (key_idx, node) in key_indexes.into_iter().zip(inner_nodes) {
+            nodes[key_idx] = node;
+        }
+        nodes
+    }
+
+    fn start_profiling(&self, operation: ProfiledTreeOperation) -> Box<dyn Any> {
+        self.inner.start_profiling(operation)
+    }
+
+    fn apply_patch(&mut self, mut patch: PatchSet) -> anyhow::Result<()> {
+        let partial_patch = if let Some(updated_version) = patch.updated_version {
+            anyhow::ensure!(
+                updated_version == self.updated_version,
+                "Unsupported update: must update predefined version {}",
+                self.updated_version
+            );
+            anyhow::ensure!(
+                patch.patches_by_version.len() == 1,
+                "Unsupported update: must *only* update version {updated_version}"
+            );
+
+            // Garbage-collect patches already applied by the persistence thread. This will remove all patches
+            // if the persistence thread has failed, but this is OK because we'll propagate the failure below anyway.
+            self.commands
+                .retain(|command| Arc::strong_count(&command.patch) > 1);
+            RECOVERY_METRICS
+                .parallel_persistence_buffer_size
+                .set(self.commands.len());
+            tracing::debug!(
+                "Retained {} buffered persistence command(s)",
+                self.commands.len()
+            );
+
+            patch
+                .patches_by_version
+                .remove(&updated_version)
+                .context("PatchSet invariant violated: missing patch for the updated version")?
+        } else {
+            // We only support manifest updates.
+            anyhow::ensure!(
+                patch.patches_by_version.is_empty(),
+                "Invalid update: {patch:?}"
+            );
+            PartialPatchSet::empty()
+        };
+
+        let mut stale_keys_by_version = patch.stale_keys_by_version;
+        anyhow::ensure!(
+            stale_keys_by_version.is_empty()
+                || (stale_keys_by_version.len() == 1
+                    && stale_keys_by_version.contains_key(&self.updated_version)),
+            "Invalid stale keys update: {stale_keys_by_version:?}"
+        );
+        let stale_keys = stale_keys_by_version
+            .remove(&self.updated_version)
+            .unwrap_or_default();
+
+        let command = PersistenceCommand {
+            manifest: patch.manifest,
+            patch: Arc::new(partial_patch),
+            stale_keys,
+        };
+        if self
+            .command_sender
+            .send(Command::Persist(command.clone()))
+            .is_err()
+        {
+            self.persistence_handle.check(true)?;
+            anyhow::bail!(
+                "persistence thread never exits normally when `ParallelDatabase` is alive"
+            );
+        }
+        self.commands.push_back(command);
+        RECOVERY_METRICS.parallel_persistence_buffer_size.inc_by(1);
+        Ok(())
+    }
+}
+
+impl<DB: PruneDatabase> PruneDatabase for ParallelDatabase<DB> {
+    fn min_stale_key_version(&self) -> Option<u64> {
+        let commands_have_stale_keys = self
+            .commands
+            .iter()
+            .any(|command| !command.stale_keys.is_empty());
+        if commands_have_stale_keys {
+            return Some(self.updated_version);
+        }
+        self.inner.min_stale_key_version()
+    }
+
+    fn stale_keys(&self, version: u64) -> Vec<NodeKey> {
+        if version != self.updated_version {
+            return self.inner.stale_keys(version);
+        }
+        self.commands
+            .iter()
+            .flat_map(|command| command.stale_keys.clone())
+            .chain(self.inner.stale_keys(version))
+            .collect()
+    }
+
+    fn prune(&mut self, patch: PrunePatchSet) -> anyhow::Result<()> {
+        // Require the underlying database to be fully synced.
+        self.wait_sync()
+            .context("failed synchronizing database before pruning")?;
+        self.inner.prune(patch)
+    }
+}
+
+/// Database with either sequential or parallel persistence.
+#[derive(Debug)]
+pub(crate) enum MaybeParallel<DB> {
+    Sequential(DB),
+    Parallel(ParallelDatabase<DB>),
+}
+
+impl<DB: PruneDatabase> MaybeParallel<DB> {
+    pub fn join(self) -> anyhow::Result<DB> {
+        match self {
+            Self::Sequential(db) => Ok(db),
+            Self::Parallel(db) => db.join(),
+        }
+    }
+}
+
+impl<DB: Database + Clone + 'static> MaybeParallel<DB> {
+    pub fn parallelize(
+        &mut self,
+        updated_version: u64,
+        buffer_capacity: usize,
+    ) -> Option<PersistenceThreadHandle> {
+        if let Self::Sequential(db) = self {
+            let db = ParallelDatabase::new(db.clone(), updated_version, buffer_capacity);
+            let handle = db.persistence_thread_handle();
+            *self = Self::Parallel(db);
+            Some(handle)
+        } else {
+            None
+        }
+    }
+}
+
+impl<DB: Database> Database for MaybeParallel<DB> {
+    fn try_manifest(&self) -> Result<Option<Manifest>, DeserializeError> {
+        match self {
+            Self::Sequential(db) => db.try_manifest(),
+            Self::Parallel(db) => db.try_manifest(),
+        }
+    }
+
+    fn try_root(&self, version: u64) -> Result<Option<Root>, DeserializeError> {
+        match self {
+            Self::Sequential(db) => db.try_root(version),
+            Self::Parallel(db) => db.try_root(version),
+        }
+    }
+
+    fn try_tree_node(
+        &self,
+        key: &NodeKey,
+        is_leaf: bool,
+    ) -> Result<Option<Node>, DeserializeError> {
+        match self {
+            Self::Sequential(db) => db.try_tree_node(key, is_leaf),
+            Self::Parallel(db) => db.try_tree_node(key, is_leaf),
+        }
+    }
+
+    fn tree_nodes(&self, keys: &NodeKeys) -> Vec<Option<Node>> {
+        match self {
+            Self::Sequential(db) => db.tree_nodes(keys),
+            Self::Parallel(db) => db.tree_nodes(keys),
+        }
+    }
+
+    fn start_profiling(&self, operation: ProfiledTreeOperation) -> Box<dyn Any> {
+        match self {
+            Self::Sequential(db) => db.start_profiling(operation),
+            Self::Parallel(db) => db.start_profiling(operation),
+        }
+    }
+
+    fn apply_patch(&mut self, patch: PatchSet) -> anyhow::Result<()> {
+        match self {
+            Self::Sequential(db) => db.apply_patch(patch),
+            Self::Parallel(db) => db.apply_patch(patch),
+        }
+    }
+}
+
+impl<DB: PruneDatabase> PruneDatabase for MaybeParallel<DB> {
+    fn min_stale_key_version(&self) -> Option<u64> {
+        match self {
+            Self::Sequential(db) => db.min_stale_key_version(),
+            Self::Parallel(db) => db.min_stale_key_version(),
+        }
+    }
+
+    fn stale_keys(&self, version: u64) -> Vec<NodeKey> {
+        match self {
+            Self::Sequential(db) => db.stale_keys(version),
+            Self::Parallel(db) => db.stale_keys(version),
+        }
+    }
+
+    fn prune(&mut self, patch: PrunePatchSet) -> anyhow::Result<()> {
+        match self {
+            Self::Sequential(db) => db.prune(patch),
+            Self::Parallel(db) => db.prune(patch),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use assert_matches::assert_matches;
+    use tempfile::TempDir;
+
+    use super::*;
+    use crate::{
+        storage::Operation,
+        types::{ChildRef, InternalNode, LeafNode, Nibbles},
+        Key, RocksDBWrapper, TreeEntry, ValueHash,
+    };
+
+    const UPDATED_VERSION: u64 = 10;
+
+    fn mock_patch_set(start: u64, leaf_count: u64) -> PatchSet {
+        assert!(start <= leaf_count);
+
+        let manifest = Manifest::new(UPDATED_VERSION, &());
+        let mut root_node = InternalNode::default();
+        root_node.insert_child_ref(0, ChildRef::leaf(UPDATED_VERSION));
+        let root = Root::new(leaf_count, Node::Internal(root_node));
+        let nodes = (start..leaf_count)
+            .map(|i| {
+                let key = Key::from(i);
+                let node_key = Nibbles::new(&key, 64).with_version(UPDATED_VERSION);
+                let leaf = LeafNode::new(TreeEntry {
+                    key,
+                    value: ValueHash::zero(),
+                    leaf_index: i + 1,
+                });
+                (node_key, Node::from(leaf))
+            })
+            .collect();
+        PatchSet::new(
+            manifest,
+            UPDATED_VERSION,
+            root,
+            nodes,
+            vec![],
+            Operation::Update,
+        )
+    }
+
+    #[test]
+    fn database_methods_with_parallel_persistence() {
+        let temp_dir = TempDir::new().unwrap();
+        let db = RocksDBWrapper::new(temp_dir.path()).unwrap();
+
+        let mut parallel_db = ParallelDatabase::new(db.clone(), UPDATED_VERSION, 1);
+        assert!(parallel_db.manifest().is_none());
+        let manifest = Manifest::new(UPDATED_VERSION, &());
+        parallel_db
+            .apply_patch(PatchSet::from_manifest(manifest))
+            .unwrap();
+        assert_eq!(parallel_db.commands.len(), 1);
+        assert_eq!(
+            parallel_db.manifest().unwrap().version_count,
+            UPDATED_VERSION
+        );
+
+        parallel_db.apply_patch(mock_patch_set(0, 10)).unwrap();
+        assert_eq!(parallel_db.root(UPDATED_VERSION).unwrap().leaf_count(), 10);
+
+        let keys: Vec<_> = (0..20)
+            .map(|i| {
+                (
+                    Nibbles::new(&Key::from(i), 64).with_version(UPDATED_VERSION),
+                    true,
+                )
+            })
+            .collect();
+
+        let nodes = parallel_db.tree_nodes(&keys);
+        for (i, node) in nodes[..10].iter().enumerate() {
+            assert_matches!(
+                node.as_ref().unwrap(),
+                Node::Leaf(leaf) if leaf.leaf_index == i as u64 + 1
+            );
+        }
+        for node in &nodes[10..] {
+            assert!(node.is_none(), "{node:?}");
+        }
+
+        parallel_db.apply_patch(mock_patch_set(10, 15)).unwrap();
+
+        let nodes = parallel_db.tree_nodes(&keys);
+        for (i, node) in nodes[..15].iter().enumerate() {
+            assert_matches!(
+                node.as_ref().unwrap(),
+                Node::Leaf(leaf) if leaf.leaf_index == i as u64 + 1
+            );
+        }
+        for node in &nodes[15..] {
+            assert!(node.is_none(), "{node:?}");
+        }
+
+        parallel_db.wait_sync().unwrap();
+
+        let nodes = parallel_db.tree_nodes(&keys);
+        for (i, node) in nodes[..15].iter().enumerate() {
+            assert_matches!(
+                node.as_ref().unwrap(),
+                Node::Leaf(leaf) if leaf.leaf_index == i as u64 + 1
+            );
+        }
+        for node in &nodes[15..] {
+            assert!(node.is_none(), "{node:?}");
+        }
+
+        parallel_db.join().unwrap();
+    }
+
+    #[test]
+    fn fault_injection_with_parallel_persistence() {
+        let temp_dir = TempDir::new().unwrap();
+        let db = RocksDBWrapper::new(temp_dir.path()).unwrap();
+
+        let mut parallel_db = ParallelDatabase::new(db.clone(), UPDATED_VERSION, 4);
+        let handle = parallel_db.persistence_thread_handle();
+
+        // Queue up a couple of patch sets
+        parallel_db.apply_patch(mock_patch_set(0, 5)).unwrap();
+        assert_eq!(parallel_db.root(UPDATED_VERSION).unwrap().leaf_count(), 5);
+        parallel_db.apply_patch(mock_patch_set(5, 10)).unwrap();
+        assert_eq!(parallel_db.root(UPDATED_VERSION).unwrap().leaf_count(), 10);
+        // Emulate the persistence thread stopping (e.g., due to the process crashing)
+        handle.test_stop_processing();
+
+        // Queue another patch set.
+        let err = parallel_db
+            .apply_patch(mock_patch_set(10, 15))
+            .unwrap_err()
+            .to_string();
+        assert!(err.contains("emulated persistence crash"), "{err}");
+
+        let err = parallel_db.join().unwrap_err().to_string();
+        assert!(err.contains("emulated persistence crash"), "{err}");
+
+        // Check that the last patch set was dropped.
+        assert_eq!(db.root(UPDATED_VERSION).unwrap().leaf_count(), 10);
+    }
+}
diff --git a/core/lib/merkle_tree/src/storage/patch.rs b/core/lib/merkle_tree/src/storage/patch.rs
index 329f748a8913..5f3e44c8bef0 100644
--- a/core/lib/merkle_tree/src/storage/patch.rs
+++ b/core/lib/merkle_tree/src/storage/patch.rs
@@ -3,6 +3,7 @@ use std::{
     collections::{hash_map::Entry, HashMap},
     iter,
+    sync::Arc,
     time::Instant,
 };
@@ -31,10 +32,24 @@ pub(super) struct PartialPatchSet {
 }
 
 impl PartialPatchSet {
+    pub fn empty() -> Self {
+        Self {
+            root: None,
+            nodes: HashMap::new(),
+        }
+    }
+
     pub fn merge(&mut self, other: Self) {
         self.root = other.root;
         self.nodes.extend(other.nodes);
     }
+
+    pub fn cloned(self: &Arc<Self>) -> Self {
+        Self {
+            root: self.root.clone(),
+            nodes: self.nodes.clone(),
+        }
+    }
 }
 
 /// Raw set of database changes.
diff --git a/core/lib/merkle_tree/tests/integration/recovery.rs b/core/lib/merkle_tree/tests/integration/recovery.rs
index 63d3faec3672..0bed36185d7c 100644
--- a/core/lib/merkle_tree/tests/integration/recovery.rs
+++ b/core/lib/merkle_tree/tests/integration/recovery.rs
@@ -119,6 +119,39 @@ fn test_tree_after_recovery(
     }
 }
 
+fn test_parallel_recovery_in_chunks<DB>(db: DB, kind: RecoveryKind, chunk_size: usize)
+where
+    DB: PruneDatabase + Clone + 'static,
+{
+    let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
+    let mut recovery_entries = kvs.clone();
+    if matches!(kind, RecoveryKind::Linear) {
+        recovery_entries.sort_unstable_by_key(|entry| entry.key);
+    }
+
+    let recovered_version = 123;
+    let mut recovery = MerkleTreeRecovery::new(db.clone(), recovered_version).unwrap();
+    recovery.parallelize_persistence(4).unwrap();
+    for (i, chunk) in recovery_entries.chunks(chunk_size).enumerate() {
+        match kind {
+            RecoveryKind::Linear => recovery.extend_linear(chunk.to_vec()).unwrap(),
+            RecoveryKind::Random => recovery.extend_random(chunk.to_vec()).unwrap(),
+        }
+        if i % 3 == 1 {
+            // Need this to ensure that the old persistence thread doesn't corrupt the DB.
+            recovery.wait_for_persistence().unwrap();
+            recovery = MerkleTreeRecovery::new(db.clone(), recovered_version).unwrap();
+            recovery.parallelize_persistence(4).unwrap();
+            // ^ Simulate recovery interruption and restart.
+        }
+    }
+
+    let mut tree = MerkleTree::new(recovery.finalize().unwrap()).unwrap();
+    tree.verify_consistency(recovered_version, true).unwrap();
+    // Check that new tree versions can be built and function as expected.
+    test_tree_after_recovery(&mut tree, recovered_version, *expected_hash);
+}
+
 #[test_casing(8, test_casing::Product((RecoveryKind::ALL, [6, 10, 17, 42])))]
 fn recovery_in_chunks(kind: RecoveryKind, chunk_size: usize) {
     test_recovery_in_chunks(PatchSet::default(), kind, chunk_size);
@@ -136,4 +169,11 @@ mod rocksdb {
         let db = RocksDBWrapper::new(temp_dir.path()).unwrap();
         test_recovery_in_chunks(db, kind, chunk_size);
     }
+
+    #[test_casing(8, test_casing::Product((RecoveryKind::ALL, [6, 10, 17, 42])))]
+    fn parallel_recovery_in_chunks(kind: RecoveryKind, chunk_size: usize) {
+        let temp_dir = TempDir::new().unwrap();
+        let db = RocksDBWrapper::new(temp_dir.path()).unwrap();
+        test_parallel_recovery_in_chunks(db, kind, chunk_size);
+    }
 }
diff --git a/core/node/metadata_calculator/src/helpers.rs b/core/node/metadata_calculator/src/helpers.rs
index 896f77e8775f..20fd0babaac8 100644
--- a/core/node/metadata_calculator/src/helpers.rs
+++ b/core/node/metadata_calculator/src/helpers.rs
@@ -21,7 +21,7 @@ use zksync_dal::{Connection, Core, CoreDal};
 use zksync_health_check::{CheckHealth, Health, HealthStatus, ReactiveHealthCheck};
 use zksync_merkle_tree::{
     domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader},
-    recovery::MerkleTreeRecovery,
+    recovery::{MerkleTreeRecovery, PersistenceThreadHandle},
     Database, Key, MerkleTreeColumnFamily, NoVersionError, RocksDBWrapper, TreeEntry,
     TreeEntryWithProof, TreeInstruction,
 };
@@ -33,7 +33,7 @@ use zksync_types::{
 use super::{
     metrics::{LoadChangesStage, TreeUpdateStage, METRICS},
     pruning::PruningHandles,
-    MetadataCalculatorConfig,
+    MetadataCalculatorConfig, MetadataCalculatorRecoveryConfig,
 };
 
 /// General information about the Merkle tree.
@@ -408,11 +408,28 @@ impl AsyncTreeRecovery {
         db: RocksDBWrapper,
         recovered_version: u64,
         mode: MerkleTreeMode,
+        config: &MetadataCalculatorRecoveryConfig,
     ) -> anyhow::Result<Self> {
-        Ok(Self {
-            inner: Some(MerkleTreeRecovery::new(db, recovered_version)?),
+        Ok(Self::with_handle(db, recovered_version, mode, config)?.0)
+    }
+
+    // Public for testing purposes
+    pub fn with_handle(
+        db: RocksDBWrapper,
+        recovered_version: u64,
+        mode: MerkleTreeMode,
+        config: &MetadataCalculatorRecoveryConfig,
+    ) -> anyhow::Result<(Self, Option<PersistenceThreadHandle>)> {
+        let mut recovery = MerkleTreeRecovery::new(db, recovered_version)?;
+        let handle = config
+            .parallel_persistence_buffer
+            .map(|buffer_capacity| recovery.parallelize_persistence(buffer_capacity.get()))
+            .transpose()?;
+        let this = Self {
+            inner: Some(recovery),
             mode,
-        })
+        };
+        Ok((this, handle))
     }
 
     pub fn recovered_version(&self) -> u64 {
@@ -490,6 +507,14 @@ impl AsyncTreeRecovery {
         Ok(())
     }
 
+    /// Waits until all pending chunks are persisted.
+    pub async fn wait_for_persistence(self) -> anyhow::Result<()> {
+        let tree = self.inner.expect(Self::INCONSISTENT_MSG);
+        tokio::task::spawn_blocking(|| tree.wait_for_persistence())
+            .await
+            .context("panicked while waiting for pending recovery chunks to be persisted")?
+    }
+
     pub async fn finalize(self) -> anyhow::Result<RocksDBWrapper> {
         let tree = self.inner.expect(Self::INCONSISTENT_MSG);
         let db = tokio::task::spawn_blocking(|| tree.finalize())
             .await
@@ -514,13 +539,18 @@ pub(super) enum GenericAsyncTree {
 }
 
 impl GenericAsyncTree {
-    pub async fn new(db: RocksDBWrapper, mode: MerkleTreeMode) -> anyhow::Result<Self> {
+    pub async fn new(
+        db: RocksDBWrapper,
+        config: &MetadataCalculatorConfig,
+    ) -> anyhow::Result<Self> {
+        let mode = config.mode;
+        let recovery = config.recovery.clone();
         tokio::task::spawn_blocking(move || {
             let Some(manifest) = db.manifest() else {
                 return Ok(Self::Empty { db, mode });
             };
             anyhow::Ok(if let Some(version) = manifest.recovered_version() {
-                Self::Recovering(AsyncTreeRecovery::new(db, version, mode)?)
+                Self::Recovering(AsyncTreeRecovery::new(db, version, mode, &recovery)?)
             } else {
                 Self::Ready(AsyncTree::new(db, mode)?)
             })
diff --git a/core/node/metadata_calculator/src/lib.rs b/core/node/metadata_calculator/src/lib.rs
index 3462d35e673a..4a422f243f40 100644
--- a/core/node/metadata_calculator/src/lib.rs
+++ b/core/node/metadata_calculator/src/lib.rs
@@ -2,7 +2,7 @@
 //! stores them in the DB.
 
 use std::{
-    num::NonZeroU32,
+    num::{NonZeroU32, NonZeroUsize},
     sync::Arc,
     time::{Duration, Instant},
 };
@@ -45,12 +45,18 @@ pub struct MetadataCalculatorRecoveryConfig {
     /// **Important.** This value cannot be changed in the middle of tree recovery (i.e., if a node is stopped in the middle
     /// of recovery and then restarted with a different config).
    pub desired_chunk_size: u64,
+    /// Buffer capacity for parallel persistence operations. Should be reasonably small, since a larger buffer means more RAM usage;
+    /// buffer elements are persisted tree chunks. On the other hand, a buffer that is too small can make persistence parallelization inefficient.
+    ///
+    /// If set to `None`, parallel persistence will be disabled.
+    pub parallel_persistence_buffer: Option<NonZeroUsize>,
 }
 
 impl Default for MetadataCalculatorRecoveryConfig {
     fn default() -> Self {
         Self {
             desired_chunk_size: 200_000,
+            parallel_persistence_buffer: NonZeroUsize::new(4),
         }
     }
 }
@@ -208,7 +214,7 @@ impl MetadataCalculator {
             started_at.elapsed()
         );
 
-        GenericAsyncTree::new(db, self.config.mode).await
+        GenericAsyncTree::new(db, &self.config).await
     }
 
     pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
diff --git a/core/node/metadata_calculator/src/recovery/mod.rs b/core/node/metadata_calculator/src/recovery/mod.rs
index b5e70213fac9..b4e91bf720ee 100644
--- a/core/node/metadata_calculator/src/recovery/mod.rs
+++ b/core/node/metadata_calculator/src/recovery/mod.rs
@@ -189,7 +189,7 @@ impl GenericAsyncTree {
                 "Starting Merkle tree recovery with status {snapshot_recovery:?}"
             );
             let l1_batch = snapshot_recovery.l1_batch_number;
-            let tree = AsyncTreeRecovery::new(db, l1_batch.0.into(), mode)?;
+            let tree = AsyncTreeRecovery::new(db, l1_batch.0.into(), mode, config)?;
             (tree, snapshot_recovery)
         } else {
             // Start the tree from scratch. The genesis block will be filled in `TreeUpdater::loop_updating_tree()`.
@@ -267,12 +267,15 @@ impl AsyncTreeRecovery {
         });
         future::try_join_all(chunk_tasks).await?;
 
+        let mut tree = tree.into_inner();
         if *stop_receiver.borrow() {
+            // Waiting for persistence is mostly useful for tests. Normally, the tree database won't be used in the same process
+            // after a stop signal is received, so there's no risk of data races with the background persistence thread.
+            tree.wait_for_persistence().await?;
             return Ok(None);
         }
 
         let finalize_latency = RECOVERY_METRICS.latency[&RecoveryStage::Finalize].start();
-        let mut tree = tree.into_inner();
         let actual_root_hash = tree.root_hash().await;
         anyhow::ensure!(
             actual_root_hash == snapshot.expected_root_hash,
diff --git a/core/node/metadata_calculator/src/recovery/tests.rs b/core/node/metadata_calculator/src/recovery/tests.rs
index b4c8aca1d4d8..f8edd3e5678d 100644
--- a/core/node/metadata_calculator/src/recovery/tests.rs
+++ b/core/node/metadata_calculator/src/recovery/tests.rs
@@ -1,10 +1,10 @@
 //! Tests for metadata calculator snapshot recovery.
 
-use std::path::Path;
+use std::{path::Path, sync::Mutex};
 
 use assert_matches::assert_matches;
 use tempfile::TempDir;
-use test_casing::test_casing;
+use test_casing::{test_casing, Product};
 use tokio::sync::mpsc;
 use zksync_config::configs::{
     chain::OperationsManagerConfig,
@@ -12,7 +12,7 @@ use zksync_config::configs::{
 };
 use zksync_dal::CoreDal;
 use zksync_health_check::{CheckHealth, HealthStatus, ReactiveHealthCheck};
-use zksync_merkle_tree::{domain::ZkSyncTree, TreeInstruction};
+use zksync_merkle_tree::{domain::ZkSyncTree, recovery::PersistenceThreadHandle, TreeInstruction};
 use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
 use zksync_node_test_utils::prepare_recovery_snapshot;
 use zksync_types::{L1BatchNumber, ProtocolVersionId, StorageLog};
@@ -44,9 +44,13 @@ fn calculating_chunk_count() {
     assert_eq!(snapshot.chunk_count(), 1);
 }
 
-async fn create_tree_recovery(path: &Path, l1_batch: L1BatchNumber) -> AsyncTreeRecovery {
+async fn create_tree_recovery(
+    path: &Path,
+    l1_batch: L1BatchNumber,
+    config: &MetadataCalculatorRecoveryConfig,
+) -> (AsyncTreeRecovery, Option<PersistenceThreadHandle>) {
     let db = create_db(mock_config(path)).await.unwrap();
-    AsyncTreeRecovery::new(db, l1_batch.0.into(), MerkleTreeMode::Full).unwrap()
+    AsyncTreeRecovery::with_handle(db, l1_batch.0.into(), MerkleTreeMode::Full, config).unwrap()
 }
 
 #[tokio::test]
@@ -66,7 +70,7 @@ async fn basic_recovery_workflow() {
         println!("Recovering tree with {chunk_count} chunks");
 
         let tree_path = temp_dir.path().join(format!("recovery-{chunk_count}"));
-        let tree = create_tree_recovery(&tree_path, L1BatchNumber(1)).await;
+        let (tree, _) = create_tree_recovery(&tree_path, L1BatchNumber(1), &config).await;
         let (health_check, health_updater) = ReactiveHealthCheck::new("tree");
         let recovery_options = RecoveryOptions {
             chunk_count,
@@ -128,6 +132,7 @@ async fn prepare_recovery_snapshot_with_genesis(
 struct TestEventListener {
     expected_recovered_chunks: u64,
     stop_threshold: u64,
+    persistence_handle: Mutex<Option<(PersistenceThreadHandle, u64)>>,
     processed_chunk_count: AtomicU64,
     stop_sender: watch::Sender<bool>,
 }
@@ -137,6 +142,7 @@ impl TestEventListener {
         Self {
             expected_recovered_chunks: 0,
             stop_threshold,
+            persistence_handle: Mutex::default(),
             processed_chunk_count: AtomicU64::new(0),
             stop_sender,
         }
@@ -146,6 +152,16 @@ impl TestEventListener {
         self.expected_recovered_chunks = count;
         self
     }
+
+    fn crash_persistence_after(
+        mut self,
+        chunk_count: u64,
+        handle: PersistenceThreadHandle,
+    ) -> Self {
+        assert!(chunk_count < self.stop_threshold);
+        self.persistence_handle = Mutex::new(Some((handle, chunk_count)));
+        self
+    }
 }
 
 impl HandleRecoveryEvent for TestEventListener {
@@ -158,66 +174,49 @@ impl HandleRecoveryEvent for TestEventListener {
         if processed_chunk_count >= self.stop_threshold {
             self.stop_sender.send_replace(true);
         }
+
+        let mut persistence_handle = self.persistence_handle.lock().unwrap();
+        if let Some((_, crash_threshold)) = &*persistence_handle {
+            if processed_chunk_count >= *crash_threshold {
+                let (handle, _) = persistence_handle.take().unwrap();
+                handle.test_stop_processing();
+            }
+        }
     }
 }
 
-#[tokio::test]
-async fn recovery_detects_incorrect_chunk_size_change() {
-    let pool = ConnectionPool::<Core>::test_pool().await;
-    let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
-    let snapshot_recovery = prepare_recovery_snapshot_with_genesis(pool.clone(), &temp_dir).await;
-
-    let tree_path = temp_dir.path().join("recovery");
-    let tree = create_tree_recovery(&tree_path, L1BatchNumber(1)).await;
-    let (stop_sender, stop_receiver) = watch::channel(false);
-    let recovery_options = RecoveryOptions {
-        chunk_count: 5,
-        concurrency_limit: 1,
-        events: Box::new(TestEventListener::new(1, stop_sender)),
-    };
-    let config = MetadataCalculatorRecoveryConfig::default();
-    let mut snapshot = SnapshotParameters::new(&pool, &snapshot_recovery, &config)
-        .await
-        .unwrap();
-    assert!(tree
-        .recover(snapshot, recovery_options, &pool, &stop_receiver)
-        .await
-        .unwrap()
-        .is_none());
-
-    let tree = create_tree_recovery(&tree_path, L1BatchNumber(1)).await;
-    let health_updater = ReactiveHealthCheck::new("tree").1;
-    let recovery_options = RecoveryOptions {
-        chunk_count: 5,
-        concurrency_limit: 1,
-        events: Box::new(RecoveryHealthUpdater::new(&health_updater)),
-    };
-    snapshot.desired_chunk_size /= 2;
+#[derive(Debug, Clone, Copy)]
+enum FaultToleranceCase {
+    Sequential,
+    Parallel,
+    ParallelWithCrash,
+}
 
-    let err = tree
-        .recover(snapshot, recovery_options, &pool, &stop_receiver)
-        .await
-        .unwrap_err()
-        .to_string();
-    assert!(err.contains("desired chunk size"), "{err}");
+impl FaultToleranceCase {
+    const ALL: [Self; 3] = [Self::Sequential, Self::Parallel, Self::ParallelWithCrash];
 }
 
-#[test_casing(3, [5, 7, 8])]
+#[test_casing(9, Product(([5, 7, 8], FaultToleranceCase::ALL)))]
 #[tokio::test]
-async fn recovery_fault_tolerance(chunk_count: u64) {
+async fn recovery_fault_tolerance(chunk_count: u64, case: FaultToleranceCase) {
     let pool = ConnectionPool::<Core>::test_pool().await;
     let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
     let snapshot_recovery = prepare_recovery_snapshot_with_genesis(pool.clone(), &temp_dir).await;
 
     let tree_path = temp_dir.path().join("recovery");
-    let tree = create_tree_recovery(&tree_path, L1BatchNumber(1)).await;
+    let mut config = MetadataCalculatorRecoveryConfig::default();
+    assert!(config.parallel_persistence_buffer.is_some());
+    if matches!(case, FaultToleranceCase::Sequential) {
+        config.parallel_persistence_buffer = None;
+    }
+
+    let (tree, _) = create_tree_recovery(&tree_path, L1BatchNumber(1), &config).await;
     let (stop_sender, stop_receiver) = watch::channel(false);
     let recovery_options = RecoveryOptions {
         chunk_count,
         concurrency_limit: 1,
         events: Box::new(TestEventListener::new(1, stop_sender)),
     };
-    let config = MetadataCalculatorRecoveryConfig::default();
     let snapshot = SnapshotParameters::new(&pool, &snapshot_recovery, &config)
         .await
         .unwrap();
@@ -227,30 +226,44 @@ async fn recovery_fault_tolerance(chunk_count: u64) {
         .unwrap()
         .is_none());
 
-    // Emulate a restart and recover 2 more chunks.
-    let mut tree = create_tree_recovery(&tree_path, L1BatchNumber(1)).await;
+    // Emulate a restart and recover 2 more chunks (or 1 + emulated persistence crash).
+    let (mut tree, handle) = create_tree_recovery(&tree_path, L1BatchNumber(1), &config).await;
     assert_ne!(tree.root_hash().await, snapshot_recovery.l1_batch_root_hash);
     let (stop_sender, stop_receiver) = watch::channel(false);
-    let event_listener = TestEventListener::new(2, stop_sender).expect_recovered_chunks(1);
+    let mut event_listener = TestEventListener::new(2, stop_sender).expect_recovered_chunks(1);
+    let expected_recovered_chunks = if matches!(case, FaultToleranceCase::ParallelWithCrash) {
+        event_listener = event_listener.crash_persistence_after(1, handle.unwrap());
+        2
+    } else {
+        drop(handle); // necessary to terminate the background persistence thread in time
+        3
+    };
     let recovery_options = RecoveryOptions {
         chunk_count,
         concurrency_limit: 1,
         events: Box::new(event_listener),
     };
-    assert!(tree
+    let recovery_result = tree
         .recover(snapshot, recovery_options, &pool, &stop_receiver)
-        .await
-        .unwrap()
-        .is_none());
+        .await;
+    if matches!(case, FaultToleranceCase::ParallelWithCrash) {
+        let err = format!("{:#}", recovery_result.unwrap_err());
+        assert!(err.contains("emulated persistence crash"), "{err}");
+    } else {
+        assert!(recovery_result.unwrap().is_none());
+    }
 
     // Emulate another restart and recover remaining chunks.
-    let mut tree = create_tree_recovery(&tree_path, L1BatchNumber(1)).await;
+    let (mut tree, _) = create_tree_recovery(&tree_path, L1BatchNumber(1), &config).await;
     assert_ne!(tree.root_hash().await, snapshot_recovery.l1_batch_root_hash);
     let (stop_sender, stop_receiver) = watch::channel(false);
     let recovery_options = RecoveryOptions {
         chunk_count,
         concurrency_limit: 1,
-        events: Box::new(TestEventListener::new(u64::MAX, stop_sender).expect_recovered_chunks(3)),
+        events: Box::new(
+            TestEventListener::new(u64::MAX, stop_sender)
+                .expect_recovered_chunks(expected_recovered_chunks),
+        ),
     };
     let tree = tree
         .recover(snapshot, recovery_options, &pool, &stop_receiver)
diff --git a/core/tests/recovery-test/tests/snapshot-recovery.test.ts b/core/tests/recovery-test/tests/snapshot-recovery.test.ts
index 47350921d5a1..3a5d3b7ef57c 100644
--- a/core/tests/recovery-test/tests/snapshot-recovery.test.ts
+++ b/core/tests/recovery-test/tests/snapshot-recovery.test.ts
@@ -77,7 +77,9 @@ describe('snapshot recovery', () => {
     let externalNodeEnv: { [key: string]: string } = {
         ...process.env,
         ZKSYNC_ENV: externalNodeEnvProfile,
-        EN_SNAPSHOTS_RECOVERY_ENABLED: 'true'
+        EN_SNAPSHOTS_RECOVERY_ENABLED: 'true',
+        // Test parallel persistence for tree recovery, which is not (yet) enabled by default
+        EN_EXPERIMENTAL_SNAPSHOTS_RECOVERY_TREE_PARALLEL_PERSISTENCE_BUFFER: '4'
     };
 
     let snapshotMetadata: GetSnapshotResponse;
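Editorial note: taken together, the pieces above give the following recovery flow for library users. This is a condensed, hypothetical sketch based on `examples/recovery.rs` and the integration tests in this patch (error handling and entry generation elided), not code from the PR itself:

```rust
use zksync_merkle_tree::{recovery::MerkleTreeRecovery, MerkleTree, RocksDBWrapper, TreeEntry};

fn recover(db: RocksDBWrapper, chunks: Vec<Vec<TreeEntry>>) -> anyhow::Result<()> {
    let recovered_version = 123;
    let mut recovery = MerkleTreeRecovery::new(db, recovered_version)?;
    // Offload persistence to a background thread with a 4-chunk buffer; the returned
    // handle is only needed to emulate persistence crashes in tests, so it is dropped.
    recovery.parallelize_persistence(4)?;
    for chunk in chunks {
        // For linear recovery, entries must be sorted by key across chunks.
        recovery.extend_linear(chunk)?;
    }
    // `finalize()` joins the persistence thread and marks recovery as complete.
    let tree = MerkleTree::new(recovery.finalize()?)?;
    tree.verify_consistency(recovered_version, true)?;
    Ok(())
}
```

On an external node, the same behavior is toggled via the `EN_EXPERIMENTAL_SNAPSHOTS_RECOVERY_TREE_PARALLEL_PERSISTENCE_BUFFER` environment variable exercised in the TypeScript test above.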