Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Add pruning health checks and rework pruning config #1790

Merged
merged 22 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
env, fmt,
num::{NonZeroU32, NonZeroUsize},
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
str::FromStr,
time::Duration,
};
Expand Down Expand Up @@ -376,11 +376,24 @@ pub(crate) struct OptionalENConfig {
#[serde(default = "OptionalENConfig::default_snapshots_recovery_postgres_max_concurrency")]
pub snapshots_recovery_postgres_max_concurrency: NonZeroUsize,

/// Enables pruning of the historical node state (Postgres and Merkle tree). The node will retain
/// recent state and will continuously remove (prune) old enough parts of the state in the background.
#[serde(default)]
pub pruning_enabled: bool,
/// Number of L1 batches pruned at a time.
#[serde(default = "OptionalENConfig::default_pruning_chunk_size")]
pub pruning_chunk_size: u32,

/// If set, l1 batches will be pruned after they are that long
pub pruning_data_retention_hours: Option<u64>,
/// Delta between soft- and hard-removing data from Postgres. Should be reasonably large (order of 60 seconds).
/// The default value is 60 seconds.
#[serde(default = "OptionalENConfig::default_pruning_removal_delay_sec")]
pruning_removal_delay_sec: NonZeroU64,
/// If set, L1 batches will be pruned after the batch timestamp is this old (in seconds). Note that an L1 batch
/// may be temporarily retained for other reasons; e.g., a batch cannot be pruned until it is executed on L1,
/// which happens roughly 24 hours after its generation on the mainnet. Thus, in practice this value can specify
/// the retention period greater than that implicitly imposed by other criteria (e.g., 7 or 30 days).
/// If set to 0, L1 batches will not be retained based on their timestamp. The default value is 1 hour.
#[serde(default = "OptionalENConfig::default_pruning_data_retention_sec")]
pruning_data_retention_sec: u64,
}

#[derive(Debug, Clone, PartialEq, Deserialize)]
Expand Down Expand Up @@ -524,6 +537,14 @@ impl OptionalENConfig {
10
}

fn default_pruning_removal_delay_sec() -> NonZeroU64 {
NonZeroU64::new(60).unwrap()
}

fn default_pruning_data_retention_sec() -> u64 {
3_600 // 1 hour
}

pub fn polling_interval(&self) -> Duration {
Duration::from_millis(self.polling_interval)
}
Expand Down Expand Up @@ -600,6 +621,14 @@ impl OptionalENConfig {
Duration::from_millis(self.mempool_cache_update_interval)
}

pub fn pruning_removal_delay(&self) -> Duration {
Duration::from_secs(self.pruning_removal_delay_sec.get())
}

pub fn pruning_data_retention(&self) -> Duration {
Duration::from_secs(self.pruning_data_retention_sec)
}

#[cfg(test)]
fn mock() -> Self {
// Set all values to their defaults
Expand Down
5 changes: 2 additions & 3 deletions core/bin/external_node/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ pub(crate) async fn ensure_storage_initialized(
InitDecision::SnapshotRecovery => {
anyhow::ensure!(
consider_snapshot_recovery,
"Snapshot recovery is required to proceed, but it is not enabled. Enable by supplying \
`--enable-snapshots-recovery` command-line arg to the node binary, or reset the node storage \
to sync from genesis"
"Snapshot recovery is required to proceed, but it is not enabled. Enable by setting \
`EN_SNAPSHOTS_RECOVERY_ENABLED=true` env variable to the node binary, or use a Postgres dump for recovery"
);

tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
Expand Down
38 changes: 28 additions & 10 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,24 @@ async fn run_tree(
.await
.context("failed creating DB pool for Merkle tree recovery")?;

let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None, tree_pool)
.await
.context("failed initializing metadata calculator")?
.with_recovery_pool(recovery_pool);
let mut metadata_calculator =
MetadataCalculator::new(metadata_calculator_config, None, tree_pool)
.await
.context("failed initializing metadata calculator")?
.with_recovery_pool(recovery_pool);

let tree_reader = Arc::new(metadata_calculator.tree_reader());
app_health.insert_component(metadata_calculator.tree_health_check())?;
app_health.insert_custom_component(Arc::new(metadata_calculator.tree_health_check()))?;

if config.optional.pruning_enabled {
tracing::warn!("Proceeding with node state pruning for the Merkle tree. This is an experimental feature; use at your own risk");

let pruning_task =
metadata_calculator.pruning_task(config.optional.pruning_removal_delay() / 2);
app_health.insert_component(pruning_task.health_check())?;
let pruning_task_handle = tokio::spawn(pruning_task.run(stop_receiver.clone()));
task_futures.push(pruning_task_handle);
}

if let Some(api_config) = api_config {
let address = (Ipv4Addr::UNSPECIFIED, api_config.port).into();
Expand Down Expand Up @@ -281,21 +292,22 @@ async fn run_core(
}
}));

if let Some(data_retention_hours) = config.optional.pruning_data_retention_hours {
let minimum_l1_batch_age = Duration::from_secs(3600 * data_retention_hours);
if config.optional.pruning_enabled {
tracing::warn!("Proceeding with node state pruning for Postgres. This is an experimental feature; use at your own risk");

let minimum_l1_batch_age = config.optional.pruning_data_retention();
tracing::info!(
"Configured pruning of batches after they become {minimum_l1_batch_age:?} old"
);
let db_pruner = DbPruner::new(
DbPrunerConfig {
// don't change this value without adjusting API server pruning info cache max age
soft_and_hard_pruning_time_delta: Duration::from_secs(60),
next_iterations_delay: Duration::from_secs(30),
removal_delay: config.optional.pruning_removal_delay(),
pruned_batch_chunk_size: config.optional.pruning_chunk_size,
minimum_l1_batch_age,
},
connection_pool.clone(),
);
app_health.insert_component(db_pruner.health_check())?;
task_handles.push(tokio::spawn(db_pruner.run(stop_receiver.clone())));
}

Expand Down Expand Up @@ -498,13 +510,18 @@ async fn run_api(
mempool_cache_update_task.run(stop_receiver.clone()),
));

// The refresh interval should be several times lower than the pruning removal delay, so that
// soft-pruning will timely propagate to the API server.
let pruning_info_refresh_interval = config.optional.pruning_removal_delay() / 5;

if components.contains(&Component::HttpApi) {
let mut builder =
ApiBuilder::jsonrpsee_backend(config.clone().into(), connection_pool.clone())
.http(config.required.http_port)
.with_filter_limit(config.optional.filters_limit)
.with_batch_request_size_limit(config.optional.max_batch_request_size)
.with_response_body_size_limit(config.optional.max_response_body_size())
.with_pruning_info_refresh_interval(pruning_info_refresh_interval)
.with_tx_sender(tx_sender.clone())
.with_vm_barrier(vm_barrier.clone())
.with_sync_state(sync_state.clone())
Expand Down Expand Up @@ -534,6 +551,7 @@ async fn run_api(
.with_batch_request_size_limit(config.optional.max_batch_request_size)
.with_response_body_size_limit(config.optional.max_response_body_size())
.with_polling_interval(config.optional.polling_interval())
.with_pruning_info_refresh_interval(pruning_info_refresh_interval)
.with_tx_sender(tx_sender)
.with_vm_barrier(vm_barrier)
.with_sync_state(sync_state)
Expand Down
16 changes: 16 additions & 0 deletions core/lib/health_check/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ impl Health {
pub fn status(&self) -> HealthStatus {
self.status
}

/// Returns health details. Mostly useful for testing.
pub fn details(&self) -> Option<&serde_json::Value> {
self.details.as_ref()
}
}

impl From<HealthStatus> for Health {
Expand Down Expand Up @@ -347,6 +352,17 @@ impl ReactiveHealthCheck {
};
(this, updater)
}

/// Waits until the specified `condition` is true for the tracked [`Health`], and returns health.
/// Mostly useful for testing.
///
/// If the health updater associated with this check is dropped, this method can wait indefinitely.
pub async fn wait_for(&mut self, condition: impl FnMut(&Health) -> bool) -> Health {
match self.health_receiver.wait_for(condition).await {
Ok(health) => health.clone(),
Err(_) => future::pending().await,
}
}
}

#[async_trait]
Expand Down
10 changes: 10 additions & 0 deletions core/lib/merkle_tree/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,16 @@ impl Clone for ZkSyncTreeReader {
}

impl ZkSyncTreeReader {
/// Creates a tree reader based on the provided database.
pub fn new(db: RocksDBWrapper) -> Self {
Self(MerkleTree::new(db))
}

/// Returns a reference to the database this.
pub fn db(&self) -> &RocksDBWrapper {
&self.0.db
}

/// Returns the current root hash of this tree.
pub fn root_hash(&self) -> ValueHash {
self.0.latest_root_hash()
Expand Down
17 changes: 10 additions & 7 deletions core/lib/merkle_tree/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ impl<DB: PruneDatabase> MerkleTreePruner<DB> {
// We must retain at least one tree version.
let last_prunable_version = self.last_prunable_version();
if last_prunable_version.is_none() {
tracing::info!("Nothing to prune; skipping");
tracing::debug!("Nothing to prune; skipping");
return None;
}
let target_retained_version = last_prunable_version?.min(target_retained_version);
let stale_key_new_versions = min_stale_key_version..=target_retained_version;
if stale_key_new_versions.is_empty() {
tracing::info!(
tracing::debug!(
"No Merkle tree versions can be pruned; min stale key version is {min_stale_key_version}, \
target retained version is {target_retained_version}"
);
Expand All @@ -165,7 +165,7 @@ impl<DB: PruneDatabase> MerkleTreePruner<DB> {
load_stale_keys_latency.observe();

if pruned_keys.is_empty() {
tracing::info!("No stale keys to remove; skipping");
tracing::debug!("No stale keys to remove; skipping");
return None;
}
let deleted_stale_key_versions = min_stale_key_version..(max_stale_key_version + 1);
Expand Down Expand Up @@ -203,21 +203,24 @@ impl<DB: PruneDatabase> MerkleTreePruner<DB> {
let mut wait_interval = Duration::ZERO;
while !self.wait_for_abort(wait_interval) {
let retained_version = self.target_retained_version.load(Ordering::Relaxed);
if let Some(stats) = self.prune_up_to(retained_version) {
wait_interval = if let Some(stats) = self.prune_up_to(retained_version) {
tracing::debug!(
"Performed pruning for target retained version {retained_version}: {stats:?}"
);
stats.report();
if stats.has_more_work() {
continue;
// Continue pruning right away instead of waiting for abort.
Duration::ZERO
} else {
self.poll_interval
}
} else {
tracing::debug!(
"Pruning was not performed; waiting {:?}",
self.poll_interval
);
}
wait_interval = self.poll_interval;
self.poll_interval
};
}
}
}
Expand Down
61 changes: 58 additions & 3 deletions core/lib/storage/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
path::Path,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Condvar, Mutex,
Arc, Condvar, Mutex, Weak,
},
thread,
time::{Duration, Instant},
Expand Down Expand Up @@ -314,15 +314,26 @@ impl Default for RocksDBOptions {

/// Thin wrapper around a RocksDB instance.
///
/// The wrapper is cheaply cloneable (internally, it wraps a DB instance in an [`Arc`]).
#[derive(Debug, Clone)]
/// The wrapper is cheaply cloneable; internally, it wraps a DB instance in an [`Arc`].
#[derive(Debug)]
pub struct RocksDB<CF> {
inner: Arc<RocksDBInner>,
sync_writes: bool,
stalled_writes_retries: StalledWritesRetries,
_cf: PhantomData<CF>,
}

impl<CF> Clone for RocksDB<CF> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
sync_writes: self.sync_writes,
stalled_writes_retries: self.stalled_writes_retries,
_cf: PhantomData,
}
}
}

impl<CF: NamedColumnFamily> RocksDB<CF> {
pub fn new(path: &Path) -> Result<Self, rocksdb::Error> {
Self::with_options(path, RocksDBOptions::default())
Expand Down Expand Up @@ -448,6 +459,15 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
options
}

pub fn downgrade(&self) -> WeakRocksDB<CF> {
WeakRocksDB {
inner: Arc::downgrade(&self.inner),
sync_writes: self.sync_writes,
stalled_writes_retries: self.stalled_writes_retries,
_cf: PhantomData,
}
}

pub fn estimated_number_of_entries(&self, cf: CF) -> u64 {
const ERROR_MSG: &str = "failed to get estimated number of entries";

Expand Down Expand Up @@ -628,6 +648,41 @@ impl RocksDB<()> {
}
}

/// Weak reference to a RocksDB instance. Doesn't prevent dropping the underlying instance;
/// to work with it, you should [upgrade](Self::upgrade()) the reference first.
///
/// The wrapper is cheaply cloneable; internally, it wraps a DB instance in a [`Weak`].
#[derive(Debug)]
pub struct WeakRocksDB<CF> {
inner: Weak<RocksDBInner>,
sync_writes: bool,
stalled_writes_retries: StalledWritesRetries,
_cf: PhantomData<CF>,
}

impl<CF: NamedColumnFamily> Clone for WeakRocksDB<CF> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
sync_writes: self.sync_writes,
stalled_writes_retries: self.stalled_writes_retries,
_cf: PhantomData,
}
}
}

impl<CF: NamedColumnFamily> WeakRocksDB<CF> {
/// Tries to upgrade to a strong reference to RocksDB. If the RocksDB instance has been dropped, returns `None`.
pub fn upgrade(&self) -> Option<RocksDB<CF>> {
Some(RocksDB {
inner: self.inner.upgrade()?,
sync_writes: self.sync_writes,
stalled_writes_retries: self.stalled_writes_retries,
_cf: PhantomData,
})
}
}

/// Profiling information for a logical I/O operation on RocksDB. Can be used to profile operations
/// distributed in time, including on multiple threads.
#[must_use = "`start_profiling()` should be called one or more times to actually perform profiling"]
Expand Down
2 changes: 1 addition & 1 deletion core/lib/storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod db;
mod metrics;

pub use db::{RocksDB, RocksDBOptions, StalledWritesRetries};
pub use db::{RocksDB, RocksDBOptions, StalledWritesRetries, WeakRocksDB};
pub use rocksdb;
Loading
Loading