Skip to content

Commit

Permalink
add metrics for vm runner
Browse files Browse the repository at this point in the history
  • Loading branch information
itegulov committed Jun 11, 2024
1 parent 4c18755 commit 1056047
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/node/vm_runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async-trait.workspace = true
once_cell.workspace = true
tracing.workspace = true
dashmap.workspace = true
vise.workspace = true

[dev-dependencies]
zksync_node_test_utils.workspace = true
Expand Down
1 change: 1 addition & 0 deletions core/node/vm_runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod output_handler;
mod process;
mod storage;

mod metrics;
#[cfg(test)]
mod tests;

Expand Down
28 changes: 28 additions & 0 deletions core/node/vm_runner/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//! Metrics for `VmRunner`.
use std::time::Duration;

use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit};

#[derive(Debug, Metrics)]
#[metrics(prefix = "vm_runner")]
pub(super) struct VmRunnerMetrics {
/// Last batch that has been marked as processed.
pub last_processed_batch: Gauge<u64>,
/// Last batch that is ready to be processed.
pub last_ready_batch: Gauge<u64>,
/// Current amount of batches that are being processed.
pub in_progress_l1_batches: Gauge<u64>,
/// Total latency of loading an L1 batch (RocksDB mode only).
#[metrics(buckets = Buckets::LATENCIES)]
pub storage_load_time: Histogram<Duration>,
/// Total latency of running VM on an L1 batch.
#[metrics(buckets = Buckets::LATENCIES)]
pub run_vm_time: Histogram<Duration>,
/// Total latency of handling output of an L1 batch.
#[metrics(buckets = Buckets::LATENCIES)]
pub output_handle_time: Histogram<Duration>,
}

#[vise::register]
pub(super) static METRICS: vise::Global<VmRunnerMetrics> = vise::Global::new();
12 changes: 9 additions & 3 deletions core/node/vm_runner/src/output_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
fmt::{Debug, Formatter},
mem,
sync::Arc,
time::Duration,
time::{Duration, Instant},
};

use anyhow::Context;
Expand All @@ -16,7 +16,7 @@ use zksync_dal::{ConnectionPool, Core};
use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager};
use zksync_types::L1BatchNumber;

use crate::VmRunnerIo;
use crate::{metrics::METRICS, VmRunnerIo};

type BatchReceiver = oneshot::Receiver<JoinHandle<anyhow::Result<()>>>;

Expand Down Expand Up @@ -173,7 +173,10 @@ impl StateKeeperOutputHandler for AsyncOutputHandler {
} => {
sender
.send(tokio::task::spawn(async move {
handler.handle_l1_batch(updates_manager).await
let started_at = Instant::now();
let result = handler.handle_l1_batch(updates_manager).await;
METRICS.output_handle_time.observe(started_at.elapsed());
result
}))
.ok();
Ok(())
Expand Down Expand Up @@ -248,6 +251,9 @@ impl<Io: VmRunnerIo> ConcurrentOutputHandlerFactoryTask<Io> {
self.io
.mark_l1_batch_as_completed(&mut conn, latest_processed_batch)
.await?;
METRICS
.last_processed_batch
.set(latest_processed_batch.0 as u64);
}
}
}
Expand Down
13 changes: 11 additions & 2 deletions core/node/vm_runner/src/process.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{sync::Arc, time::Duration};
use std::{
sync::Arc,
time::{Duration, Instant},
};

use anyhow::Context;
use multivm::interface::L2BlockEnv;
Expand All @@ -10,7 +13,7 @@ use zksync_state_keeper::{
};
use zksync_types::{block::L2BlockExecutionData, L1BatchNumber};

use crate::{storage::StorageLoader, OutputHandlerFactory, VmRunnerIo};
use crate::{metrics::METRICS, storage::StorageLoader, OutputHandlerFactory, VmRunnerIo};

/// VM runner represents a logic layer of L1 batch / L2 block processing flow akin to that of state
/// keeper. The difference is that VM runner is designed to be run on batches/blocks that have
Expand Down Expand Up @@ -61,6 +64,7 @@ impl VmRunner {
mut updates_manager: UpdatesManager,
mut output_handler: Box<dyn StateKeeperOutputHandler>,
) -> anyhow::Result<()> {
let started_at = Instant::now();
for (i, l2_block) in l2_blocks.into_iter().enumerate() {
if i > 0 {
// First L2 block in every batch is already preloaded
Expand Down Expand Up @@ -114,6 +118,7 @@ impl VmRunner {
.await
.context("failed finishing L1 batch in executor")?;
updates_manager.finish_batch(finished_batch);
METRICS.run_vm_time.observe(started_at.elapsed());
output_handler
.handle_l1_batch(Arc::new(updates_manager))
.await
Expand Down Expand Up @@ -148,11 +153,15 @@ impl VmRunner {
}
}
task_handles = retained_handles;
METRICS
.in_progress_l1_batches
.set(task_handles.len() as u64);

let last_ready_batch = self
.io
.last_ready_to_be_loaded_batch(&mut self.pool.connection().await?)
.await?;
METRICS.last_ready_batch.set(last_ready_batch.0 as u64);
if next_batch > last_ready_batch {
// Next batch is not ready to be processed yet
tokio::time::sleep(SLEEP_INTERVAL).await;
Expand Down
6 changes: 4 additions & 2 deletions core/node/vm_runner/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
collections::{BTreeMap, HashMap},
fmt::Debug,
sync::Arc,
time::Duration,
time::{Duration, Instant},
};

use anyhow::Context as _;
Expand All @@ -19,7 +19,7 @@ use zksync_state::{
use zksync_storage::RocksDB;
use zksync_types::{block::L2BlockExecutionData, L1BatchNumber, L2ChainId};

use crate::VmRunnerIo;
use crate::{metrics::METRICS, VmRunnerIo};

#[async_trait]
pub trait StorageLoader: ReadStorageFactory {
Expand Down Expand Up @@ -338,6 +338,7 @@ impl<Io: VmRunnerIo> StorageSyncTask<Io> {
drop(state);
let max_desired = self.io.last_ready_to_be_loaded_batch(&mut conn).await?;
for l1_batch_number in max_present.0 + 1..=max_desired.0 {
let started_at = Instant::now();
let l1_batch_number = L1BatchNumber(l1_batch_number);
let Some(execute_data) = Self::load_batch_execute_data(
&mut conn,
Expand Down Expand Up @@ -374,6 +375,7 @@ impl<Io: VmRunnerIo> StorageSyncTask<Io> {
.storage
.insert(l1_batch_number, BatchData { execute_data, diff });
drop(state);
METRICS.storage_load_time.observe(started_at.elapsed());
}
drop(conn);
}
Expand Down

0 comments on commit 1056047

Please sign in to comment.