diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index 9fcaf037055c..65ce8d073cef 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -191,6 +191,11 @@ pub struct OptionalENConfig { #[serde(default = "OptionalENConfig::default_merkle_tree_block_cache_size_mb")] merkle_tree_block_cache_size_mb: usize, + /// Byte capacity of memtables (recent, non-persisted changes to RocksDB). Setting this to a reasonably + /// large value (order of 512 MiB) is helpful for large DBs that experience write stalls. + #[serde(default = "OptionalENConfig::default_merkle_tree_memtable_capacity_mb")] + merkle_tree_memtable_capacity_mb: usize, + // Other config settings /// Port on which the Prometheus exporter server is listening. pub prometheus_port: Option, @@ -274,6 +279,10 @@ impl OptionalENConfig { 128 } + const fn default_merkle_tree_memtable_capacity_mb() -> usize { + 256 + } + const fn default_fee_history_limit() -> u64 { 1_024 } @@ -318,6 +327,11 @@ impl OptionalENConfig { self.merkle_tree_block_cache_size_mb * BYTES_IN_MEGABYTE } + /// Returns the memtable capacity for Merkle tree in bytes. + pub fn merkle_tree_memtable_capacity(&self) -> usize { + self.merkle_tree_memtable_capacity_mb * BYTES_IN_MEGABYTE + } + pub fn api_namespaces(&self) -> Vec { self.api_namespaces .clone() diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 75b84c537283..1fa6c3577d36 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -138,6 +138,7 @@ async fn init_tasks( max_l1_batches_per_iter: config.optional.max_l1_batches_per_tree_iter, multi_get_chunk_size: config.optional.merkle_tree_multi_get_chunk_size, block_cache_capacity: config.optional.merkle_tree_block_cache_size(), + memtable_capacity: config.optional.merkle_tree_memtable_capacity(), }) .await; healthchecks.push(Box::new(metadata_calculator.tree_health_check())); diff --git a/core/lib/config/src/configs/database.rs b/core/lib/config/src/configs/database.rs index de0a59545dc7..142d862b52f3 100644 --- a/core/lib/config/src/configs/database.rs +++ b/core/lib/config/src/configs/database.rs @@ -38,6 +38,10 @@ pub struct MerkleTreeConfig { /// The default value is 128 MB. #[serde(default = "MerkleTreeConfig::default_block_cache_size_mb")] pub block_cache_size_mb: usize, + /// Byte capacity of memtables (recent, non-persisted changes to RocksDB). Setting this to a reasonably + /// large value (order of 512 MiB) is helpful for large DBs that experience write stalls. + #[serde(default = "MerkleTreeConfig::default_memtable_capacity_mb")] + pub memtable_capacity_mb: usize, /// Maximum number of L1 batches to be processed by the Merkle tree at a time. #[serde(default = "MerkleTreeConfig::default_max_l1_batches_per_iter")] pub max_l1_batches_per_iter: usize, @@ -51,6 +55,7 @@ impl Default for MerkleTreeConfig { mode: MerkleTreeMode::default(), multi_get_chunk_size: Self::default_multi_get_chunk_size(), block_cache_size_mb: Self::default_block_cache_size_mb(), + memtable_capacity_mb: Self::default_memtable_capacity_mb(), max_l1_batches_per_iter: Self::default_max_l1_batches_per_iter(), } } @@ -73,6 +78,10 @@ impl MerkleTreeConfig { 128 } + const fn default_memtable_capacity_mb() -> usize { + 256 + } + const fn default_max_l1_batches_per_iter() -> usize { 20 } @@ -81,6 +90,11 @@ impl MerkleTreeConfig { pub fn block_cache_size(&self) -> usize { self.block_cache_size_mb * super::BYTES_IN_MEGABYTE } + + /// Returns the memtable capacity in bytes. + pub fn memtable_capacity(&self) -> usize { + self.memtable_capacity_mb * super::BYTES_IN_MEGABYTE + } } /// Database configuration. diff --git a/core/lib/merkle_tree/examples/loadtest/main.rs b/core/lib/merkle_tree/examples/loadtest/main.rs index 89e14754bdb7..b598a579f6b4 100644 --- a/core/lib/merkle_tree/examples/loadtest/main.rs +++ b/core/lib/merkle_tree/examples/loadtest/main.rs @@ -17,7 +17,7 @@ use zksync_crypto::hasher::blake2::Blake2Hasher; use zksync_merkle_tree::{ Database, HashTree, MerkleTree, MerkleTreePruner, PatchSet, RocksDBWrapper, TreeInstruction, }; -use zksync_storage::RocksDB; +use zksync_storage::{RocksDB, RocksDBOptions}; use zksync_types::{AccountTreeId, Address, StorageKey, H256, U256}; mod batch; @@ -90,12 +90,15 @@ impl Cli { "Created temp dir for RocksDB: {}", dir.path().to_string_lossy() ); - rocksdb = if let Some(block_cache_capacity) = self.block_cache { - let db = RocksDB::with_cache(dir.path(), Some(block_cache_capacity)); - RocksDBWrapper::from(db) - } else { - RocksDBWrapper::new(dir.path()) - }; + let db = RocksDB::with_options( + dir.path(), + RocksDBOptions { + block_cache_capacity: self.block_cache, + ..RocksDBOptions::default() + }, + ); + rocksdb = RocksDBWrapper::from(db); + if let Some(chunk_size) = self.chunk_size { rocksdb.set_multi_get_chunk_size(chunk_size); } diff --git a/core/lib/merkle_tree/examples/recovery.rs b/core/lib/merkle_tree/examples/recovery.rs index 257944046047..207499da8b41 100644 --- a/core/lib/merkle_tree/examples/recovery.rs +++ b/core/lib/merkle_tree/examples/recovery.rs @@ -12,7 +12,7 @@ use zksync_merkle_tree::{ recovery::{MerkleTreeRecovery, RecoveryEntry}, HashTree, Key, PatchSet, PruneDatabase, RocksDBWrapper, ValueHash, }; -use zksync_storage::RocksDB; +use zksync_storage::{RocksDB, RocksDBOptions}; /// CLI for load-testing Merkle tree recovery. #[derive(Debug, Parser)] @@ -60,12 +60,14 @@ impl Cli { "Created temp dir for RocksDB: {}", dir.path().to_string_lossy() ); - rocksdb = if let Some(block_cache_capacity) = self.block_cache { - let db = RocksDB::with_cache(dir.path(), Some(block_cache_capacity)); - RocksDBWrapper::from(db) - } else { - RocksDBWrapper::new(dir.path()) - }; + let db = RocksDB::with_options( + dir.path(), + RocksDBOptions { + block_cache_capacity: self.block_cache, + ..RocksDBOptions::default() + }, + ); + rocksdb = RocksDBWrapper::from(db); _temp_dir = Some(dir); &mut rocksdb }; diff --git a/core/lib/merkle_tree/src/storage/rocksdb.rs b/core/lib/merkle_tree/src/storage/rocksdb.rs index da2a7475eae0..6c6a3a18105e 100644 --- a/core/lib/merkle_tree/src/storage/rocksdb.rs +++ b/core/lib/merkle_tree/src/storage/rocksdb.rs @@ -35,6 +35,10 @@ impl NamedColumnFamily for MerkleTreeColumnFamily { Self::StaleKeys => "stale_keys", } } + + fn requires_tuning(&self) -> bool { + matches!(self, Self::Tree) + } } /// Main [`Database`] implementation wrapping a [`RocksDB`] reference. diff --git a/core/lib/storage/src/db.rs b/core/lib/storage/src/db.rs index c6b75c6a25ed..b2a3636f62f5 100644 --- a/core/lib/storage/src/db.rs +++ b/core/lib/storage/src/db.rs @@ -4,7 +4,7 @@ use rocksdb::{ }; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, ffi::CStr, fmt, marker::PhantomData, @@ -29,6 +29,12 @@ pub trait NamedColumnFamily: 'static + Copy { const ALL: &'static [Self]; /// Names a column family to access it in `RocksDB`. Also used in metrics reporting. fn name(&self) -> &'static str; + + /// Returns whether this CF is so large that it's likely to require special configuration in terms + /// of compaction / memtables. + fn requires_tuning(&self) -> bool { + false + } } /// Thin typesafe wrapper around RocksDB `WriteBatch`. @@ -96,12 +102,39 @@ pub(crate) struct RocksDBInner { } impl RocksDBInner { - pub(crate) fn report_sizes(&self, metrics: &RocksdbSizeMetrics) { + pub(crate) fn collect_metrics(&self, metrics: &RocksdbSizeMetrics) { for &cf_name in &self.cf_names { let cf = self.db.cf_handle(cf_name).unwrap(); // ^ `unwrap()` is safe (CF existence is checked during DB initialization) let labels = RocksdbLabels::new(self.db_name, cf_name); + let writes_stopped = self.int_property(cf, properties::IS_WRITE_STOPPED); + let writes_stopped = writes_stopped == Some(1); + metrics.writes_stopped[&labels].set(writes_stopped.into()); + + let num_immutable_memtables = + self.int_property(cf, properties::NUM_IMMUTABLE_MEM_TABLE); + if let Some(num_immutable_memtables) = num_immutable_memtables { + metrics.immutable_mem_tables[&labels].set(num_immutable_memtables); + } + let num_level0_files = self.int_property(cf, &properties::num_files_at_level(0)); + if let Some(num_level0_files) = num_level0_files { + metrics.level0_files[&labels].set(num_level0_files); + } + let num_flushes = self.int_property(cf, properties::NUM_RUNNING_FLUSHES); + if let Some(num_flushes) = num_flushes { + metrics.running_flushes[&labels].set(num_flushes); + } + let num_compactions = self.int_property(cf, properties::NUM_RUNNING_COMPACTIONS); + if let Some(num_compactions) = num_compactions { + metrics.running_compactions[&labels].set(num_compactions); + } + let pending_compactions = + self.int_property(cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES); + if let Some(pending_compactions) = pending_compactions { + metrics.pending_compactions[&labels].set(pending_compactions); + } + let live_data_size = self.int_property(cf, properties::ESTIMATE_LIVE_DATA_SIZE); if let Some(size) = live_data_size { metrics.live_data_size[&labels].set(size); @@ -139,26 +172,66 @@ impl RocksDBInner { } property } + + /// Waits until writes are not stopped for any of the CFs. Writes can stop immediately on DB initialization + /// if there are too many level-0 SST files; in this case, it may help waiting several seconds until + /// these files are compacted. + fn wait_for_writes_to_resume(&self) { + const RETRY_COUNT: usize = 10; + const RETRY_INTERVAL: Duration = Duration::from_secs(1); + + for retry in 0..RETRY_COUNT { + let cfs_with_stopped_writes = self.cf_names.iter().copied().filter(|cf_name| { + let cf = self.db.cf_handle(cf_name).unwrap(); + // ^ `unwrap()` is safe (CF existence is checked during DB initialization) + self.int_property(cf, properties::IS_WRITE_STOPPED) == Some(1) + }); + let cfs_with_stopped_writes: Vec<_> = cfs_with_stopped_writes.collect(); + if cfs_with_stopped_writes.is_empty() { + return; + } else { + tracing::info!( + "Writes are stopped for column families {cfs_with_stopped_writes:?} in DB `{}` \ + (retry: {retry}/{RETRY_COUNT})", + self.db_name + ); + thread::sleep(RETRY_INTERVAL); + } + } + + tracing::warn!( + "Exceeded {RETRY_COUNT} retries waiting for writes to resume in DB `{}`; \ + proceeding with stopped writes", + self.db_name + ); + } } #[derive(Debug, Clone, Copy)] struct StalledWritesRetries { max_batch_size: usize, retry_count: usize, - interval: Duration, + start_interval: Duration, + scale_factor: f64, } impl Default for StalledWritesRetries { fn default() -> Self { Self { max_batch_size: 128 << 20, // 128 MiB - retry_count: 3, - interval: Duration::from_millis(100), + retry_count: 10, + start_interval: Duration::from_millis(50), + scale_factor: 1.5, } } } impl StalledWritesRetries { + fn interval(&self, retry_index: usize) -> Duration { + self.start_interval + .mul_f64(self.scale_factor.powi(retry_index as i32)) + } + // **NB.** The error message may change between RocksDB versions! fn is_write_stall_error(error: &rocksdb::Error) -> bool { matches!(error.kind(), rocksdb::ErrorKind::ShutdownInProgress) @@ -166,6 +239,19 @@ impl StalledWritesRetries { } } +/// [`RocksDB`] options. +#[derive(Debug, Clone, Copy, Default)] +pub struct RocksDBOptions { + /// Byte capacity of the block cache (the main RocksDB cache for reads). If not set, default RocksDB + /// cache options will be used. + pub block_cache_capacity: Option, + /// Byte capacity of memtables (recent, non-persisted changes to RocksDB) set for large CFs + /// (as defined in [`NamedColumnFamily::requires_tuning()`]). + /// Setting this to a reasonably large value (order of 512 MiB) is helpful for large DBs that experience + /// write stalls. If not set, large CFs will not be configured specially. + pub large_memtable_capacity: Option, +} + /// Thin wrapper around a RocksDB instance. /// /// The wrapper is cheaply cloneable (internally, it wraps a DB instance in an [`Arc`]). @@ -179,13 +265,13 @@ pub struct RocksDB { impl RocksDB { pub fn new(path: &Path) -> Self { - Self::with_cache(path, None) + Self::with_options(path, RocksDBOptions::default()) } - pub fn with_cache(path: &Path, block_cache_capacity: Option) -> Self { - let caches = RocksDBCaches::new(block_cache_capacity); - let options = Self::rocksdb_options(None); - let existing_cfs = DB::list_cf(&options, path).unwrap_or_else(|err| { + pub fn with_options(path: &Path, options: RocksDBOptions) -> Self { + let caches = RocksDBCaches::new(options.block_cache_capacity); + let db_options = Self::rocksdb_options(None, None); + let existing_cfs = DB::list_cf(&db_options, path).unwrap_or_else(|err| { tracing::warn!( "Failed getting column families for RocksDB `{}` at `{}`, assuming CFs are empty; {err}", CF::DB_NAME, @@ -194,15 +280,18 @@ impl RocksDB { vec![] }); - let cf_names: HashSet<_> = CF::ALL.iter().map(|cf| cf.name()).collect(); + let cfs_and_options: HashMap<_, _> = CF::ALL + .iter() + .map(|cf| (cf.name(), cf.requires_tuning())) + .collect(); let obsolete_cfs: Vec<_> = existing_cfs .iter() .filter_map(|cf_name| { let cf_name = cf_name.as_str(); // The default CF is created on RocksDB instantiation in any case; it doesn't need // to be explicitly opened. - let is_obsolete = - cf_name != rocksdb::DEFAULT_COLUMN_FAMILY_NAME && !cf_names.contains(cf_name); + let is_obsolete = cf_name != rocksdb::DEFAULT_COLUMN_FAMILY_NAME + && !cfs_and_options.contains_key(cf_name); is_obsolete.then_some(cf_name) }) .collect(); @@ -216,18 +305,22 @@ impl RocksDB { } // Open obsolete CFs as well; RocksDB initialization will panic otherwise. - let all_cf_names = cf_names.iter().copied().chain(obsolete_cfs); - let cfs = all_cf_names.map(|cf_name| { + let cf_names = cfs_and_options.keys().copied().collect(); + let all_cfs_and_options = cfs_and_options + .into_iter() + .chain(obsolete_cfs.into_iter().map(|name| (name, false))); + let cfs = all_cfs_and_options.map(|(cf_name, requires_tuning)| { let mut block_based_options = BlockBasedOptions::default(); block_based_options.set_bloom_filter(10.0, false); if let Some(cache) = &caches.shared { block_based_options.set_block_cache(cache); } - let cf_options = Self::rocksdb_options(Some(block_based_options)); + let memtable_capacity = options.large_memtable_capacity.filter(|_| requires_tuning); + let cf_options = Self::rocksdb_options(memtable_capacity, Some(block_based_options)); ColumnFamilyDescriptor::new(cf_name, cf_options) }); - let db = DB::open_cf_descriptors(&options, path, cfs).expect("failed to init rocksdb"); + let db = DB::open_cf_descriptors(&db_options, path, cfs).expect("failed to init rocksdb"); let inner = Arc::new(RocksDBInner { db, db_name: CF::DB_NAME, @@ -237,6 +330,13 @@ impl RocksDB { }); RocksdbSizeMetrics::register(CF::DB_NAME, Arc::downgrade(&inner)); + tracing::info!( + "Initialized RocksDB `{}` at `{}` with {options:?}", + CF::DB_NAME, + path.display() + ); + + inner.wait_for_writes_to_resume(); Self { inner, sync_writes: false, @@ -253,16 +353,21 @@ impl RocksDB { self } - fn rocksdb_options(block_based_options: Option) -> Options { + fn rocksdb_options( + memtable_capacity: Option, + block_based_options: Option, + ) -> Options { let mut options = Options::default(); options.create_missing_column_families(true); options.create_if_missing(true); let num_cpus = num_cpus::get() as i32; options.increase_parallelism(num_cpus); + if let Some(memtable_capacity) = memtable_capacity { + options.optimize_level_style_compaction(memtable_capacity); + } // Settings below are taken as per PingCAP recommendations: // https://www.pingcap.com/blog/how-to-troubleshoot-rocksdb-write-stalls-in-tikv/ - options.set_max_write_buffer_number(5); let max_background_jobs = (num_cpus - 1).clamp(1, 8); options.set_max_background_jobs(max_background_jobs); @@ -323,14 +428,18 @@ impl RocksDB { match self.write_inner(raw_batch) { Ok(()) => return Ok(()), Err(err) => { - let should_retry = StalledWritesRetries::is_write_stall_error(&err) - && retry_count < retries.retry_count; - if should_retry { + let is_stalled_write = StalledWritesRetries::is_write_stall_error(&err); + if is_stalled_write { + METRICS.report_stalled_write(CF::DB_NAME); + } + + if is_stalled_write && retry_count < retries.retry_count { + let retry_interval = retries.interval(retry_count); tracing::warn!( - "Writes stalled when writing to DB `{}`; will retry after a delay", + "Writes stalled when writing to DB `{}`; will retry after {retry_interval:?}", CF::DB_NAME ); - thread::sleep(retries.interval); + thread::sleep(retry_interval); retry_count += 1; raw_batch = rocksdb::WriteBatch::from_data(&raw_batch_bytes); } else { @@ -454,6 +563,20 @@ mod tests { use super::*; + #[test] + fn retry_interval_computation() { + let retries = StalledWritesRetries::default(); + assert_close(retries.interval(0), Duration::from_millis(50)); + assert_close(retries.interval(1), Duration::from_millis(75)); + assert_close(retries.interval(2), Duration::from_micros(112_500)); + } + + fn assert_close(lhs: Duration, rhs: Duration) { + let lhs_millis = (lhs.as_secs_f64() * 1_000.0).round() as u64; + let rhs_millis = (rhs.as_secs_f64() * 1_000.0).round() as u64; + assert_eq!(lhs_millis, rhs_millis); + } + #[derive(Debug, Clone, Copy)] enum OldColumnFamilies { Default, diff --git a/core/lib/storage/src/lib.rs b/core/lib/storage/src/lib.rs index 8c86876b5861..6ea928309ff4 100644 --- a/core/lib/storage/src/lib.rs +++ b/core/lib/storage/src/lib.rs @@ -1,5 +1,5 @@ pub mod db; mod metrics; -pub use db::RocksDB; +pub use db::{RocksDB, RocksDBOptions}; pub use rocksdb; diff --git a/core/lib/storage/src/metrics.rs b/core/lib/storage/src/metrics.rs index 1ea4824a6921..a8f4fb1e7b4e 100644 --- a/core/lib/storage/src/metrics.rs +++ b/core/lib/storage/src/metrics.rs @@ -1,7 +1,7 @@ //! General-purpose RocksDB metrics. All metrics code in the crate should be in this module. use once_cell::sync::Lazy; -use vise::{Buckets, Collector, EncodeLabelSet, Family, Gauge, Histogram, Metrics}; +use vise::{Buckets, Collector, Counter, EncodeLabelSet, Family, Gauge, Histogram, Metrics, Unit}; use std::{ collections::HashMap, @@ -41,12 +41,18 @@ pub(crate) struct RocksdbMetrics { /// Size of a serialized `WriteBatch` written to a RocksDB instance. #[metrics(buckets = BYTE_SIZE_BUCKETS)] write_batch_size: Family>, + /// Number of stalled writes for a RocksDB instance. + write_stalled: Family, } impl RocksdbMetrics { pub(crate) fn report_batch_size(&self, db: &'static str, batch_size: usize) { self.write_batch_size[&db.into()].observe(batch_size); } + + pub(crate) fn report_stalled_write(&self, db: &'static str) { + self.write_stalled[&db.into()].inc(); + } } #[vise::register] @@ -56,6 +62,20 @@ pub(crate) static METRICS: vise::Global = vise::Global::new(); #[derive(Debug, Metrics)] #[metrics(prefix = "rocksdb")] pub(crate) struct RocksdbSizeMetrics { + /// Boolean gauge indicating whether writing to the column family is currently stopped. + pub writes_stopped: Family>, + /// Number of immutable memtables. Large value increases risks of write stalls. + pub immutable_mem_tables: Family>, + /// Number of level-0 SST files. Large value increases risks of write stalls. + pub level0_files: Family>, + /// Number of memtable flushes running for the column family. + pub running_flushes: Family>, + /// Number of compactions running for the column family. + pub running_compactions: Family>, + /// Estimated number of bytes for pending compactions. + #[metrics(unit = Unit::Bytes)] + pub pending_compactions: Family>, + /// Estimated size of all live data in the column family of a RocksDB instance. pub live_data_size: Family>, /// Total size of all SST files in the column family of a RocksDB instance. @@ -93,7 +113,7 @@ impl RocksdbSizeMetrics { .expect("instances are poisoned") .retain(|_, instance| { if let Some(instance) = instance.upgrade() { - instance.report_sizes(&metrics); + instance.collect_metrics(&metrics); true } else { false diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index e80cabd81668..f1ab5e7a123a 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -18,7 +18,7 @@ use zksync_merkle_tree::{ domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader}, Key, MerkleTreeColumnFamily, NoVersionError, TreeEntryWithProof, }; -use zksync_storage::RocksDB; +use zksync_storage::{RocksDB, RocksDBOptions}; use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageLog, H256}; use super::metrics::{LoadChangesStage, TreeUpdateStage, METRICS}; @@ -61,15 +61,16 @@ impl AsyncTree { mode: MerkleTreeMode, multi_get_chunk_size: usize, block_cache_capacity: usize, + memtable_capacity: usize, ) -> Self { tracing::info!( "Initializing Merkle tree at `{db_path}` with {multi_get_chunk_size} multi-get chunk size, \ - {block_cache_capacity}B block cache", + {block_cache_capacity}B block cache, {memtable_capacity}B memtable capacity", db_path = db_path.display() ); let mut tree = tokio::task::spawn_blocking(move || { - let db = Self::create_db(&db_path, block_cache_capacity); + let db = Self::create_db(&db_path, block_cache_capacity, memtable_capacity); match mode { MerkleTreeMode::Full => ZkSyncTree::new(db), MerkleTreeMode::Lightweight => ZkSyncTree::new_lightweight(db), @@ -85,8 +86,18 @@ impl AsyncTree { } } - fn create_db(path: &Path, block_cache_capacity: usize) -> RocksDB { - let db = RocksDB::with_cache(path, Some(block_cache_capacity)); + fn create_db( + path: &Path, + block_cache_capacity: usize, + memtable_capacity: usize, + ) -> RocksDB { + let db = RocksDB::with_options( + path, + RocksDBOptions { + block_cache_capacity: Some(block_cache_capacity), + large_memtable_capacity: Some(memtable_capacity), + }, + ); if cfg!(test) { // We need sync writes for the unit tests to execute reliably. With the default config, // some writes to RocksDB may occur, but not be visible to the test code. @@ -439,13 +450,23 @@ mod tests { extend_db_state(&mut storage, logs).await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let mut tree = - AsyncTree::new(temp_dir.path().to_owned(), MerkleTreeMode::Full, 500, 0).await; + let mut tree = create_tree(&temp_dir).await; for number in 0..3 { assert_log_equivalence(&mut storage, &mut tree, L1BatchNumber(number)).await; } } + async fn create_tree(temp_dir: &TempDir) -> AsyncTree { + AsyncTree::new( + temp_dir.path().to_owned(), + MerkleTreeMode::Full, + 500, + 0, + 16 << 20, // 16 MiB + ) + .await + } + async fn assert_log_equivalence( storage: &mut StorageProcessor<'_>, tree: &mut AsyncTree, @@ -540,8 +561,7 @@ mod tests { extend_db_state(&mut storage, logs).await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let mut tree = - AsyncTree::new(temp_dir.path().to_owned(), MerkleTreeMode::Full, 500, 0).await; + let mut tree = create_tree(&temp_dir).await; for batch_number in 0..5 { assert_log_equivalence(&mut storage, &mut tree, L1BatchNumber(batch_number)).await; } @@ -580,8 +600,7 @@ mod tests { assert_eq!(read_logs_count, 7); let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let mut tree = - AsyncTree::new(temp_dir.path().to_owned(), MerkleTreeMode::Full, 500, 0).await; + let mut tree = create_tree(&temp_dir).await; for batch_number in 0..3 { assert_log_equivalence(&mut storage, &mut tree, L1BatchNumber(batch_number)).await; } diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index d0359c754f29..296dcd7aabcb 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -70,8 +70,11 @@ pub struct MetadataCalculatorConfig<'a> { /// Chunk size for multi-get operations. Can speed up loading data for the Merkle tree on some environments, /// but the effects vary wildly depending on the setup (e.g., the filesystem used). pub multi_get_chunk_size: usize, - /// Capacity of RocksDB block cache in bytes. Reasonable values range from ~100 MB to several GB. + /// Capacity of RocksDB block cache in bytes. Reasonable values range from ~100 MiB to several GB. pub block_cache_capacity: usize, + /// Capacity of RocksDB memtables. Can be set to a reasonably large value (order of 512 MiB) + /// to mitigate write stalls. + pub memtable_capacity: usize, } impl<'a> MetadataCalculatorConfig<'a> { @@ -87,6 +90,7 @@ impl<'a> MetadataCalculatorConfig<'a> { max_l1_batches_per_iter: db_config.merkle_tree.max_l1_batches_per_iter, multi_get_chunk_size: db_config.merkle_tree.multi_get_chunk_size, block_cache_capacity: db_config.merkle_tree.block_cache_size(), + memtable_capacity: db_config.merkle_tree.memtable_capacity(), } } } diff --git a/core/lib/zksync_core/src/metadata_calculator/updater.rs b/core/lib/zksync_core/src/metadata_calculator/updater.rs index 0298ac7871b9..384ef1b2bdd4 100644 --- a/core/lib/zksync_core/src/metadata_calculator/updater.rs +++ b/core/lib/zksync_core/src/metadata_calculator/updater.rs @@ -44,6 +44,7 @@ impl TreeUpdater { mode, config.multi_get_chunk_size, config.block_cache_capacity, + config.memtable_capacity, ) .await; Self {