From 363b4f09937496fadeb38857f5c0c73146995ce5 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 12 Nov 2024 11:20:45 +0200 Subject: [PATCH] fix(merkle-tree): Repair stale keys for tree in background (#3200) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Implements a background task to remove bogus stale keys for the Merkle tree. ## Why ❔ These keys could have been produced during L1 batch reverts before https://github.com/matter-labs/zksync-era/pull/3178. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev lint`. --- core/bin/external_node/src/config/mod.rs | 9 + core/bin/external_node/src/node_builder.rs | 5 + core/lib/config/src/configs/experimental.rs | 4 + core/lib/config/src/testonly.rs | 1 + core/lib/env_config/src/database.rs | 4 + core/lib/merkle_tree/src/domain.rs | 5 + core/lib/merkle_tree/src/lib.rs | 16 + core/lib/merkle_tree/src/pruning.rs | 39 +- core/lib/merkle_tree/src/repair.rs | 376 ++++++++++++++++++ core/lib/merkle_tree/src/storage/rocksdb.rs | 98 ++++- .../merkle_tree/src/storage/serialization.rs | 13 + core/lib/merkle_tree/src/utils.rs | 43 ++ core/lib/protobuf_config/src/experimental.rs | 8 +- .../src/proto/config/experimental.proto | 3 +- .../src/api_server/metrics.rs | 1 + .../metadata_calculator/src/api_server/mod.rs | 15 + .../src/api_server/tests.rs | 15 + core/node/metadata_calculator/src/helpers.rs | 14 + core/node/metadata_calculator/src/lib.rs | 7 + core/node/metadata_calculator/src/repair.rs | 258 ++++++++++++ .../layers/metadata_calculator.rs | 30 +- 21 files changed, 923 insertions(+), 41 deletions(-) create mode 100644 core/lib/merkle_tree/src/repair.rs create mode 100644 core/node/metadata_calculator/src/repair.rs diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index 0a94f993656a..81604f83008a 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -408,6 +408,9 @@ pub(crate) struct OptionalENConfig { /// Timeout to wait for the Merkle tree database to run compaction on stalled writes. #[serde(default = "OptionalENConfig::default_merkle_tree_stalled_writes_timeout_sec")] merkle_tree_stalled_writes_timeout_sec: u64, + /// Enables the stale keys repair task for the Merkle tree. + #[serde(default)] + pub merkle_tree_repair_stale_keys: bool, // Postgres config (new parameters) /// Threshold in milliseconds for the DB connection lifetime to denote it as long-living and log its details. @@ -639,6 +642,12 @@ impl OptionalENConfig { merkle_tree.stalled_writes_timeout_sec, default_merkle_tree_stalled_writes_timeout_sec ), + merkle_tree_repair_stale_keys: general_config + .db_config + .as_ref() + .map_or(false, |config| { + config.experimental.merkle_tree_repair_stale_keys + }), database_long_connection_threshold_ms: load_config!( general_config.postgres_config, long_connection_threshold_ms diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs index b7f6f8039025..5c70fd436781 100644 --- a/core/bin/external_node/src/node_builder.rs +++ b/core/bin/external_node/src/node_builder.rs @@ -378,6 +378,11 @@ impl ExternalNodeBuilder { layer = layer.with_tree_api_config(merkle_tree_api_config); } + // Add stale keys repair task if requested. 
+ if self.config.optional.merkle_tree_repair_stale_keys { + layer = layer.with_stale_keys_repair(); + } + // Add tree pruning if needed. if self.config.optional.pruning_enabled { layer = layer.with_pruning_config(self.config.optional.pruning_removal_delay()); diff --git a/core/lib/config/src/configs/experimental.rs b/core/lib/config/src/configs/experimental.rs index a87a221ef222..2553864e251d 100644 --- a/core/lib/config/src/configs/experimental.rs +++ b/core/lib/config/src/configs/experimental.rs @@ -29,6 +29,9 @@ pub struct ExperimentalDBConfig { /// correspondingly; otherwise, RocksDB performance can significantly degrade. #[serde(default)] pub include_indices_and_filters_in_block_cache: bool, + /// Enables the stale keys repair task for the Merkle tree. + #[serde(default)] + pub merkle_tree_repair_stale_keys: bool, } impl Default for ExperimentalDBConfig { @@ -40,6 +43,7 @@ impl Default for ExperimentalDBConfig { protective_reads_persistence_enabled: false, processing_delay_ms: Self::default_merkle_tree_processing_delay_ms(), include_indices_and_filters_in_block_cache: false, + merkle_tree_repair_stale_keys: false, } } } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 93d502cc4e8a..c24d47f27b33 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -305,6 +305,7 @@ impl Distribution for EncodeDist { protective_reads_persistence_enabled: self.sample(rng), processing_delay_ms: self.sample(rng), include_indices_and_filters_in_block_cache: self.sample(rng), + merkle_tree_repair_stale_keys: self.sample(rng), } } } diff --git a/core/lib/env_config/src/database.rs b/core/lib/env_config/src/database.rs index 119d64b7738c..ae4c3059ce32 100644 --- a/core/lib/env_config/src/database.rs +++ b/core/lib/env_config/src/database.rs @@ -88,6 +88,7 @@ mod tests { DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER=50 DATABASE_EXPERIMENTAL_STATE_KEEPER_DB_BLOCK_CACHE_CAPACITY_MB=64 DATABASE_EXPERIMENTAL_STATE_KEEPER_DB_MAX_OPEN_FILES=100 + DATABASE_EXPERIMENTAL_MERKLE_TREE_REPAIR_STALE_KEYS=true "#; lock.set_env(config); @@ -109,6 +110,7 @@ mod tests { db_config.experimental.state_keeper_db_max_open_files, NonZeroU32::new(100) ); + assert!(db_config.experimental.merkle_tree_repair_stale_keys); } #[test] @@ -118,6 +120,7 @@ mod tests { "DATABASE_STATE_KEEPER_DB_PATH", "DATABASE_EXPERIMENTAL_STATE_KEEPER_DB_MAX_OPEN_FILES", "DATABASE_EXPERIMENTAL_STATE_KEEPER_DB_BLOCK_CACHE_CAPACITY_MB", + "DATABASE_EXPERIMENTAL_MERKLE_TREE_REPAIR_STALE_KEYS", "DATABASE_MERKLE_TREE_BACKUP_PATH", "DATABASE_MERKLE_TREE_PATH", "DATABASE_MERKLE_TREE_MODE", @@ -144,6 +147,7 @@ mod tests { 128 ); assert_eq!(db_config.experimental.state_keeper_db_max_open_files, None); + assert!(!db_config.experimental.merkle_tree_repair_stale_keys); // Check that new env variable for Merkle tree path is supported lock.set_env("DATABASE_MERKLE_TREE_PATH=/db/tree/main"); diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index 5064c791ed5b..5265f93264f2 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -410,6 +410,11 @@ impl ZkSyncTreeReader { &self.0.db } + /// Converts this reader to the underlying DB. + pub fn into_db(self) -> RocksDBWrapper { + self.0.db + } + /// Returns the root hash and leaf count at the specified L1 batch. 
pub fn root_info(&self, l1_batch_number: L1BatchNumber) -> Option<(ValueHash, u64)> { let root = self.0.root(l1_batch_number.0.into())?; diff --git a/core/lib/merkle_tree/src/lib.rs b/core/lib/merkle_tree/src/lib.rs index 5e97d6d77c69..1782f373954c 100644 --- a/core/lib/merkle_tree/src/lib.rs +++ b/core/lib/merkle_tree/src/lib.rs @@ -71,6 +71,7 @@ mod hasher; mod metrics; mod pruning; pub mod recovery; +pub mod repair; mod storage; mod types; mod utils; @@ -200,6 +201,21 @@ impl MerkleTree { root.unwrap_or(Root::Empty) } + /// Incorrect version of [`Self::truncate_recent_versions()`] that doesn't remove stale keys for the truncated tree versions. + #[cfg(test)] + fn truncate_recent_versions_incorrectly( + &mut self, + retained_version_count: u64, + ) -> anyhow::Result<()> { + let mut manifest = self.db.manifest().unwrap_or_default(); + if manifest.version_count > retained_version_count { + manifest.version_count = retained_version_count; + let patch = PatchSet::from_manifest(manifest); + self.db.apply_patch(patch)?; + } + Ok(()) + } + /// Extends this tree by creating its new version. /// /// # Return value diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 2e328d0a2bb5..ae8300b893ab 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -250,6 +250,7 @@ mod tests { use super::*; use crate::{ types::{Node, NodeKey}, + utils::testonly::setup_tree_with_stale_keys, Database, Key, MerkleTree, PatchSet, RocksDBWrapper, TreeEntry, ValueHash, }; @@ -507,47 +508,17 @@ mod tests { test_keys_are_removed_by_pruning_when_overwritten_in_multiple_batches(true); } - fn test_pruning_with_truncation(db: impl PruneDatabase) { - let mut tree = MerkleTree::new(db).unwrap(); - let kvs: Vec<_> = (0_u64..100) - .map(|i| TreeEntry::new(Key::from(i), i + 1, ValueHash::zero())) - .collect(); - tree.extend(kvs).unwrap(); - - let overridden_kvs = vec![TreeEntry::new( - Key::from(0), - 1, - ValueHash::repeat_byte(0xaa), - )]; - tree.extend(overridden_kvs).unwrap(); - - let stale_keys = tree.db.stale_keys(1); - assert!( - stale_keys.iter().any(|key| !key.is_empty()), - "{stale_keys:?}" - ); - - // Revert `overridden_kvs`. - tree.truncate_recent_versions(1).unwrap(); - assert_eq!(tree.latest_version(), Some(0)); - let future_stale_keys = tree.db.stale_keys(1); - assert!(future_stale_keys.is_empty()); - - // Add a new version without the key. To make the matter more egregious, the inserted key - // differs from all existing keys, starting from the first nibble. - let new_key = Key::from_big_endian(&[0xaa; 32]); - let new_kvs = vec![TreeEntry::new(new_key, 101, ValueHash::repeat_byte(0xaa))]; - tree.extend(new_kvs).unwrap(); - assert_eq!(tree.latest_version(), Some(1)); + fn test_pruning_with_truncation(mut db: impl PruneDatabase) { + setup_tree_with_stale_keys(&mut db, false); - let stale_keys = tree.db.stale_keys(1); + let stale_keys = db.stale_keys(1); assert_eq!(stale_keys.len(), 1); assert!( stale_keys[0].is_empty() && stale_keys[0].version == 0, "{stale_keys:?}" ); - let (mut pruner, _) = MerkleTreePruner::new(tree.db); + let (mut pruner, _) = MerkleTreePruner::new(db); let prunable_version = pruner.last_prunable_version().unwrap(); assert_eq!(prunable_version, 1); let stats = pruner diff --git a/core/lib/merkle_tree/src/repair.rs b/core/lib/merkle_tree/src/repair.rs new file mode 100644 index 000000000000..c83569e96b13 --- /dev/null +++ b/core/lib/merkle_tree/src/repair.rs @@ -0,0 +1,376 @@ +//! Service tasks for the Merkle tree. 
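+//!
+//! Currently the only task here is [`StaleKeysRepairTask`]: a background task that detects bogus
+//! stale keys left over from L1 batch reverts performed before the tree truncation fix, and
+//! removes them so that pruning cannot delete nodes that are still reachable.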
+
+use std::{
+    ops,
+    sync::{mpsc, Arc, Mutex},
+    time::{Duration, Instant},
+};
+
+use anyhow::Context as _;
+use rayon::prelude::*;
+
+use crate::{
+    types::{NodeKey, StaleNodeKey},
+    Database, PruneDatabase, RocksDBWrapper,
+};
+
+/// Persisted information about stale keys repair progress.
+#[derive(Debug)]
+pub(crate) struct StaleKeysRepairData {
+    pub next_version: u64,
+}
+
+/// [`StaleKeysRepairTask`] progress stats.
+#[derive(Debug, Clone, Default)]
+pub struct StaleKeysRepairStats {
+    /// Versions checked by the task, or `None` if no versions have been checked.
+    pub checked_versions: Option<ops::RangeInclusive<u64>>,
+    /// Number of repaired stale keys.
+    pub repaired_key_count: usize,
+}
+
+#[derive(Debug)]
+struct StepStats {
+    checked_versions: ops::RangeInclusive<u64>,
+    repaired_key_count: usize,
+}
+
+/// Handle for a [`StaleKeysRepairTask`] allowing to abort its operation.
+///
+/// The task is aborted once the handle is dropped.
+#[must_use = "Paired `StaleKeysRepairTask` is aborted once handle is dropped"]
+#[derive(Debug)]
+pub struct StaleKeysRepairHandle {
+    stats: Arc<Mutex<StaleKeysRepairStats>>,
+    _aborted_sender: mpsc::Sender<()>,
+}
+
+impl StaleKeysRepairHandle {
+    /// Returns stats for the paired task.
+    #[allow(clippy::missing_panics_doc)] // mutex poisoning shouldn't happen
+    pub fn stats(&self) -> StaleKeysRepairStats {
+        self.stats.lock().expect("stats mutex poisoned").clone()
+    }
+}
+
+/// Task that repairs stale keys for the tree.
+///
+/// Early tree versions contained a bug: if a tree version was truncated, stale keys for it remained intact.
+/// If an overwritten tree version did not contain the same keys, this could lead to keys incorrectly marked as stale,
+/// meaning that after pruning, a tree may end up broken.
+#[derive(Debug)]
+pub struct StaleKeysRepairTask {
+    db: RocksDBWrapper,
+    parallelism: u64,
+    poll_interval: Duration,
+    stats: Arc<Mutex<StaleKeysRepairStats>>,
+    aborted_receiver: mpsc::Receiver<()>,
+}
+
+impl StaleKeysRepairTask {
+    /// Creates a new task.
+    pub fn new(db: RocksDBWrapper) -> (Self, StaleKeysRepairHandle) {
+        let (aborted_sender, aborted_receiver) = mpsc::channel();
+        let stats = Arc::<Mutex<StaleKeysRepairStats>>::default();
+        let this = Self {
+            db,
+            parallelism: (rayon::current_num_threads() as u64).max(1),
+            poll_interval: Duration::from_secs(60),
+            stats: stats.clone(),
+            aborted_receiver,
+        };
+        let handle = StaleKeysRepairHandle {
+            stats,
+            _aborted_sender: aborted_sender,
+        };
+        (this, handle)
+    }
+
+    /// Sets the poll interval for this task.
+    pub fn set_poll_interval(&mut self, poll_interval: Duration) {
+        self.poll_interval = poll_interval;
+    }
+
+    /// Runs stale key detection for a single tree version.
+    #[tracing::instrument(skip(db))]
+    pub fn bogus_stale_keys(db: &RocksDBWrapper, version: u64) -> Vec<NodeKey> {
+        const SAMPLE_COUNT: usize = 5;
+
+        let version_keys = db.all_keys_for_version(version).unwrap_or_else(|err| {
+            panic!("failed loading keys changed in tree version {version}: {err}")
+        });
+        let stale_keys = db.stale_keys(version);
+
+        if !version_keys.unreachable_keys.is_empty() {
+            let keys_sample: Vec<_> = version_keys
+                .unreachable_keys
+                .iter()
+                .take(SAMPLE_COUNT)
+                .collect::<Vec<_>>();
+            tracing::warn!(
+                version,
+                unreachable_keys.len = version_keys.unreachable_keys.len(),
+                unreachable_keys.sample = ?keys_sample,
+                "Found unreachable keys in tree"
+            );
+        }
+
+        let mut bogus_stale_keys = vec![];
+        for stale_key in stale_keys {
+            if version_keys.valid_keys.contains(&stale_key.nibbles) {
+                // Normal case: a new node obsoletes a previous version.
+ } else if version_keys.unreachable_keys.contains(&stale_key.nibbles) { + // Explainable bogus stale key: a node that was updated in `version` before the truncation is no longer updated after truncation. + bogus_stale_keys.push(stale_key); + } else { + tracing::warn!( + version, + ?stale_key, + "Unexplained bogus stale key: not present in any nodes changed in the tree version" + ); + bogus_stale_keys.push(stale_key); + } + } + + if bogus_stale_keys.is_empty() { + return vec![]; + } + + let keys_sample: Vec<_> = bogus_stale_keys.iter().take(SAMPLE_COUNT).collect(); + tracing::info!( + stale_keys.len = bogus_stale_keys.len(), + stale_keys.sample = ?keys_sample, + "Found bogus stale keys" + ); + bogus_stale_keys + } + + /// Returns a boolean flag indicating whether the task data was updated. + fn step(&mut self) -> anyhow::Result> { + let repair_data = self + .db + .stale_keys_repair_data() + .context("failed getting repair data")?; + let min_stale_key_version = self.db.min_stale_key_version(); + let start_version = match (repair_data, min_stale_key_version) { + (_, None) => { + tracing::debug!("No stale keys in tree, nothing to do"); + return Ok(None); + } + (None, Some(version)) => version, + (Some(data), Some(version)) => data.next_version.max(version), + }; + + let latest_version = self + .db + .manifest() + .and_then(|manifest| manifest.version_count.checked_sub(1)); + let Some(latest_version) = latest_version else { + tracing::warn!( + min_stale_key_version, + "Tree has stale keys, but no latest versions" + ); + return Ok(None); + }; + + let end_version = (start_version + self.parallelism - 1).min(latest_version); + let versions = start_version..=end_version; + if versions.is_empty() { + tracing::debug!(?versions, latest_version, "No tree versions to check"); + return Ok(None); + } + + tracing::debug!( + ?versions, + latest_version, + ?min_stale_key_version, + "Checking stale keys" + ); + + let stale_keys = versions + .clone() + .into_par_iter() + .map(|version| { + Self::bogus_stale_keys(&self.db, version) + .into_iter() + .map(|key| StaleNodeKey::new(key, version)) + .collect::>() + }) + .reduce(Vec::new, |mut acc, keys| { + acc.extend(keys); + acc + }); + self.update_task_data(versions.clone(), &stale_keys)?; + + Ok(Some(StepStats { + checked_versions: versions, + repaired_key_count: stale_keys.len(), + })) + } + + #[tracing::instrument( + level = "debug", + err, + skip(self, removed_keys), + fields(removed_keys.len = removed_keys.len()), + )] + fn update_task_data( + &mut self, + versions: ops::RangeInclusive, + removed_keys: &[StaleNodeKey], + ) -> anyhow::Result<()> { + tracing::debug!("Updating task data"); + let started_at = Instant::now(); + let new_data = StaleKeysRepairData { + next_version: *versions.end() + 1, + }; + self.db + .repair_stale_keys(&new_data, removed_keys) + .context("failed removing bogus stale keys")?; + let latency = started_at.elapsed(); + tracing::debug!(?latency, "Updated task data"); + Ok(()) + } + + fn wait_for_abort(&mut self, timeout: Duration) -> bool { + match self.aborted_receiver.recv_timeout(timeout) { + Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => true, + Err(mpsc::RecvTimeoutError::Timeout) => false, + } + } + + fn update_stats(&self, step_stats: StepStats) { + let mut stats = self.stats.lock().expect("stats mutex poisoned"); + if let Some(versions) = &mut stats.checked_versions { + *versions = *versions.start()..=*step_stats.checked_versions.end(); + } else { + stats.checked_versions = Some(step_stats.checked_versions); + } + 
stats.repaired_key_count += step_stats.repaired_key_count; + } + + /// Runs this task indefinitely. + /// + /// # Errors + /// + /// Propagates RocksDB I/O errors. + pub fn run(mut self) -> anyhow::Result<()> { + let repair_data = self + .db + .stale_keys_repair_data() + .context("failed getting repair data")?; + tracing::info!( + paralellism = self.parallelism, + poll_interval = ?self.poll_interval, + ?repair_data, + "Starting repair task" + ); + + let mut wait_interval = Duration::ZERO; + while !self.wait_for_abort(wait_interval) { + wait_interval = if let Some(step_stats) = self.step()? { + self.update_stats(step_stats); + Duration::ZERO + } else { + self.poll_interval + }; + } + tracing::info!("Stop signal received, stale keys repair is shut down"); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::thread; + + use super::*; + use crate::{ + utils::testonly::setup_tree_with_stale_keys, Key, MerkleTree, MerkleTreePruner, TreeEntry, + ValueHash, + }; + + #[test] + fn stale_keys_repair_with_normal_tree() { + let temp_dir = tempfile::TempDir::new().unwrap(); + let mut db = RocksDBWrapper::new(temp_dir.path()).unwrap(); + + // The task should work fine with future tree versions. + for version in [0, 1, 100] { + let bogus_stale_keys = StaleKeysRepairTask::bogus_stale_keys(&db, version); + assert!(bogus_stale_keys.is_empty()); + } + + let kvs: Vec<_> = (0_u64..100) + .map(|i| TreeEntry::new(Key::from(i), i + 1, ValueHash::zero())) + .collect(); + MerkleTree::new(&mut db).unwrap().extend(kvs).unwrap(); + + let bogus_stale_keys = StaleKeysRepairTask::bogus_stale_keys(&db, 0); + assert!(bogus_stale_keys.is_empty()); + } + + #[test] + fn detecting_bogus_stale_keys() { + let temp_dir = tempfile::TempDir::new().unwrap(); + let mut db = RocksDBWrapper::new(temp_dir.path()).unwrap(); + setup_tree_with_stale_keys(&mut db, true); + + let bogus_stale_keys = StaleKeysRepairTask::bogus_stale_keys(&db, 1); + assert!(!bogus_stale_keys.is_empty()); + + let (mut task, _handle) = StaleKeysRepairTask::new(db); + task.parallelism = 10; // Ensure that all tree versions are checked at once. + // Repair the tree. + let step_stats = task.step().unwrap().expect("tree was not repaired"); + assert_eq!(step_stats.checked_versions, 1..=1); + assert!(step_stats.repaired_key_count > 0); + // Check that the tree works fine once it's pruned. + let (mut pruner, _) = MerkleTreePruner::new(&mut task.db); + pruner.prune_up_to(1).unwrap().expect("tree was not pruned"); + + MerkleTree::new(&mut task.db) + .unwrap() + .verify_consistency(1, false) + .unwrap(); + + let bogus_stale_keys = StaleKeysRepairTask::bogus_stale_keys(&task.db, 1); + assert!(bogus_stale_keys.is_empty()); + MerkleTree::new(&mut task.db) + .unwrap() + .verify_consistency(1, false) + .unwrap(); + + assert!(task.step().unwrap().is_none()); + } + + #[test] + fn full_stale_keys_task_workflow() { + let temp_dir = tempfile::TempDir::new().unwrap(); + let mut db = RocksDBWrapper::new(temp_dir.path()).unwrap(); + setup_tree_with_stale_keys(&mut db, true); + + let (task, handle) = StaleKeysRepairTask::new(db.clone()); + let task_thread = thread::spawn(|| task.run()); + + loop { + if let Some(task_data) = db.stale_keys_repair_data().unwrap() { + if task_data.next_version == 2 { + // All tree versions are processed. 
+ break; + } + } + thread::sleep(Duration::from_millis(50)); + } + let stats = handle.stats(); + assert_eq!(stats.checked_versions, Some(1..=1)); + assert!(stats.repaired_key_count > 0, "{stats:?}"); + + assert!(!task_thread.is_finished()); + drop(handle); + task_thread.join().unwrap().unwrap(); + + let bogus_stale_keys = StaleKeysRepairTask::bogus_stale_keys(&db, 1); + assert!(bogus_stale_keys.is_empty()); + } +} diff --git a/core/lib/merkle_tree/src/storage/rocksdb.rs b/core/lib/merkle_tree/src/storage/rocksdb.rs index 6995bbfbfc7f..5a40c82b680c 100644 --- a/core/lib/merkle_tree/src/storage/rocksdb.rs +++ b/core/lib/merkle_tree/src/storage/rocksdb.rs @@ -1,6 +1,13 @@ //! RocksDB implementation of [`Database`]. -use std::{any::Any, cell::RefCell, ops, path::Path, sync::Arc}; +use std::{ + any::Any, + cell::RefCell, + collections::{HashMap, HashSet}, + ops, + path::Path, + sync::Arc, +}; use anyhow::Context as _; use rayon::prelude::*; @@ -15,6 +22,7 @@ use zksync_storage::{ use crate::{ errors::{DeserializeError, ErrorContext}, metrics::ApplyPatchStats, + repair::StaleKeysRepairData, storage::{ database::{PruneDatabase, PrunePatchSet}, Database, NodeKeys, PatchSet, @@ -70,6 +78,15 @@ impl ToDbKey for (NodeKey, bool) { } } +/// All node keys modified in a certain version of the tree, loaded via a prefix iterator. +#[derive(Debug, Default)] +pub(crate) struct VersionKeys { + /// Valid / reachable keys modified in the version. + pub valid_keys: HashSet, + /// Unreachable keys modified in the version, e.g. as a result of truncating the tree and overwriting the version. + pub unreachable_keys: HashSet, +} + /// Main [`Database`] implementation wrapping a [`RocksDB`] reference. /// /// # Cloning @@ -97,6 +114,8 @@ impl RocksDBWrapper { // since the minimum node key is [0, 0, 0, 0, 0, 0, 0, 0]. const MANIFEST_KEY: &'static [u8] = &[0]; + const STALE_KEYS_REPAIR_KEY: &'static [u8] = &[0, 0]; + /// Creates a new wrapper, initializing RocksDB at the specified directory. /// /// # Errors @@ -174,6 +193,83 @@ impl RocksDBWrapper { }) } + pub(crate) fn all_keys_for_version( + &self, + version: u64, + ) -> Result { + let Some(Root::Filled { + node: root_node, .. + }) = self.root(version) + else { + return Ok(VersionKeys::default()); + }; + + let cf = MerkleTreeColumnFamily::Tree; + let version_prefix = version.to_be_bytes(); + let mut nodes = HashMap::from([(Nibbles::EMPTY, root_node)]); + let mut unreachable_keys = HashSet::new(); + + for (raw_key, raw_value) in self.db.prefix_iterator_cf(cf, &version_prefix) { + let key = NodeKey::from_db_key(&raw_key); + let Some((parent_nibbles, nibble)) = key.nibbles.split_last() else { + // Root node, already processed + continue; + }; + let Some(Node::Internal(parent)) = nodes.get(&parent_nibbles) else { + unreachable_keys.insert(key.nibbles); + continue; + }; + let Some(this_ref) = parent.child_ref(nibble) else { + unreachable_keys.insert(key.nibbles); + continue; + }; + if this_ref.version != version { + unreachable_keys.insert(key.nibbles); + continue; + } + + // Now we are sure that `this_ref` actually points to the node we're processing. 
+ let node = Self::deserialize_node(&raw_value, &key, this_ref.is_leaf)?; + nodes.insert(key.nibbles, node); + } + + Ok(VersionKeys { + valid_keys: nodes.into_keys().collect(), + unreachable_keys, + }) + } + + pub(crate) fn repair_stale_keys( + &mut self, + data: &StaleKeysRepairData, + removed_keys: &[StaleNodeKey], + ) -> anyhow::Result<()> { + let mut raw_value = vec![]; + data.serialize(&mut raw_value); + + let mut write_batch = self.db.new_write_batch(); + write_batch.put_cf( + MerkleTreeColumnFamily::Tree, + Self::STALE_KEYS_REPAIR_KEY, + &raw_value, + ); + for key in removed_keys { + write_batch.delete_cf(MerkleTreeColumnFamily::StaleKeys, &key.to_db_key()); + } + self.db + .write(write_batch) + .context("Failed writing a batch to RocksDB") + } + + pub(crate) fn stale_keys_repair_data( + &self, + ) -> Result, DeserializeError> { + let Some(raw_value) = self.raw_node(Self::STALE_KEYS_REPAIR_KEY) else { + return Ok(None); + }; + StaleKeysRepairData::deserialize(&raw_value).map(Some) + } + /// Returns the wrapped RocksDB instance. pub fn into_inner(self) -> RocksDB { self.db diff --git a/core/lib/merkle_tree/src/storage/serialization.rs b/core/lib/merkle_tree/src/storage/serialization.rs index d0c573fd8170..700a4cd5020b 100644 --- a/core/lib/merkle_tree/src/storage/serialization.rs +++ b/core/lib/merkle_tree/src/storage/serialization.rs @@ -4,6 +4,7 @@ use std::{collections::HashMap, str}; use crate::{ errors::{DeserializeError, DeserializeErrorKind, ErrorContext}, + repair::StaleKeysRepairData, types::{ ChildRef, InternalNode, Key, LeafNode, Manifest, Node, RawNode, Root, TreeTags, ValueHash, HASH_SIZE, KEY_SIZE, @@ -355,6 +356,18 @@ impl Manifest { } } +impl StaleKeysRepairData { + pub(super) fn deserialize(mut bytes: &[u8]) -> Result { + let next_version = + leb128::read::unsigned(&mut bytes).map_err(DeserializeErrorKind::Leb128)?; + Ok(Self { next_version }) + } + + pub(super) fn serialize(&self, buffer: &mut Vec) { + leb128::write::unsigned(buffer, self.next_version).unwrap(); + } +} + #[cfg(test)] mod tests { use zksync_types::H256; diff --git a/core/lib/merkle_tree/src/utils.rs b/core/lib/merkle_tree/src/utils.rs index 4771a940f2c8..a3c025a8b7bd 100644 --- a/core/lib/merkle_tree/src/utils.rs +++ b/core/lib/merkle_tree/src/utils.rs @@ -165,6 +165,49 @@ impl Iterator for MergingIter { impl ExactSizeIterator for MergingIter {} +#[cfg(test)] +pub(crate) mod testonly { + use crate::{Key, MerkleTree, PruneDatabase, TreeEntry, ValueHash}; + + pub(crate) fn setup_tree_with_stale_keys(db: impl PruneDatabase, incorrect_truncation: bool) { + let mut tree = MerkleTree::new(db).unwrap(); + let kvs: Vec<_> = (0_u64..100) + .map(|i| TreeEntry::new(Key::from(i), i + 1, ValueHash::zero())) + .collect(); + tree.extend(kvs).unwrap(); + + let overridden_kvs = vec![TreeEntry::new( + Key::from(0), + 1, + ValueHash::repeat_byte(0xaa), + )]; + tree.extend(overridden_kvs).unwrap(); + + let stale_keys = tree.db.stale_keys(1); + assert!( + stale_keys.iter().any(|key| !key.is_empty()), + "{stale_keys:?}" + ); + + // Revert `overridden_kvs`. + if incorrect_truncation { + tree.truncate_recent_versions_incorrectly(1).unwrap(); + } else { + tree.truncate_recent_versions(1).unwrap(); + } + assert_eq!(tree.latest_version(), Some(0)); + let future_stale_keys = tree.db.stale_keys(1); + assert_eq!(future_stale_keys.is_empty(), !incorrect_truncation); + + // Add a new version without the key. To make the matter more egregious, the inserted key + // differs from all existing keys, starting from the first nibble. 
+ let new_key = Key::from_big_endian(&[0xaa; 32]); + let new_kvs = vec![TreeEntry::new(new_key, 101, ValueHash::repeat_byte(0xaa))]; + tree.extend(new_kvs).unwrap(); + assert_eq!(tree.latest_version(), Some(1)); + } +} + #[cfg(test)] mod tests { use zksync_types::U256; diff --git a/core/lib/protobuf_config/src/experimental.rs b/core/lib/protobuf_config/src/experimental.rs index 750dc7b04f01..8dfbf413d5a1 100644 --- a/core/lib/protobuf_config/src/experimental.rs +++ b/core/lib/protobuf_config/src/experimental.rs @@ -30,13 +30,12 @@ impl ProtoRepr for proto::Db { .map(|count| NonZeroU32::new(count).context("cannot be 0")) .transpose() .context("state_keeper_db_max_open_files")?, - protective_reads_persistence_enabled: self - .reads_persistence_enabled - .unwrap_or_default(), + protective_reads_persistence_enabled: self.reads_persistence_enabled.unwrap_or(false), processing_delay_ms: self.processing_delay_ms.unwrap_or_default(), include_indices_and_filters_in_block_cache: self .include_indices_and_filters_in_block_cache - .unwrap_or_default(), + .unwrap_or(false), + merkle_tree_repair_stale_keys: self.merkle_tree_repair_stale_keys.unwrap_or(false), }) } @@ -55,6 +54,7 @@ impl ProtoRepr for proto::Db { include_indices_and_filters_in_block_cache: Some( this.include_indices_and_filters_in_block_cache, ), + merkle_tree_repair_stale_keys: Some(this.merkle_tree_repair_stale_keys), } } } diff --git a/core/lib/protobuf_config/src/proto/config/experimental.proto b/core/lib/protobuf_config/src/proto/config/experimental.proto index 87af8d3835c6..22de076ece27 100644 --- a/core/lib/protobuf_config/src/proto/config/experimental.proto +++ b/core/lib/protobuf_config/src/proto/config/experimental.proto @@ -10,7 +10,8 @@ message DB { optional uint32 state_keeper_db_max_open_files = 2; // optional optional bool reads_persistence_enabled = 3; optional uint64 processing_delay_ms = 4; - optional bool include_indices_and_filters_in_block_cache = 5; + optional bool include_indices_and_filters_in_block_cache = 5; // optional; defaults to false + optional bool merkle_tree_repair_stale_keys = 6; // optional; defaults to false } // Experimental part of the Snapshot recovery configuration. diff --git a/core/node/metadata_calculator/src/api_server/metrics.rs b/core/node/metadata_calculator/src/api_server/metrics.rs index 92f948e09702..a2444e639943 100644 --- a/core/node/metadata_calculator/src/api_server/metrics.rs +++ b/core/node/metadata_calculator/src/api_server/metrics.rs @@ -11,6 +11,7 @@ pub(super) enum MerkleTreeApiMethod { GetProofs, GetNodes, GetStaleKeys, + GetBogusStaleKeys, } /// Metrics for Merkle tree API. 
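The configuration plumbing above deliberately keeps the repair task opt-in: the serde, protobuf, and `Default` fallbacks all resolve `merkle_tree_repair_stale_keys` to `false`. A minimal sketch of the resulting behavior (assuming `ExperimentalDBConfig` is reachable via `zksync_config::configs`, as in the env-config tests above):

```rust
use zksync_config::configs::ExperimentalDBConfig;

fn main() {
    // By default the repair task stays disabled, so existing node configs are unaffected.
    let mut experimental = ExperimentalDBConfig::default();
    assert!(!experimental.merkle_tree_repair_stale_keys);

    // Operators opt in explicitly, e.g. after the env-based config loader parses
    // `DATABASE_EXPERIMENTAL_MERKLE_TREE_REPAIR_STALE_KEYS=true`.
    experimental.merkle_tree_repair_stale_keys = true;
    assert!(experimental.merkle_tree_repair_stale_keys);
}
```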
diff --git a/core/node/metadata_calculator/src/api_server/mod.rs b/core/node/metadata_calculator/src/api_server/mod.rs index ced29310408e..51f25c99ddef 100644 --- a/core/node/metadata_calculator/src/api_server/mod.rs +++ b/core/node/metadata_calculator/src/api_server/mod.rs @@ -486,6 +486,17 @@ impl AsyncTreeReader { Json(StaleKeysResponse { stale_keys }) } + async fn bogus_stale_keys_handler( + State(this): State, + Json(request): Json, + ) -> Json { + let latency = API_METRICS.latency[&MerkleTreeApiMethod::GetBogusStaleKeys].start(); + let stale_keys = this.clone().bogus_stale_keys(request.l1_batch_number).await; + let stale_keys = stale_keys.into_iter().map(HexNodeKey).collect(); + latency.observe(); + Json(StaleKeysResponse { stale_keys }) + } + async fn create_api_server( self, bind_address: &SocketAddr, @@ -501,6 +512,10 @@ impl AsyncTreeReader { "/debug/stale-keys", routing::post(Self::get_stale_keys_handler), ) + .route( + "/debug/stale-keys/bogus", + routing::post(Self::bogus_stale_keys_handler), + ) .with_state(self); let listener = tokio::net::TcpListener::bind(bind_address) diff --git a/core/node/metadata_calculator/src/api_server/tests.rs b/core/node/metadata_calculator/src/api_server/tests.rs index 815522a4cd8e..9bb994cb4163 100644 --- a/core/node/metadata_calculator/src/api_server/tests.rs +++ b/core/node/metadata_calculator/src/api_server/tests.rs @@ -96,6 +96,21 @@ async fn merkle_tree_api() { let raw_stale_keys_response: serde_json::Value = raw_stale_keys_response.json().await.unwrap(); assert_raw_stale_keys_response(&raw_stale_keys_response); + let raw_stale_keys_response = api_client + .inner + .post(format!("http://{local_addr}/debug/stale-keys/bogus")) + .json(&serde_json::json!({ "l1_batch_number": 1 })) + .send() + .await + .unwrap() + .error_for_status() + .unwrap(); + let raw_stale_keys_response: serde_json::Value = raw_stale_keys_response.json().await.unwrap(); + assert_eq!( + raw_stale_keys_response, + serde_json::json!({ "stale_keys": [] }) + ); + // Stop the calculator and the tree API server. stop_sender.send_replace(true); api_server_task.await.unwrap().unwrap(); diff --git a/core/node/metadata_calculator/src/helpers.rs b/core/node/metadata_calculator/src/helpers.rs index 3f370afaf77e..b8d02067f8ea 100644 --- a/core/node/metadata_calculator/src/helpers.rs +++ b/core/node/metadata_calculator/src/helpers.rs @@ -22,6 +22,7 @@ use zksync_health_check::{CheckHealth, Health, HealthStatus, ReactiveHealthCheck use zksync_merkle_tree::{ domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader}, recovery::{MerkleTreeRecovery, PersistenceThreadHandle}, + repair::StaleKeysRepairTask, unstable::{NodeKey, RawNode}, Database, Key, MerkleTreeColumnFamily, NoVersionError, RocksDBWrapper, TreeEntry, TreeEntryWithProof, TreeInstruction, @@ -420,6 +421,19 @@ impl AsyncTreeReader { .await .unwrap() } + + pub(crate) async fn bogus_stale_keys(self, l1_batch_number: L1BatchNumber) -> Vec { + let version = l1_batch_number.0.into(); + tokio::task::spawn_blocking(move || { + StaleKeysRepairTask::bogus_stale_keys(self.inner.db(), version) + }) + .await + .unwrap() + } + + pub(crate) fn into_db(self) -> RocksDBWrapper { + self.inner.into_db() + } } /// Version of async tree reader that holds a weak reference to RocksDB. Used in [`MerkleTreeHealthCheck`]. 
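The new `/debug/stale-keys/bogus` route mirrors the existing `/debug/stale-keys` endpoint, so it can be queried the same way the API-server test above does. A rough client-side sketch (assuming `reqwest` and `serde_json` are available, and a hypothetical `tree_api_addr` pointing at the Merkle tree API server):

```rust
use anyhow::Context as _;

/// Queries bogus stale keys for a single L1 batch via the tree API debug endpoint.
async fn fetch_bogus_stale_keys(
    tree_api_addr: &str,
    l1_batch_number: u32,
) -> anyhow::Result<serde_json::Value> {
    let response = reqwest::Client::new()
        .post(format!("http://{tree_api_addr}/debug/stale-keys/bogus"))
        .json(&serde_json::json!({ "l1_batch_number": l1_batch_number }))
        .send()
        .await?
        .error_for_status()
        .context("tree API returned an error status")?;
    // Response shape matches `/debug/stale-keys`: `{ "stale_keys": ["<hex node key>", ...] }`;
    // an empty array means the batch needs no repairs.
    response.json().await.context("failed parsing response")
}
```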
diff --git a/core/node/metadata_calculator/src/lib.rs b/core/node/metadata_calculator/src/lib.rs index 5c64330a0e7d..dddb53b4c52f 100644 --- a/core/node/metadata_calculator/src/lib.rs +++ b/core/node/metadata_calculator/src/lib.rs @@ -26,6 +26,7 @@ use self::{ pub use self::{ helpers::{AsyncTreeReader, LazyAsyncTreeReader, MerkleTreeInfo}, pruning::MerkleTreePruningTask, + repair::StaleKeysRepairTask, }; use crate::helpers::create_readonly_db; @@ -34,6 +35,7 @@ mod helpers; mod metrics; mod pruning; mod recovery; +mod repair; #[cfg(test)] pub(crate) mod tests; mod updater; @@ -203,6 +205,11 @@ impl MetadataCalculator { MerkleTreePruningTask::new(pruning_handles, self.pool.clone(), poll_interval) } + /// This method should be called once. + pub fn stale_keys_repair_task(&self) -> StaleKeysRepairTask { + StaleKeysRepairTask::new(self.tree_reader()) + } + async fn create_tree(&self) -> anyhow::Result { self.health_updater .update(MerkleTreeHealth::Initialization.into()); diff --git a/core/node/metadata_calculator/src/repair.rs b/core/node/metadata_calculator/src/repair.rs new file mode 100644 index 000000000000..9dfec4348ed6 --- /dev/null +++ b/core/node/metadata_calculator/src/repair.rs @@ -0,0 +1,258 @@ +//! High-level wrapper for the stale keys repair task. + +use std::{ + sync::{Arc, Weak}, + time::Duration, +}; + +use anyhow::Context as _; +use async_trait::async_trait; +use once_cell::sync::OnceCell; +use serde::Serialize; +use tokio::sync::watch; +use zksync_health_check::{CheckHealth, Health, HealthStatus}; +use zksync_merkle_tree::repair; + +use crate::LazyAsyncTreeReader; + +#[derive(Debug, Serialize)] +struct RepairHealthDetails { + #[serde(skip_serializing_if = "Option::is_none")] + earliest_checked_version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + latest_checked_version: Option, + repaired_key_count: usize, +} + +impl From for RepairHealthDetails { + fn from(stats: repair::StaleKeysRepairStats) -> Self { + let versions = stats.checked_versions.as_ref(); + Self { + earliest_checked_version: versions.map(|versions| *versions.start()), + latest_checked_version: versions.map(|versions| *versions.end()), + repaired_key_count: stats.repaired_key_count, + } + } +} + +#[derive(Debug, Default)] +struct RepairHealthCheck { + handle: OnceCell>, +} + +#[async_trait] +impl CheckHealth for RepairHealthCheck { + fn name(&self) -> &'static str { + "tree_stale_keys_repair" + } + + async fn check_health(&self) -> Health { + let Some(weak_handle) = self.handle.get() else { + return HealthStatus::Affected.into(); + }; + let Some(handle) = weak_handle.upgrade() else { + return HealthStatus::ShutDown.into(); + }; + Health::from(HealthStatus::Ready).with_details(RepairHealthDetails::from(handle.stats())) + } +} + +/// Stale keys repair task. +#[derive(Debug)] +#[must_use = "Task should `run()` in a managed Tokio task"] +pub struct StaleKeysRepairTask { + tree_reader: LazyAsyncTreeReader, + health_check: Arc, + poll_interval: Duration, +} + +impl StaleKeysRepairTask { + pub(super) fn new(tree_reader: LazyAsyncTreeReader) -> Self { + Self { + tree_reader, + health_check: Arc::default(), + poll_interval: Duration::from_secs(60), + } + } + + pub fn health_check(&self) -> Arc { + self.health_check.clone() + } + + /// Runs this task indefinitely. + #[tracing::instrument(skip_all)] + pub async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { + let db = tokio::select! 
{ + res = self.tree_reader.wait() => { + match res { + Some(reader) => reader.into_db(), + None => { + tracing::info!("Merkle tree dropped; shutting down stale keys repair"); + return Ok(()); + } + } + } + _ = stop_receiver.changed() => { + tracing::info!("Stop signal received before Merkle tree is initialized; shutting down stale keys repair"); + return Ok(()); + } + }; + + let (mut task, handle) = repair::StaleKeysRepairTask::new(db); + task.set_poll_interval(self.poll_interval); + let handle = Arc::new(handle); + self.health_check + .handle + .set(Arc::downgrade(&handle)) + .map_err(|_| anyhow::anyhow!("failed setting health check handle"))?; + + let mut task = tokio::task::spawn_blocking(|| task.run()); + tokio::select! { + res = &mut task => { + tracing::error!("Stale keys repair spontaneously stopped"); + res.context("repair task panicked")? + }, + _ = stop_receiver.changed() => { + tracing::info!("Stop signal received, stale keys repair is shutting down"); + // This is the only strong reference to the handle, so dropping it should signal the task to stop. + drop(handle); + task.await.context("stale keys repair task panicked")? + } + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tempfile::TempDir; + use zksync_dal::{ConnectionPool, Core}; + use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; + use zksync_types::L1BatchNumber; + + use super::*; + use crate::{ + tests::{extend_db_state, gen_storage_logs, mock_config, reset_db_state}, + MetadataCalculator, + }; + + const POLL_INTERVAL: Duration = Duration::from_millis(50); + + async fn wait_for_health( + check: &dyn CheckHealth, + mut condition: impl FnMut(&Health) -> bool, + ) -> Health { + loop { + let health = check.check_health().await; + if condition(&health) { + return health; + } else if matches!( + health.status(), + HealthStatus::ShutDown | HealthStatus::Panicked + ) { + panic!("reached terminal health: {health:?}"); + } + tokio::time::sleep(POLL_INTERVAL).await; + } + } + + #[tokio::test] + async fn repair_task_basics() { + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let config = mock_config(temp_dir.path()); + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + reset_db_state(&pool, 5).await; + + let calculator = MetadataCalculator::new(config, None, pool.clone()) + .await + .unwrap(); + let reader = calculator.tree_reader(); + let mut repair_task = calculator.stale_keys_repair_task(); + repair_task.poll_interval = POLL_INTERVAL; + let health_check = repair_task.health_check(); + + let (stop_sender, stop_receiver) = watch::channel(false); + let calculator_handle = tokio::spawn(calculator.run(stop_receiver.clone())); + let repair_task_handle = tokio::spawn(repair_task.run(stop_receiver)); + wait_for_health(&health_check, |health| { + matches!(health.status(), HealthStatus::Ready) + }) + .await; + + // Wait until the calculator is initialized and then drop the reader so that it doesn't lock RocksDB. + { + let reader = reader.wait().await.unwrap(); + while reader.clone().info().await.next_l1_batch_number < L1BatchNumber(6) { + tokio::time::sleep(POLL_INTERVAL).await; + } + } + + // Wait until all tree versions have been checked. 
+ let health = wait_for_health(&health_check, |health| { + if !matches!(health.status(), HealthStatus::Ready) { + return false; + } + let details = health.details().unwrap(); + details.get("latest_checked_version") == Some(&5.into()) + }) + .await; + let details = health.details().unwrap(); + assert_eq!(details["earliest_checked_version"], 1); + assert_eq!(details["repaired_key_count"], 0); + + stop_sender.send_replace(true); + calculator_handle.await.unwrap().unwrap(); + repair_task_handle.await.unwrap().unwrap(); + wait_for_health(&health_check, |health| { + matches!(health.status(), HealthStatus::ShutDown) + }) + .await; + + test_repair_persistence(temp_dir, pool).await; + } + + async fn test_repair_persistence(temp_dir: TempDir, pool: ConnectionPool) { + let config = mock_config(temp_dir.path()); + let calculator = MetadataCalculator::new(config, None, pool.clone()) + .await + .unwrap(); + let mut repair_task = calculator.stale_keys_repair_task(); + repair_task.poll_interval = POLL_INTERVAL; + let health_check = repair_task.health_check(); + + let (stop_sender, stop_receiver) = watch::channel(false); + let calculator_handle = tokio::spawn(calculator.run(stop_receiver.clone())); + let repair_task_handle = tokio::spawn(repair_task.run(stop_receiver)); + wait_for_health(&health_check, |health| { + matches!(health.status(), HealthStatus::Ready) + }) + .await; + + // Add more batches to the storage. + let mut storage = pool.connection().await.unwrap(); + let logs = gen_storage_logs(200..300, 5); + extend_db_state(&mut storage, logs).await; + + // Wait until new tree versions have been checked. + let health = wait_for_health(&health_check, |health| { + if !matches!(health.status(), HealthStatus::Ready) { + return false; + } + let details = health.details().unwrap(); + details.get("latest_checked_version") == Some(&10.into()) + }) + .await; + let details = health.details().unwrap(); + assert_eq!(details["earliest_checked_version"], 6); + assert_eq!(details["repaired_key_count"], 0); + + stop_sender.send_replace(true); + calculator_handle.await.unwrap().unwrap(); + repair_task_handle.await.unwrap().unwrap(); + } +} diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs index 4092ee6dcd56..45aa320786ef 100644 --- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs +++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs @@ -8,7 +8,7 @@ use anyhow::Context as _; use zksync_config::configs::{api::MerkleTreeApiConfig, database::MerkleTreeMode}; use zksync_metadata_calculator::{ LazyAsyncTreeReader, MerkleTreePruningTask, MerkleTreeReaderConfig, MetadataCalculator, - MetadataCalculatorConfig, TreeReaderTask, + MetadataCalculatorConfig, StaleKeysRepairTask, TreeReaderTask, }; use zksync_storage::RocksDB; @@ -31,6 +31,7 @@ pub struct MetadataCalculatorLayer { config: MetadataCalculatorConfig, tree_api_config: Option, pruning_config: Option, + stale_keys_repair_enabled: bool, } #[derive(Debug, FromContext)] @@ -56,6 +57,9 @@ pub struct Output { /// Only provided if configuration is provided. #[context(task)] pub pruning_task: Option, + /// Only provided if enabled in the config. 
+ #[context(task)] + pub stale_keys_repair_task: Option, pub rocksdb_shutdown_hook: ShutdownHook, } @@ -65,6 +69,7 @@ impl MetadataCalculatorLayer { config, tree_api_config: None, pruning_config: None, + stale_keys_repair_enabled: false, } } @@ -77,6 +82,11 @@ impl MetadataCalculatorLayer { self.pruning_config = Some(pruning_config); self } + + pub fn with_stale_keys_repair(mut self) -> Self { + self.stale_keys_repair_enabled = true; + self + } } #[async_trait::async_trait] @@ -141,6 +151,12 @@ impl WiringLayer for MetadataCalculatorLayer { ) .transpose()?; + let stale_keys_repair_task = if self.stale_keys_repair_enabled { + Some(metadata_calculator.stale_keys_repair_task()) + } else { + None + }; + let tree_api_client = TreeApiClientResource(Arc::new(metadata_calculator.tree_reader())); let rocksdb_shutdown_hook = ShutdownHook::new("rocksdb_terminaton", async { @@ -155,6 +171,7 @@ impl WiringLayer for MetadataCalculatorLayer { tree_api_client, tree_api_task, pruning_task, + stale_keys_repair_task, rocksdb_shutdown_hook, }) } @@ -196,6 +213,17 @@ impl Task for TreeApiTask { } } +#[async_trait::async_trait] +impl Task for StaleKeysRepairTask { + fn id(&self) -> TaskId { + "merkle_tree_stale_keys_repair_task".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + (*self).run(stop_receiver.0).await + } +} + #[async_trait::async_trait] impl Task for MerkleTreePruningTask { fn id(&self) -> TaskId {
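For completeness, here is how the pieces above fit together outside the node framework, roughly following the `repair_task_basics` test: the calculator hands out a single `StaleKeysRepairTask`, which runs next to it and exposes progress through a health check. This is a sketch, not the wiring used by the framework layer; `config` and `pool` are assumed to be a prepared `MetadataCalculatorConfig` and `ConnectionPool<Core>`.

```rust
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig};

/// Hypothetical driver mirroring the `repair_task_basics` test above.
async fn run_calculator_with_repair(
    config: MetadataCalculatorConfig,
    pool: ConnectionPool<Core>,
) -> anyhow::Result<()> {
    let calculator = MetadataCalculator::new(config, None, pool).await?;
    // `stale_keys_repair_task()` is documented to be called at most once per calculator.
    let repair_task = calculator.stale_keys_repair_task();
    // In a real node this would be registered with the health aggregator
    // (it reports as the "tree_stale_keys_repair" component).
    let _health_check = repair_task.health_check();

    let (stop_sender, stop_receiver) = watch::channel(false);
    let calculator_handle = tokio::spawn(calculator.run(stop_receiver.clone()));
    let repair_handle = tokio::spawn(repair_task.run(stop_receiver));

    // ... do useful work; the repair task periodically checks new tree versions ...

    stop_sender.send_replace(true);
    calculator_handle.await??;
    repair_handle.await??;
    Ok(())
}
```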