Skip to content

Commit

Permalink
feat(en): Add pruning health checks and rework pruning config (#1790)
Browse files Browse the repository at this point in the history
## What ❔

- Adds health checks for pruning tasks.
- Reworks the metadata calculator health check so that it's up-to-date
in case of pruning.
- Extends the snapshot recovery IT so that it tests pruning as well.

## Why ❔

- Pruning is an important aspect to test, and an integration test (IT) makes it possible to test it
in (somewhat) realistic conditions.
- Other changes are required for the integration test, but they also
make sense in general.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.

feat(db): Implement weak references to RocksDB
test(en): Add integration test for pruning
  • Loading branch information
slowli authored Apr 30, 2024
1 parent d62dd08 commit e0d4daa
Show file tree
Hide file tree
Showing 23 changed files with 1,015 additions and 416 deletions.
37 changes: 33 additions & 4 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
env, fmt,
num::{NonZeroU32, NonZeroUsize},
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
str::FromStr,
time::Duration,
};
Expand Down Expand Up @@ -380,11 +380,24 @@ pub(crate) struct OptionalENConfig {
#[serde(default = "OptionalENConfig::default_snapshots_recovery_postgres_max_concurrency")]
pub snapshots_recovery_postgres_max_concurrency: NonZeroUsize,

/// Enables pruning of the historical node state (Postgres and Merkle tree). The node will retain
/// recent state and will continuously remove (prune) old enough parts of the state in the background.
#[serde(default)]
pub pruning_enabled: bool,
/// Number of L1 batches pruned at a time.
#[serde(default = "OptionalENConfig::default_pruning_chunk_size")]
pub pruning_chunk_size: u32,

/// If set, l1 batches will be pruned after they are that long
pub pruning_data_retention_hours: Option<u64>,
/// Delta between soft- and hard-removing data from Postgres. Should be reasonably large (order of 60 seconds).
/// The default value is 60 seconds.
#[serde(default = "OptionalENConfig::default_pruning_removal_delay_sec")]
pruning_removal_delay_sec: NonZeroU64,
/// If set, L1 batches will be pruned after the batch timestamp is this old (in seconds). Note that an L1 batch
/// may be temporarily retained for other reasons; e.g., a batch cannot be pruned until it is executed on L1,
/// which happens roughly 24 hours after its generation on the mainnet. Thus, in practice this value can specify
/// the retention period greater than that implicitly imposed by other criteria (e.g., 7 or 30 days).
/// If set to 0, L1 batches will not be retained based on their timestamp. The default value is 1 hour.
#[serde(default = "OptionalENConfig::default_pruning_data_retention_sec")]
pruning_data_retention_sec: u64,
}

#[derive(Debug, Clone, PartialEq, Deserialize)]
Expand Down Expand Up @@ -528,6 +541,14 @@ impl OptionalENConfig {
10
}

/// Default delay (in seconds) between soft- and hard-removing data: 60 seconds.
fn default_pruning_removal_delay_sec() -> NonZeroU64 {
    const DEFAULT_DELAY_SEC: u64 = 60;
    // The constant above is non-zero, so the conversion cannot fail.
    NonZeroU64::new(DEFAULT_DELAY_SEC).unwrap()
}

/// Default retention period (in seconds) used for timestamp-based pruning: 1 hour.
fn default_pruning_data_retention_sec() -> u64 {
    const SECS_PER_HOUR: u64 = 60 * 60;
    SECS_PER_HOUR
}

/// Returns the polling interval as a [`Duration`]; the stored `polling_interval` value
/// is interpreted as a number of milliseconds.
pub fn polling_interval(&self) -> Duration {
    let millis = self.polling_interval;
    Duration::from_millis(millis)
}
Expand Down Expand Up @@ -604,6 +625,14 @@ impl OptionalENConfig {
Duration::from_millis(self.mempool_cache_update_interval)
}

/// Returns the delay between soft- and hard-removing data as a [`Duration`];
/// the stored `pruning_removal_delay_sec` value is interpreted as a number of seconds.
pub fn pruning_removal_delay(&self) -> Duration {
    let delay_sec = self.pruning_removal_delay_sec.get();
    Duration::from_secs(delay_sec)
}

/// Returns the timestamp-based data retention period as a [`Duration`];
/// the stored `pruning_data_retention_sec` value is interpreted as a number of seconds.
pub fn pruning_data_retention(&self) -> Duration {
    let retention_sec = self.pruning_data_retention_sec;
    Duration::from_secs(retention_sec)
}

#[cfg(test)]
fn mock() -> Self {
// Set all values to their defaults
Expand Down
5 changes: 2 additions & 3 deletions core/bin/external_node/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ pub(crate) async fn ensure_storage_initialized(
InitDecision::SnapshotRecovery => {
anyhow::ensure!(
consider_snapshot_recovery,
"Snapshot recovery is required to proceed, but it is not enabled. Enable by supplying \
`--enable-snapshots-recovery` command-line arg to the node binary, or reset the node storage \
to sync from genesis"
"Snapshot recovery is required to proceed, but it is not enabled. Enable by setting \
`EN_SNAPSHOTS_RECOVERY_ENABLED=true` env variable to the node binary, or use a Postgres dump for recovery"
);

tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
Expand Down
38 changes: 28 additions & 10 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,24 @@ async fn run_tree(
.await
.context("failed creating DB pool for Merkle tree recovery")?;

let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None, tree_pool)
.await
.context("failed initializing metadata calculator")?
.with_recovery_pool(recovery_pool);
let mut metadata_calculator =
MetadataCalculator::new(metadata_calculator_config, None, tree_pool)
.await
.context("failed initializing metadata calculator")?
.with_recovery_pool(recovery_pool);

let tree_reader = Arc::new(metadata_calculator.tree_reader());
app_health.insert_component(metadata_calculator.tree_health_check())?;
app_health.insert_custom_component(Arc::new(metadata_calculator.tree_health_check()))?;

if config.optional.pruning_enabled {
tracing::warn!("Proceeding with node state pruning for the Merkle tree. This is an experimental feature; use at your own risk");

let pruning_task =
metadata_calculator.pruning_task(config.optional.pruning_removal_delay() / 2);
app_health.insert_component(pruning_task.health_check())?;
let pruning_task_handle = tokio::spawn(pruning_task.run(stop_receiver.clone()));
task_futures.push(pruning_task_handle);
}

if let Some(api_config) = api_config {
let address = (Ipv4Addr::UNSPECIFIED, api_config.port).into();
Expand Down Expand Up @@ -281,21 +292,22 @@ async fn run_core(
}
}));

if let Some(data_retention_hours) = config.optional.pruning_data_retention_hours {
let minimum_l1_batch_age = Duration::from_secs(3600 * data_retention_hours);
if config.optional.pruning_enabled {
tracing::warn!("Proceeding with node state pruning for Postgres. This is an experimental feature; use at your own risk");

let minimum_l1_batch_age = config.optional.pruning_data_retention();
tracing::info!(
"Configured pruning of batches after they become {minimum_l1_batch_age:?} old"
);
let db_pruner = DbPruner::new(
DbPrunerConfig {
// don't change this value without adjusting API server pruning info cache max age
soft_and_hard_pruning_time_delta: Duration::from_secs(60),
next_iterations_delay: Duration::from_secs(30),
removal_delay: config.optional.pruning_removal_delay(),
pruned_batch_chunk_size: config.optional.pruning_chunk_size,
minimum_l1_batch_age,
},
connection_pool.clone(),
);
app_health.insert_component(db_pruner.health_check())?;
task_handles.push(tokio::spawn(db_pruner.run(stop_receiver.clone())));
}

Expand Down Expand Up @@ -498,13 +510,18 @@ async fn run_api(
mempool_cache_update_task.run(stop_receiver.clone()),
));

// The refresh interval should be several times lower than the pruning removal delay, so that
// soft-pruning propagates to the API server in a timely manner.
let pruning_info_refresh_interval = config.optional.pruning_removal_delay() / 5;

if components.contains(&Component::HttpApi) {
let mut builder =
ApiBuilder::jsonrpsee_backend(config.clone().into(), connection_pool.clone())
.http(config.required.http_port)
.with_filter_limit(config.optional.filters_limit)
.with_batch_request_size_limit(config.optional.max_batch_request_size)
.with_response_body_size_limit(config.optional.max_response_body_size())
.with_pruning_info_refresh_interval(pruning_info_refresh_interval)
.with_tx_sender(tx_sender.clone())
.with_vm_barrier(vm_barrier.clone())
.with_sync_state(sync_state.clone())
Expand Down Expand Up @@ -534,6 +551,7 @@ async fn run_api(
.with_batch_request_size_limit(config.optional.max_batch_request_size)
.with_response_body_size_limit(config.optional.max_response_body_size())
.with_polling_interval(config.optional.polling_interval())
.with_pruning_info_refresh_interval(pruning_info_refresh_interval)
.with_tx_sender(tx_sender)
.with_vm_barrier(vm_barrier)
.with_sync_state(sync_state)
Expand Down
16 changes: 16 additions & 0 deletions core/lib/health_check/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ impl Health {
pub fn status(&self) -> HealthStatus {
self.status
}

/// Returns the details attached to this health report, or `None` if there are none.
/// The returned value is borrowed from `self`. Mostly useful for testing.
pub fn details(&self) -> Option<&serde_json::Value> {
self.details.as_ref()
}
}

impl From<HealthStatus> for Health {
Expand Down Expand Up @@ -347,6 +352,17 @@ impl ReactiveHealthCheck {
};
(this, updater)
}

/// Waits until `condition` holds for the tracked [`Health`], and returns a copy of that health.
/// Mostly useful for testing.
///
/// If waiting on the underlying receiver fails (e.g., because the health updater associated
/// with this check has been dropped), this method never completes.
pub async fn wait_for(&mut self, condition: impl FnMut(&Health) -> bool) -> Health {
    if let Ok(health) = self.health_receiver.wait_for(condition).await {
        return health.clone();
    }
    // The condition can no longer be satisfied; park the caller forever instead of
    // returning a health value that does not match it.
    future::pending().await
}
}

#[async_trait]
Expand Down
10 changes: 10 additions & 0 deletions core/lib/merkle_tree/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,16 @@ impl Clone for ZkSyncTreeReader {
}

impl ZkSyncTreeReader {
/// Creates a tree reader on top of the provided database by wrapping it into a Merkle tree.
pub fn new(db: RocksDBWrapper) -> Self {
    let tree = MerkleTree::new(db);
    Self(tree)
}

/// Returns a reference to the database backing this tree reader.
pub fn db(&self) -> &RocksDBWrapper {
&self.0.db
}

/// Returns the current root hash of this tree, as reported by the wrapped tree's
/// `latest_root_hash()`.
pub fn root_hash(&self) -> ValueHash {
self.0.latest_root_hash()
}
Expand Down
17 changes: 10 additions & 7 deletions core/lib/merkle_tree/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ impl<DB: PruneDatabase> MerkleTreePruner<DB> {
// We must retain at least one tree version.
let last_prunable_version = self.last_prunable_version();
if last_prunable_version.is_none() {
tracing::info!("Nothing to prune; skipping");
tracing::debug!("Nothing to prune; skipping");
return None;
}
let target_retained_version = last_prunable_version?.min(target_retained_version);
let stale_key_new_versions = min_stale_key_version..=target_retained_version;
if stale_key_new_versions.is_empty() {
tracing::info!(
tracing::debug!(
"No Merkle tree versions can be pruned; min stale key version is {min_stale_key_version}, \
target retained version is {target_retained_version}"
);
Expand All @@ -165,7 +165,7 @@ impl<DB: PruneDatabase> MerkleTreePruner<DB> {
load_stale_keys_latency.observe();

if pruned_keys.is_empty() {
tracing::info!("No stale keys to remove; skipping");
tracing::debug!("No stale keys to remove; skipping");
return None;
}
let deleted_stale_key_versions = min_stale_key_version..(max_stale_key_version + 1);
Expand Down Expand Up @@ -203,21 +203,24 @@ impl<DB: PruneDatabase> MerkleTreePruner<DB> {
let mut wait_interval = Duration::ZERO;
while !self.wait_for_abort(wait_interval) {
let retained_version = self.target_retained_version.load(Ordering::Relaxed);
if let Some(stats) = self.prune_up_to(retained_version) {
wait_interval = if let Some(stats) = self.prune_up_to(retained_version) {
tracing::debug!(
"Performed pruning for target retained version {retained_version}: {stats:?}"
);
stats.report();
if stats.has_more_work() {
continue;
// Continue pruning right away instead of waiting for abort.
Duration::ZERO
} else {
self.poll_interval
}
} else {
tracing::debug!(
"Pruning was not performed; waiting {:?}",
self.poll_interval
);
}
wait_interval = self.poll_interval;
self.poll_interval
};
}
}
}
Expand Down
61 changes: 58 additions & 3 deletions core/lib/storage/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
path::Path,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Condvar, Mutex,
Arc, Condvar, Mutex, Weak,
},
thread,
time::{Duration, Instant},
Expand Down Expand Up @@ -314,15 +314,26 @@ impl Default for RocksDBOptions {

/// Thin wrapper around a RocksDB instance.
///
/// The wrapper is cheaply cloneable (internally, it wraps a DB instance in an [`Arc`]).
#[derive(Debug, Clone)]
/// The wrapper is cheaply cloneable; internally, it wraps a DB instance in an [`Arc`].
#[derive(Debug)]
pub struct RocksDB<CF> {
inner: Arc<RocksDBInner>,
sync_writes: bool,
stalled_writes_retries: StalledWritesRetries,
_cf: PhantomData<CF>,
}

impl<CF> Clone for RocksDB<CF> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
sync_writes: self.sync_writes,
stalled_writes_retries: self.stalled_writes_retries,
_cf: PhantomData,
}
}
}

impl<CF: NamedColumnFamily> RocksDB<CF> {
pub fn new(path: &Path) -> Result<Self, rocksdb::Error> {
Self::with_options(path, RocksDBOptions::default())
Expand Down Expand Up @@ -448,6 +459,15 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
options
}

/// Creates a weak reference to this RocksDB instance. The weak reference carries over
/// the write settings (`sync_writes` and `stalled_writes_retries`) of this handle.
pub fn downgrade(&self) -> WeakRocksDB<CF> {
    let inner = Arc::downgrade(&self.inner);
    WeakRocksDB {
        inner,
        sync_writes: self.sync_writes,
        stalled_writes_retries: self.stalled_writes_retries,
        _cf: PhantomData,
    }
}

pub fn estimated_number_of_entries(&self, cf: CF) -> u64 {
const ERROR_MSG: &str = "failed to get estimated number of entries";

Expand Down Expand Up @@ -628,6 +648,41 @@ impl RocksDB<()> {
}
}

/// Weak reference to a RocksDB instance. Doesn't prevent dropping the underlying instance;
/// to work with it, you should [upgrade](Self::upgrade()) the reference first.
///
/// The wrapper is cheaply cloneable; internally, it wraps a DB instance in a [`Weak`].
#[derive(Debug)]
pub struct WeakRocksDB<CF> {
// Non-owning pointer to the shared DB state.
inner: Weak<RocksDBInner>,
// Write settings carried alongside the pointer so that an upgraded handle
// can be reconstructed with the same configuration.
sync_writes: bool,
stalled_writes_retries: StalledWritesRetries,
// Ties the reference to a column family type without storing a value of that type.
_cf: PhantomData<CF>,
}

impl<CF: NamedColumnFamily> Clone for WeakRocksDB<CF> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
sync_writes: self.sync_writes,
stalled_writes_retries: self.stalled_writes_retries,
_cf: PhantomData,
}
}
}

impl<CF: NamedColumnFamily> WeakRocksDB<CF> {
    /// Tries to obtain a strong reference to the RocksDB instance.
    ///
    /// Returns `None` if the underlying instance has been dropped. Otherwise, returns
    /// a handle with the same write settings as this weak reference.
    pub fn upgrade(&self) -> Option<RocksDB<CF>> {
        self.inner.upgrade().map(|inner| RocksDB {
            inner,
            sync_writes: self.sync_writes,
            stalled_writes_retries: self.stalled_writes_retries,
            _cf: PhantomData,
        })
    }
}

/// Profiling information for a logical I/O operation on RocksDB. Can be used to profile operations
/// distributed in time, including on multiple threads.
#[must_use = "`start_profiling()` should be called one or more times to actually perform profiling"]
Expand Down
2 changes: 1 addition & 1 deletion core/lib/storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod db;
mod metrics;

pub use db::{RocksDB, RocksDBOptions, StalledWritesRetries};
pub use db::{RocksDB, RocksDBOptions, StalledWritesRetries, WeakRocksDB};
pub use rocksdb;
Loading

0 comments on commit e0d4daa

Please sign in to comment.