From 9bea4bc65bc1f162683de8b0d27fae0453327ef8 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Thu, 6 Jun 2024 16:36:41 +1000 Subject: [PATCH 1/3] add metrics for vm runner --- Cargo.lock | 1 + core/node/vm_runner/Cargo.toml | 1 + core/node/vm_runner/src/lib.rs | 1 + core/node/vm_runner/src/metrics.rs | 28 +++++++++++++++++++++++ core/node/vm_runner/src/output_handler.rs | 12 +++++++--- core/node/vm_runner/src/process.rs | 13 +++++++++-- core/node/vm_runner/src/storage.rs | 6 +++-- 7 files changed, 55 insertions(+), 7 deletions(-) create mode 100644 core/node/vm_runner/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 0bb1fd0fced5..191200cdb1a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9441,6 +9441,7 @@ dependencies = [ "tempfile", "tokio", "tracing", + "vise", "vm_utils", "zksync_contracts", "zksync_dal", diff --git a/core/node/vm_runner/Cargo.toml b/core/node/vm_runner/Cargo.toml index b3ede5a796be..5571bb7f3fde 100644 --- a/core/node/vm_runner/Cargo.toml +++ b/core/node/vm_runner/Cargo.toml @@ -26,6 +26,7 @@ async-trait.workspace = true once_cell.workspace = true tracing.workspace = true dashmap.workspace = true +vise.workspace = true [dev-dependencies] zksync_node_test_utils.workspace = true diff --git a/core/node/vm_runner/src/lib.rs b/core/node/vm_runner/src/lib.rs index ca9f8bdc0eb4..50cf2a4433c1 100644 --- a/core/node/vm_runner/src/lib.rs +++ b/core/node/vm_runner/src/lib.rs @@ -9,6 +9,7 @@ mod output_handler; mod process; mod storage; +mod metrics; #[cfg(test)] mod tests; diff --git a/core/node/vm_runner/src/metrics.rs b/core/node/vm_runner/src/metrics.rs new file mode 100644 index 000000000000..4252ad5f0d4f --- /dev/null +++ b/core/node/vm_runner/src/metrics.rs @@ -0,0 +1,28 @@ +//! Metrics for `VmRunner`. + +use std::time::Duration; + +use vise::{Buckets, Gauge, Histogram, Metrics}; + +#[derive(Debug, Metrics)] +#[metrics(prefix = "vm_runner")] +pub(super) struct VmRunnerMetrics { + /// Last batch that has been marked as processed. + pub last_processed_batch: Gauge, + /// Last batch that is ready to be processed. + pub last_ready_batch: Gauge, + /// Current amount of batches that are being processed. + pub in_progress_l1_batches: Gauge, + /// Total latency of loading an L1 batch (RocksDB mode only). + #[metrics(buckets = Buckets::LATENCIES)] + pub storage_load_time: Histogram, + /// Total latency of running VM on an L1 batch. + #[metrics(buckets = Buckets::LATENCIES)] + pub run_vm_time: Histogram, + /// Total latency of handling output of an L1 batch. + #[metrics(buckets = Buckets::LATENCIES)] + pub output_handle_time: Histogram, +} + +#[vise::register] +pub(super) static METRICS: vise::Global = vise::Global::new(); diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index 49bed83cd96e..92afff688b29 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -2,7 +2,7 @@ use std::{ fmt::{Debug, Formatter}, mem, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use anyhow::Context; @@ -16,7 +16,7 @@ use zksync_dal::{ConnectionPool, Core}; use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_types::L1BatchNumber; -use crate::VmRunnerIo; +use crate::{metrics::METRICS, VmRunnerIo}; type BatchReceiver = oneshot::Receiver>>; @@ -173,7 +173,10 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { } => { sender .send(tokio::task::spawn(async move { - handler.handle_l1_batch(updates_manager).await + let started_at = Instant::now(); + let result = handler.handle_l1_batch(updates_manager).await; + METRICS.output_handle_time.observe(started_at.elapsed()); + result })) .ok(); Ok(()) @@ -248,6 +251,9 @@ impl ConcurrentOutputHandlerFactoryTask { self.io .mark_l1_batch_as_completed(&mut conn, latest_processed_batch) .await?; + METRICS + .last_processed_batch + .set(latest_processed_batch.0 as u64); } } } diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index 5e51b5e658f7..9f8976e1b8d1 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use anyhow::Context; use multivm::interface::L2BlockEnv; @@ -10,7 +13,7 @@ use zksync_state_keeper::{ }; use zksync_types::{block::L2BlockExecutionData, L1BatchNumber}; -use crate::{storage::StorageLoader, OutputHandlerFactory, VmRunnerIo}; +use crate::{metrics::METRICS, storage::StorageLoader, OutputHandlerFactory, VmRunnerIo}; /// VM runner represents a logic layer of L1 batch / L2 block processing flow akin to that of state /// keeper. The difference is that VM runner is designed to be run on batches/blocks that have @@ -61,6 +64,7 @@ impl VmRunner { mut updates_manager: UpdatesManager, mut output_handler: Box, ) -> anyhow::Result<()> { + let started_at = Instant::now(); for (i, l2_block) in l2_blocks.into_iter().enumerate() { if i > 0 { // First L2 block in every batch is already preloaded @@ -114,6 +118,7 @@ impl VmRunner { .await .context("failed finishing L1 batch in executor")?; updates_manager.finish_batch(finished_batch); + METRICS.run_vm_time.observe(started_at.elapsed()); output_handler .handle_l1_batch(Arc::new(updates_manager)) .await @@ -148,11 +153,15 @@ impl VmRunner { } } task_handles = retained_handles; + METRICS + .in_progress_l1_batches + .set(task_handles.len() as u64); let last_ready_batch = self .io .last_ready_to_be_loaded_batch(&mut self.pool.connection().await?) .await?; + METRICS.last_ready_batch.set(last_ready_batch.0 as u64); if next_batch > last_ready_batch { // Next batch is not ready to be processed yet tokio::time::sleep(SLEEP_INTERVAL).await; diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index e7a8b147c76f..e4d0b5243da3 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -2,7 +2,7 @@ use std::{ collections::{BTreeMap, HashMap}, fmt::Debug, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use anyhow::Context as _; @@ -19,7 +19,7 @@ use zksync_state::{ use zksync_storage::RocksDB; use zksync_types::{block::L2BlockExecutionData, L1BatchNumber, L2ChainId}; -use crate::VmRunnerIo; +use crate::{metrics::METRICS, VmRunnerIo}; #[async_trait] pub trait StorageLoader: ReadStorageFactory { @@ -338,6 +338,7 @@ impl StorageSyncTask { drop(state); let max_desired = self.io.last_ready_to_be_loaded_batch(&mut conn).await?; for l1_batch_number in max_present.0 + 1..=max_desired.0 { + let started_at = Instant::now(); let l1_batch_number = L1BatchNumber(l1_batch_number); let Some(execute_data) = Self::load_batch_execute_data( &mut conn, @@ -374,6 +375,7 @@ impl StorageSyncTask { .storage .insert(l1_batch_number, BatchData { execute_data, diff }); drop(state); + METRICS.storage_load_time.observe(started_at.elapsed()); } drop(conn); } From bfb7c420c9133e2b18d47e77f8c050919a7d3fdc Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 11 Jun 2024 22:48:22 +1000 Subject: [PATCH 2/3] address comments --- core/node/vm_runner/src/output_handler.rs | 7 ++++--- core/node/vm_runner/src/process.rs | 8 ++++---- core/node/vm_runner/src/storage.rs | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index 92afff688b29..e1b25a42920c 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -12,6 +12,7 @@ use tokio::{ sync::{oneshot, watch}, task::JoinHandle, }; +use vise::Histogram; use zksync_dal::{ConnectionPool, Core}; use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_types::L1BatchNumber; @@ -173,9 +174,9 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { } => { sender .send(tokio::task::spawn(async move { - let started_at = Instant::now(); + let latency = METRICS.output_handle_time.start(); let result = handler.handle_l1_batch(updates_manager).await; - METRICS.output_handle_time.observe(started_at.elapsed()); + latency.observe(); result })) .ok(); @@ -253,7 +254,7 @@ impl ConcurrentOutputHandlerFactoryTask { .await?; METRICS .last_processed_batch - .set(latest_processed_batch.0 as u64); + .set(latest_processed_batch.0.into()); } } } diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index 9f8976e1b8d1..6be46658007d 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -64,7 +64,7 @@ impl VmRunner { mut updates_manager: UpdatesManager, mut output_handler: Box, ) -> anyhow::Result<()> { - let started_at = Instant::now(); + let latency = METRICS.run_vm_time.start(); for (i, l2_block) in l2_blocks.into_iter().enumerate() { if i > 0 { // First L2 block in every batch is already preloaded @@ -118,7 +118,7 @@ impl VmRunner { .await .context("failed finishing L1 batch in executor")?; updates_manager.finish_batch(finished_batch); - METRICS.run_vm_time.observe(started_at.elapsed()); + latency.observe(); output_handler .handle_l1_batch(Arc::new(updates_manager)) .await @@ -155,13 +155,13 @@ impl VmRunner { task_handles = retained_handles; METRICS .in_progress_l1_batches - .set(task_handles.len() as u64); + .set(task_handles.len().into()); let last_ready_batch = self .io .last_ready_to_be_loaded_batch(&mut self.pool.connection().await?) .await?; - METRICS.last_ready_batch.set(last_ready_batch.0 as u64); + METRICS.last_ready_batch.set(last_ready_batch.0.into()); if next_batch > last_ready_batch { // Next batch is not ready to be processed yet tokio::time::sleep(SLEEP_INTERVAL).await; diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index e4d0b5243da3..5f8f32cc76c6 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -338,7 +338,7 @@ impl StorageSyncTask { drop(state); let max_desired = self.io.last_ready_to_be_loaded_batch(&mut conn).await?; for l1_batch_number in max_present.0 + 1..=max_desired.0 { - let started_at = Instant::now(); + let latency = METRICS.storage_load_time.start(); let l1_batch_number = L1BatchNumber(l1_batch_number); let Some(execute_data) = Self::load_batch_execute_data( &mut conn, @@ -375,7 +375,7 @@ impl StorageSyncTask { .storage .insert(l1_batch_number, BatchData { execute_data, diff }); drop(state); - METRICS.storage_load_time.observe(started_at.elapsed()); + latency.observe(); } drop(conn); } From 80c9f29990925867eec53c6047cb9f68642d5540 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 11 Jun 2024 23:08:07 +1000 Subject: [PATCH 3/3] lint --- core/node/vm_runner/src/output_handler.rs | 3 +-- core/node/vm_runner/src/process.rs | 7 ++----- core/node/vm_runner/src/storage.rs | 2 +- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index e1b25a42920c..4052c245a44f 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -2,7 +2,7 @@ use std::{ fmt::{Debug, Formatter}, mem, sync::Arc, - time::{Duration, Instant}, + time::Duration, }; use anyhow::Context; @@ -12,7 +12,6 @@ use tokio::{ sync::{oneshot, watch}, task::JoinHandle, }; -use vise::Histogram; use zksync_dal::{ConnectionPool, Core}; use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_types::L1BatchNumber; diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index 6be46658007d..e19a131ce0d7 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -1,7 +1,4 @@ -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; +use std::{sync::Arc, time::Duration}; use anyhow::Context; use multivm::interface::L2BlockEnv; @@ -155,7 +152,7 @@ impl VmRunner { task_handles = retained_handles; METRICS .in_progress_l1_batches - .set(task_handles.len().into()); + .set(task_handles.len() as u64); let last_ready_batch = self .io diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index 5f8f32cc76c6..7a53f6034a75 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -2,7 +2,7 @@ use std::{ collections::{BTreeMap, HashMap}, fmt::Debug, sync::Arc, - time::{Duration, Instant}, + time::Duration, }; use anyhow::Context as _;