From 3345c178cdfd772f3f7c78e9d2f9d7d885e7255c Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 23 Oct 2023 20:18:58 +0300 Subject: [PATCH] Unify stalled writes timeout logic --- core/bin/external_node/src/config/mod.rs | 16 ++-- core/bin/external_node/src/main.rs | 2 +- core/lib/config/src/configs/database.rs | 26 +++--- core/lib/storage/src/db.rs | 86 ++++++++++++------- core/lib/storage/src/lib.rs | 2 +- .../src/metadata_calculator/helpers.rs | 13 +-- .../src/metadata_calculator/mod.rs | 7 +- .../src/metadata_calculator/updater.rs | 2 +- 8 files changed, 85 insertions(+), 69 deletions(-) diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index eb371fd37a7a..e424a6325563 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -194,10 +194,9 @@ pub struct OptionalENConfig { /// large value (order of 512 MiB) is helpful for large DBs that experience write stalls. #[serde(default = "OptionalENConfig::default_merkle_tree_memtable_capacity_mb")] merkle_tree_memtable_capacity_mb: usize, - /// Timeout to wait for the Merkle tree database to run compaction on startup so that it doesn't have - /// stopped writes. - #[serde(default = "OptionalENConfig::default_merkle_tree_init_stopped_writes_timeout_sec")] - merkle_tree_init_stopped_writes_timeout_sec: u64, + /// Timeout to wait for the Merkle tree database to run compaction on stalled writes. + #[serde(default = "OptionalENConfig::default_merkle_tree_stalled_writes_timeout_sec")] + merkle_tree_stalled_writes_timeout_sec: u64, // Other config settings /// Port on which the Prometheus exporter server is listening. 
@@ -286,7 +285,7 @@ impl OptionalENConfig { 256 } - const fn default_merkle_tree_init_stopped_writes_timeout_sec() -> u64 { + const fn default_merkle_tree_stalled_writes_timeout_sec() -> u64 { 30 } @@ -339,10 +338,9 @@ impl OptionalENConfig { self.merkle_tree_memtable_capacity_mb * BYTES_IN_MEGABYTE } - /// Returns the timeout to wait for the Merkle tree database to run compaction on startup so that - /// it doesn't have stopped writes. - pub fn merkle_tree_init_stopped_writes_timeout(&self) -> Duration { - Duration::from_secs(self.merkle_tree_init_stopped_writes_timeout_sec) + /// Returns the timeout to wait for the Merkle tree database to run compaction on stalled writes. + pub fn merkle_tree_stalled_writes_timeout(&self) -> Duration { + Duration::from_secs(self.merkle_tree_stalled_writes_timeout_sec) } pub fn api_namespaces(&self) -> Vec<Namespace> { diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 3b70e8bb02d4..45efe1afd980 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -146,7 +146,7 @@ async fn init_tasks( multi_get_chunk_size: config.optional.merkle_tree_multi_get_chunk_size, block_cache_capacity: config.optional.merkle_tree_block_cache_size(), memtable_capacity: config.optional.merkle_tree_memtable_capacity(), - init_stopped_writes_timeout: config.optional.merkle_tree_init_stopped_writes_timeout(), + stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(), }) .await; healthchecks.push(Box::new(metadata_calculator.tree_health_check())); diff --git a/core/lib/config/src/configs/database.rs b/core/lib/config/src/configs/database.rs index e91637d36c7f..465a493172a0 100644 --- a/core/lib/config/src/configs/database.rs +++ b/core/lib/config/src/configs/database.rs @@ -42,10 +42,9 @@ pub struct MerkleTreeConfig { /// large value (order of 512 MiB) is helpful for large DBs that experience write stalls.
#[serde(default = "MerkleTreeConfig::default_memtable_capacity_mb")] pub memtable_capacity_mb: usize, - /// Timeout to wait for the Merkle tree database to run compaction on startup so that it doesn't have - /// stopped writes. - #[serde(default = "MerkleTreeConfig::default_init_stopped_writes_timeout_sec")] - pub init_stopped_writes_timeout_sec: u64, + /// Timeout to wait for the Merkle tree database to run compaction on stalled writes. + #[serde(default = "MerkleTreeConfig::default_stalled_writes_timeout_sec")] + pub stalled_writes_timeout_sec: u64, /// Maximum number of L1 batches to be processed by the Merkle tree at a time. #[serde(default = "MerkleTreeConfig::default_max_l1_batches_per_iter")] pub max_l1_batches_per_iter: usize, @@ -60,7 +59,7 @@ impl Default for MerkleTreeConfig { multi_get_chunk_size: Self::default_multi_get_chunk_size(), block_cache_size_mb: Self::default_block_cache_size_mb(), memtable_capacity_mb: Self::default_memtable_capacity_mb(), - init_stopped_writes_timeout_sec: Self::default_init_stopped_writes_timeout_sec(), + stalled_writes_timeout_sec: Self::default_stalled_writes_timeout_sec(), max_l1_batches_per_iter: Self::default_max_l1_batches_per_iter(), } } @@ -87,7 +86,7 @@ impl MerkleTreeConfig { 256 } - const fn default_init_stopped_writes_timeout_sec() -> u64 { + const fn default_stalled_writes_timeout_sec() -> u64 { 30 } @@ -105,10 +104,9 @@ impl MerkleTreeConfig { self.memtable_capacity_mb * super::BYTES_IN_MEGABYTE } - /// Returns the timeout to wait for the Merkle tree database to run compaction on startup so that - /// it doesn't have stopped writes. - pub fn init_stopped_writes_timeout(&self) -> Duration { - Duration::from_secs(self.init_stopped_writes_timeout_sec) + /// Returns the timeout to wait for the Merkle tree database to run compaction on stalled writes. 
+ pub fn stalled_writes_timeout(&self) -> Duration { + Duration::from_secs(self.stalled_writes_timeout_sec) } } @@ -181,7 +179,7 @@ mod tests { DATABASE_MERKLE_TREE_MODE=lightweight DATABASE_MERKLE_TREE_MULTI_GET_CHUNK_SIZE=250 DATABASE_MERKLE_TREE_MEMTABLE_CAPACITY_MB=512 - DATABASE_MERKLE_TREE_INIT_STOPPED_WRITES_TIMEOUT_SEC=60 + DATABASE_MERKLE_TREE_STALLED_WRITES_TIMEOUT_SEC=60 DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER=50 DATABASE_BACKUP_COUNT=5 DATABASE_BACKUP_INTERVAL_MS=60000 @@ -196,7 +194,7 @@ mod tests { assert_eq!(db_config.merkle_tree.multi_get_chunk_size, 250); assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 50); assert_eq!(db_config.merkle_tree.memtable_capacity_mb, 512); - assert_eq!(db_config.merkle_tree.init_stopped_writes_timeout_sec, 60); + assert_eq!(db_config.merkle_tree.stalled_writes_timeout_sec, 60); assert_eq!(db_config.backup_count, 5); assert_eq!(db_config.backup_interval().as_secs(), 60); } @@ -212,7 +210,7 @@ mod tests { "DATABASE_MERKLE_TREE_MULTI_GET_CHUNK_SIZE", "DATABASE_MERKLE_TREE_BLOCK_CACHE_SIZE_MB", "DATABASE_MERKLE_TREE_MEMTABLE_CAPACITY_MB", - "DATABASE_MERKLE_TREE_INIT_STOPPED_WRITES_TIMEOUT_SEC", + "DATABASE_MERKLE_TREE_STALLED_WRITES_TIMEOUT_SEC", "DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER", "DATABASE_BACKUP_COUNT", "DATABASE_BACKUP_INTERVAL_MS", @@ -227,7 +225,7 @@ mod tests { assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 20); assert_eq!(db_config.merkle_tree.block_cache_size_mb, 128); assert_eq!(db_config.merkle_tree.memtable_capacity_mb, 256); - assert_eq!(db_config.merkle_tree.init_stopped_writes_timeout_sec, 30); + assert_eq!(db_config.merkle_tree.stalled_writes_timeout_sec, 30); assert_eq!(db_config.backup_count, 5); assert_eq!(db_config.backup_interval().as_secs(), 60); diff --git a/core/lib/storage/src/db.rs b/core/lib/storage/src/db.rs index 3e8797fb8ee7..be8a1665bbec 100644 --- a/core/lib/storage/src/db.rs +++ b/core/lib/storage/src/db.rs @@ -6,7 +6,7 @@ use rocksdb::{ use std::{ 
collections::{HashMap, HashSet}, ffi::CStr, - fmt, + fmt, iter, marker::PhantomData, ops, path::Path, @@ -176,12 +176,8 @@ impl RocksDBInner { /// Waits until writes are not stopped for any of the CFs. Writes can stop immediately on DB initialization /// if there are too many level-0 SST files; in this case, it may help waiting several seconds until /// these files are compacted. - fn wait_for_writes_to_resume(&self, timeout: Duration) { - const RETRY_INTERVAL: Duration = Duration::from_secs(1); - - let started_at = Instant::now(); - let mut retry = 0; - while started_at.elapsed() < timeout { + fn wait_for_writes_to_resume(&self, retries: &StalledWritesRetries) { + for (retry_idx, retry_interval) in retries.intervals().enumerate() { let cfs_with_stopped_writes = self.cf_names.iter().copied().filter(|cf_name| { let cf = self.db.cf_handle(cf_name).unwrap(); // ^ `unwrap()` is safe (CF existence is checked during DB initialization) @@ -193,11 +189,10 @@ impl RocksDBInner { } else { tracing::info!( "Writes are stopped for column families {cfs_with_stopped_writes:?} in DB `{}` \ - (retry: {retry})", + (retry #{retry_idx})", self.db_name ); - thread::sleep(RETRY_INTERVAL); - retry += 1; + thread::sleep(retry_interval); } } @@ -208,20 +203,22 @@ impl RocksDBInner { } } +/// Configuration for retries when RocksDB writes are stalled. #[derive(Debug, Clone, Copy)] -struct StalledWritesRetries { +pub struct StalledWritesRetries { max_batch_size: usize, - retry_count: usize, + timeout: Duration, start_interval: Duration, max_interval: Duration, scale_factor: f64, } -impl Default for StalledWritesRetries { - fn default() -> Self { +impl StalledWritesRetries { + /// Creates retries configuration with the specified timeout. 
+ pub fn new(timeout: Duration) -> Self { Self { max_batch_size: 128 << 20, // 128 MiB - retry_count: 20, + timeout, start_interval: Duration::from_millis(50), max_interval: Duration::from_secs(2), scale_factor: 1.5, @@ -230,10 +227,20 @@ } impl StalledWritesRetries { - fn interval(&self, retry_index: usize) -> Duration { - self.start_interval .mul_f64(self.scale_factor.powi(retry_index as i32)) .min(self.max_interval) + fn intervals(&self) -> impl Iterator<Item = Duration> { + let &Self { + timeout, + start_interval, + max_interval, + scale_factor, + .. + } = self; + let started_at = Instant::now(); + + iter::successors(Some(start_interval), move |&prev_interval| { + Some(prev_interval.mul_f64(scale_factor).min(max_interval)) + }) + .take_while(move |_| started_at.elapsed() <= timeout) } // **NB.** The error message may change between RocksDB versions! @@ -254,8 +261,9 @@ pub struct RocksDBOptions { /// Setting this to a reasonably large value (order of 512 MiB) is helpful for large DBs that experience /// write stalls. If not set, large CFs will not be configured specially. pub large_memtable_capacity: Option<usize>, - /// Timeout to wait for the database to run compaction on startup so that it doesn't have stopped writes. - pub init_stopped_writes_timeout: Duration, + /// Timeout to wait for the database to run compaction on stalled writes during startup or + /// when the corresponding RocksDB error is encountered.
+ pub stalled_writes_retries: StalledWritesRetries, } impl Default for RocksDBOptions { @@ -263,7 +271,7 @@ impl Default for RocksDBOptions { Self { block_cache_capacity: None, large_memtable_capacity: None, - init_stopped_writes_timeout: Duration::from_secs(10), + stalled_writes_retries: StalledWritesRetries::new(Duration::from_secs(10)), } } } @@ -352,11 +360,11 @@ impl RocksDB { path.display() ); - inner.wait_for_writes_to_resume(options.init_stopped_writes_timeout); + inner.wait_for_writes_to_resume(&options.stalled_writes_retries); Self { inner, sync_writes: false, - stalled_writes_retries: StalledWritesRetries::default(), + stalled_writes_retries: options.stalled_writes_retries, _cf: PhantomData, } } @@ -439,7 +447,7 @@ impl RocksDB { } let raw_batch_bytes = raw_batch.data().to_vec(); - let mut retry_count = 0; + let mut retries = self.stalled_writes_retries.intervals(); loop { match self.write_inner(raw_batch) { Ok(()) => return Ok(()), @@ -447,16 +455,16 @@ impl RocksDB { let is_stalled_write = StalledWritesRetries::is_write_stall_error(&err); if is_stalled_write { METRICS.report_stalled_write(CF::DB_NAME); + } else { + return Err(err); } - if is_stalled_write && retry_count < retries.retry_count { - let retry_interval = retries.interval(retry_count); + if let Some(retry_interval) = retries.next() { tracing::warn!( "Writes stalled when writing to DB `{}`; will retry after {retry_interval:?}", CF::DB_NAME ); thread::sleep(retry_interval); - retry_count += 1; raw_batch = rocksdb::WriteBatch::from_data(&raw_batch_bytes); } else { return Err(err); @@ -581,11 +589,23 @@ mod tests { #[test] fn retry_interval_computation() { - let retries = StalledWritesRetries::default(); - assert_close(retries.interval(0), Duration::from_millis(50)); - assert_close(retries.interval(1), Duration::from_millis(75)); - assert_close(retries.interval(2), Duration::from_micros(112_500)); - assert_close(retries.interval(20), retries.max_interval); + let retries = 
StalledWritesRetries::new(Duration::from_secs(10)); + let intervals: Vec<_> = retries.intervals().take(20).collect(); + assert_close(intervals[0], Duration::from_millis(50)); + assert_close(intervals[1], Duration::from_millis(75)); + assert_close(intervals[2], Duration::from_micros(112_500)); + assert_close(intervals[19], retries.max_interval); + } + + #[test] + fn retries_iterator_is_finite() { + let retries = StalledWritesRetries::new(Duration::from_millis(10)); + let mut retry_count = 0; + for _ in retries.intervals() { + thread::sleep(Duration::from_millis(5)); + retry_count += 1; + } + assert!(retry_count <= 2); } fn assert_close(lhs: Duration, rhs: Duration) { diff --git a/core/lib/storage/src/lib.rs b/core/lib/storage/src/lib.rs index 6ea928309ff4..729a547981d0 100644 --- a/core/lib/storage/src/lib.rs +++ b/core/lib/storage/src/lib.rs @@ -1,5 +1,5 @@ pub mod db; mod metrics; -pub use db::{RocksDB, RocksDBOptions}; +pub use db::{RocksDB, RocksDBOptions, StalledWritesRetries}; pub use rocksdb; diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index dc091f72a820..34e187d85977 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -18,7 +18,7 @@ use zksync_merkle_tree::{ domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader}, Key, MerkleTreeColumnFamily, NoVersionError, TreeEntryWithProof, }; -use zksync_storage::{RocksDB, RocksDBOptions}; +use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries}; use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageLog, H256}; use super::metrics::{LoadChangesStage, TreeUpdateStage, METRICS}; @@ -62,11 +62,12 @@ impl AsyncTree { multi_get_chunk_size: usize, block_cache_capacity: usize, memtable_capacity: usize, - init_stopped_writes_timeout: Duration, + stalled_writes_timeout: Duration, ) -> Self { tracing::info!( "Initializing Merkle tree at `{db_path}` 
with {multi_get_chunk_size} multi-get chunk size, \ - {block_cache_capacity}B block cache, {memtable_capacity}B memtable capacity", + {block_cache_capacity}B block cache, {memtable_capacity}B memtable capacity, \ + {stalled_writes_timeout:?} stalled writes timeout", db_path = db_path.display() ); @@ -75,7 +76,7 @@ &db_path, block_cache_capacity, memtable_capacity, - init_stopped_writes_timeout, + stalled_writes_timeout, ); match mode { MerkleTreeMode::Full => ZkSyncTree::new(db), @@ -96,14 +97,14 @@ path: &Path, block_cache_capacity: usize, memtable_capacity: usize, - init_stopped_writes_timeout: Duration, + stalled_writes_timeout: Duration, ) -> RocksDB<MerkleTreeColumnFamily> { let db = RocksDB::with_options( path, RocksDBOptions { block_cache_capacity: Some(block_cache_capacity), large_memtable_capacity: Some(memtable_capacity), - init_stopped_writes_timeout, + stalled_writes_retries: StalledWritesRetries::new(stalled_writes_timeout), }, ); if cfg!(test) { diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 6acb95606a19..05fdc5c49ed8 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -75,9 +75,8 @@ pub struct MetadataCalculatorConfig<'a> { /// Capacity of RocksDB memtables. Can be set to a reasonably large value (order of 512 MiB) /// to mitigate write stalls. pub memtable_capacity: usize, - /// Timeout to wait for the Merkle tree database to run compaction on startup so that it doesn't have - /// stopped writes. - pub init_stopped_writes_timeout: Duration, + /// Timeout to wait for the Merkle tree database to run compaction on stalled writes.
+ pub stalled_writes_timeout: Duration, } impl<'a> MetadataCalculatorConfig<'a> { @@ -94,7 +93,7 @@ impl<'a> MetadataCalculatorConfig<'a> { multi_get_chunk_size: db_config.merkle_tree.multi_get_chunk_size, block_cache_capacity: db_config.merkle_tree.block_cache_size(), memtable_capacity: db_config.merkle_tree.memtable_capacity(), - init_stopped_writes_timeout: db_config.merkle_tree.init_stopped_writes_timeout(), + stalled_writes_timeout: db_config.merkle_tree.stalled_writes_timeout(), } } } diff --git a/core/lib/zksync_core/src/metadata_calculator/updater.rs b/core/lib/zksync_core/src/metadata_calculator/updater.rs index 6eeb8359ac95..f895bf7b81b4 100644 --- a/core/lib/zksync_core/src/metadata_calculator/updater.rs +++ b/core/lib/zksync_core/src/metadata_calculator/updater.rs @@ -45,7 +45,7 @@ impl TreeUpdater { config.multi_get_chunk_size, config.block_cache_capacity, config.memtable_capacity, - config.init_stopped_writes_timeout, + config.stalled_writes_timeout, ) .await; Self {