Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(db): Fix write stalls in RocksDB (for real this time) #292

Merged
merged 4 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,14 @@ pub struct OptionalENConfig {
/// The default value is 128 MiB.
#[serde(default = "OptionalENConfig::default_merkle_tree_block_cache_size_mb")]
merkle_tree_block_cache_size_mb: usize,

/// Byte capacity of memtables (recent, non-persisted changes to RocksDB). Setting this to a reasonably
/// large value (order of 512 MiB) is helpful for large DBs that experience write stalls.
#[serde(default = "OptionalENConfig::default_merkle_tree_memtable_capacity_mb")]
merkle_tree_memtable_capacity_mb: usize,
/// Timeout to wait for the Merkle tree database to run compaction on startup so that it doesn't have
/// stopped writes.
#[serde(default = "OptionalENConfig::default_merkle_tree_init_stopped_writes_timeout_sec")]
merkle_tree_init_stopped_writes_timeout_sec: u64,

// Other config settings
/// Port on which the Prometheus exporter server is listening.
Expand Down Expand Up @@ -283,6 +286,10 @@ impl OptionalENConfig {
256
}

/// Serde default for `merkle_tree_init_stopped_writes_timeout_sec`: 30 seconds.
const fn default_merkle_tree_init_stopped_writes_timeout_sec() -> u64 {
    30 // seconds
}

/// Serde default for the fee history limit: 1024 entries.
const fn default_fee_history_limit() -> u64 {
    1024
}
Expand Down Expand Up @@ -332,6 +339,12 @@ impl OptionalENConfig {
self.merkle_tree_memtable_capacity_mb * BYTES_IN_MEGABYTE
}

/// Timeout for waiting until the Merkle tree RocksDB instance has finished initial compaction
/// on startup (i.e., no longer has stopped writes), converted from the configured seconds value.
pub fn merkle_tree_init_stopped_writes_timeout(&self) -> Duration {
    let secs = self.merkle_tree_init_stopped_writes_timeout_sec;
    Duration::from_secs(secs)
}

pub fn api_namespaces(&self) -> Vec<Namespace> {
self.api_namespaces
.clone()
Expand Down
1 change: 1 addition & 0 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ async fn init_tasks(
multi_get_chunk_size: config.optional.merkle_tree_multi_get_chunk_size,
block_cache_capacity: config.optional.merkle_tree_block_cache_size(),
memtable_capacity: config.optional.merkle_tree_memtable_capacity(),
init_stopped_writes_timeout: config.optional.merkle_tree_init_stopped_writes_timeout(),
})
.await;
healthchecks.push(Box::new(metadata_calculator.tree_health_check()));
Expand Down
23 changes: 23 additions & 0 deletions core/lib/config/src/configs/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ pub struct MerkleTreeConfig {
/// large value (order of 512 MiB) is helpful for large DBs that experience write stalls.
#[serde(default = "MerkleTreeConfig::default_memtable_capacity_mb")]
pub memtable_capacity_mb: usize,
/// Timeout to wait for the Merkle tree database to run compaction on startup so that it doesn't have
/// stopped writes.
#[serde(default = "MerkleTreeConfig::default_init_stopped_writes_timeout_sec")]
pub init_stopped_writes_timeout_sec: u64,
/// Maximum number of L1 batches to be processed by the Merkle tree at a time.
#[serde(default = "MerkleTreeConfig::default_max_l1_batches_per_iter")]
pub max_l1_batches_per_iter: usize,
Expand All @@ -56,6 +60,7 @@ impl Default for MerkleTreeConfig {
multi_get_chunk_size: Self::default_multi_get_chunk_size(),
block_cache_size_mb: Self::default_block_cache_size_mb(),
memtable_capacity_mb: Self::default_memtable_capacity_mb(),
init_stopped_writes_timeout_sec: Self::default_init_stopped_writes_timeout_sec(),
max_l1_batches_per_iter: Self::default_max_l1_batches_per_iter(),
}
}
Expand All @@ -82,6 +87,10 @@ impl MerkleTreeConfig {
256
}

/// Serde default for `init_stopped_writes_timeout_sec`: 30 seconds.
const fn default_init_stopped_writes_timeout_sec() -> u64 {
    30 // seconds
}

/// Serde default for `max_l1_batches_per_iter`: 20 batches.
const fn default_max_l1_batches_per_iter() -> usize {
    20
}
Expand All @@ -95,6 +104,12 @@ impl MerkleTreeConfig {
pub fn memtable_capacity(&self) -> usize {
self.memtable_capacity_mb * super::BYTES_IN_MEGABYTE
}

/// Timeout for waiting until the Merkle tree RocksDB instance has finished initial compaction
/// on startup (i.e., no longer has stopped writes), converted from the configured seconds value.
pub fn init_stopped_writes_timeout(&self) -> Duration {
    let secs = self.init_stopped_writes_timeout_sec;
    Duration::from_secs(secs)
}
}

/// Database configuration.
Expand Down Expand Up @@ -165,6 +180,8 @@ mod tests {
DATABASE_MERKLE_TREE_PATH="/db/tree"
DATABASE_MERKLE_TREE_MODE=lightweight
DATABASE_MERKLE_TREE_MULTI_GET_CHUNK_SIZE=250
DATABASE_MERKLE_TREE_MEMTABLE_CAPACITY_MB=512
DATABASE_MERKLE_TREE_INIT_STOPPED_WRITES_TIMEOUT_SEC=60
DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER=50
DATABASE_BACKUP_COUNT=5
DATABASE_BACKUP_INTERVAL_MS=60000
Expand All @@ -178,6 +195,8 @@ mod tests {
assert_eq!(db_config.merkle_tree.mode, MerkleTreeMode::Lightweight);
assert_eq!(db_config.merkle_tree.multi_get_chunk_size, 250);
assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 50);
assert_eq!(db_config.merkle_tree.memtable_capacity_mb, 512);
assert_eq!(db_config.merkle_tree.init_stopped_writes_timeout_sec, 60);
assert_eq!(db_config.backup_count, 5);
assert_eq!(db_config.backup_interval().as_secs(), 60);
}
Expand All @@ -192,6 +211,8 @@ mod tests {
"DATABASE_MERKLE_TREE_MODE",
"DATABASE_MERKLE_TREE_MULTI_GET_CHUNK_SIZE",
"DATABASE_MERKLE_TREE_BLOCK_CACHE_SIZE_MB",
"DATABASE_MERKLE_TREE_MEMTABLE_CAPACITY_MB",
"DATABASE_MERKLE_TREE_INIT_STOPPED_WRITES_TIMEOUT_SEC",
"DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER",
"DATABASE_BACKUP_COUNT",
"DATABASE_BACKUP_INTERVAL_MS",
Expand All @@ -205,6 +226,8 @@ mod tests {
assert_eq!(db_config.merkle_tree.multi_get_chunk_size, 500);
assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 20);
assert_eq!(db_config.merkle_tree.block_cache_size_mb, 128);
assert_eq!(db_config.merkle_tree.memtable_capacity_mb, 256);
assert_eq!(db_config.merkle_tree.init_stopped_writes_timeout_sec, 30);
assert_eq!(db_config.backup_count, 5);
assert_eq!(db_config.backup_interval().as_secs(), 60);

Expand Down
37 changes: 27 additions & 10 deletions core/lib/storage/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
path::Path,
sync::{Arc, Condvar, Mutex},
thread,
time::Duration,
time::{Duration, Instant},
};

use crate::metrics::{RocksdbLabels, RocksdbSizeMetrics, METRICS};
Expand Down Expand Up @@ -176,11 +176,12 @@ impl RocksDBInner {
/// Waits until writes are not stopped for any of the CFs. Writes can stop immediately on DB initialization
/// if there are too many level-0 SST files; in this case, it may help waiting several seconds until
/// these files are compacted.
fn wait_for_writes_to_resume(&self) {
const RETRY_COUNT: usize = 10;
fn wait_for_writes_to_resume(&self, timeout: Duration) {
const RETRY_INTERVAL: Duration = Duration::from_secs(1);

for retry in 0..RETRY_COUNT {
let started_at = Instant::now();
let mut retry = 0;
while started_at.elapsed() < timeout {
let cfs_with_stopped_writes = self.cf_names.iter().copied().filter(|cf_name| {
let cf = self.db.cf_handle(cf_name).unwrap();
// ^ `unwrap()` is safe (CF existence is checked during DB initialization)
Expand All @@ -192,16 +193,16 @@ impl RocksDBInner {
} else {
tracing::info!(
"Writes are stopped for column families {cfs_with_stopped_writes:?} in DB `{}` \
(retry: {retry}/{RETRY_COUNT})",
(retry: {retry})",
self.db_name
);
thread::sleep(RETRY_INTERVAL);
retry += 1;
}
}

tracing::warn!(
"Exceeded {RETRY_COUNT} retries waiting for writes to resume in DB `{}`; \
proceeding with stopped writes",
"Exceeded retries waiting for writes to resume in DB `{}`; proceeding with stopped writes",
self.db_name
);
}
Expand All @@ -212,15 +213,17 @@ struct StalledWritesRetries {
max_batch_size: usize,
retry_count: usize,
start_interval: Duration,
max_interval: Duration,
scale_factor: f64,
}

impl Default for StalledWritesRetries {
fn default() -> Self {
Self {
max_batch_size: 128 << 20, // 128 MiB
retry_count: 10,
retry_count: 20,
start_interval: Duration::from_millis(50),
max_interval: Duration::from_secs(2),
scale_factor: 1.5,
}
}
Expand All @@ -230,6 +233,7 @@ impl StalledWritesRetries {
/// Exponential backoff interval for the given retry: `start_interval * scale_factor^retry_index`,
/// capped at `max_interval` so repeated retries don't grow unboundedly.
fn interval(&self, retry_index: usize) -> Duration {
    let multiplier = self.scale_factor.powi(retry_index as i32);
    let scaled = self.start_interval.mul_f64(multiplier);
    scaled.min(self.max_interval)
}

// **NB.** The error message may change between RocksDB versions!
Expand All @@ -240,7 +244,7 @@ impl StalledWritesRetries {
}

/// [`RocksDB`] options.
#[derive(Debug, Clone, Copy, Default)]
#[derive(Debug, Clone, Copy)]
pub struct RocksDBOptions {
/// Byte capacity of the block cache (the main RocksDB cache for reads). If not set, default RocksDB
/// cache options will be used.
Expand All @@ -250,6 +254,18 @@ pub struct RocksDBOptions {
/// Setting this to a reasonably large value (order of 512 MiB) is helpful for large DBs that experience
/// write stalls. If not set, large CFs will not be configured specially.
pub large_memtable_capacity: Option<usize>,
/// Timeout to wait for the database to run compaction on startup so that it doesn't have stopped writes.
pub init_stopped_writes_timeout: Duration,
}

impl Default for RocksDBOptions {
fn default() -> Self {
Self {
block_cache_capacity: None,
large_memtable_capacity: None,
init_stopped_writes_timeout: Duration::from_secs(10),
}
}
}

/// Thin wrapper around a RocksDB instance.
Expand Down Expand Up @@ -336,7 +352,7 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
path.display()
);

inner.wait_for_writes_to_resume();
inner.wait_for_writes_to_resume(options.init_stopped_writes_timeout);
Self {
inner,
sync_writes: false,
Expand Down Expand Up @@ -569,6 +585,7 @@ mod tests {
assert_close(retries.interval(0), Duration::from_millis(50));
assert_close(retries.interval(1), Duration::from_millis(75));
assert_close(retries.interval(2), Duration::from_micros(112_500));
assert_close(retries.interval(20), retries.max_interval);
}

fn assert_close(lhs: Duration, rhs: Duration) {
Expand Down
13 changes: 11 additions & 2 deletions core/lib/zksync_core/src/metadata_calculator/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl AsyncTree {
multi_get_chunk_size: usize,
block_cache_capacity: usize,
memtable_capacity: usize,
init_stopped_writes_timeout: Duration,
) -> Self {
tracing::info!(
"Initializing Merkle tree at `{db_path}` with {multi_get_chunk_size} multi-get chunk size, \
Expand All @@ -70,7 +71,12 @@ impl AsyncTree {
);

let mut tree = tokio::task::spawn_blocking(move || {
let db = Self::create_db(&db_path, block_cache_capacity, memtable_capacity);
let db = Self::create_db(
&db_path,
block_cache_capacity,
memtable_capacity,
init_stopped_writes_timeout,
);
match mode {
MerkleTreeMode::Full => ZkSyncTree::new(db),
MerkleTreeMode::Lightweight => ZkSyncTree::new_lightweight(db),
Expand All @@ -90,12 +96,14 @@ impl AsyncTree {
path: &Path,
block_cache_capacity: usize,
memtable_capacity: usize,
init_stopped_writes_timeout: Duration,
) -> RocksDB<MerkleTreeColumnFamily> {
let db = RocksDB::with_options(
path,
RocksDBOptions {
block_cache_capacity: Some(block_cache_capacity),
large_memtable_capacity: Some(memtable_capacity),
init_stopped_writes_timeout,
},
);
if cfg!(test) {
Expand Down Expand Up @@ -462,7 +470,8 @@ mod tests {
MerkleTreeMode::Full,
500,
0,
16 << 20, // 16 MiB
16 << 20, // 16 MiB,
Duration::ZERO, // writes should never be stalled in tests
)
.await
}
Expand Down
4 changes: 4 additions & 0 deletions core/lib/zksync_core/src/metadata_calculator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub struct MetadataCalculatorConfig<'a> {
/// Capacity of RocksDB memtables. Can be set to a reasonably large value (order of 512 MiB)
/// to mitigate write stalls.
pub memtable_capacity: usize,
/// Timeout to wait for the Merkle tree database to run compaction on startup so that it doesn't have
/// stopped writes.
pub init_stopped_writes_timeout: Duration,
}

impl<'a> MetadataCalculatorConfig<'a> {
Expand All @@ -91,6 +94,7 @@ impl<'a> MetadataCalculatorConfig<'a> {
multi_get_chunk_size: db_config.merkle_tree.multi_get_chunk_size,
block_cache_capacity: db_config.merkle_tree.block_cache_size(),
memtable_capacity: db_config.merkle_tree.memtable_capacity(),
init_stopped_writes_timeout: db_config.merkle_tree.init_stopped_writes_timeout(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/metadata_calculator/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl TreeUpdater {
config.multi_get_chunk_size,
config.block_cache_capacity,
config.memtable_capacity,
config.init_stopped_writes_timeout,
)
.await;
Self {
Expand Down