diff --git a/core/node/node_sync/src/sync_state.rs b/core/node/node_sync/src/sync_state.rs index e061ff7da012..dd4309e09eb9 100644 --- a/core/node/node_sync/src/sync_state.rs +++ b/core/node/node_sync/src/sync_state.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use async_trait::async_trait; use serde::Serialize; @@ -124,6 +127,7 @@ impl StateKeeperOutputHandler for SyncState { async fn handle_l1_batch( &mut self, updates_manager: Arc, + _started_at: Instant, ) -> anyhow::Result<()> { let sealed_block_number = updates_manager.l2_block.number; self.set_local_block(sealed_block_number); diff --git a/core/node/state_keeper/src/io/output_handler.rs b/core/node/state_keeper/src/io/output_handler.rs index ec254eaec669..5b427b58937b 100644 --- a/core/node/state_keeper/src/io/output_handler.rs +++ b/core/node/state_keeper/src/io/output_handler.rs @@ -1,6 +1,6 @@ //! Handling outputs produced by the state keeper. -use std::{fmt, sync::Arc}; +use std::{fmt, sync::Arc, time::Instant}; use anyhow::Context as _; use async_trait::async_trait; @@ -23,6 +23,7 @@ pub trait StateKeeperOutputHandler: 'static + Send + fmt::Debug { async fn handle_l1_batch( &mut self, _updates_manager: Arc, + _started_at: Instant, ) -> anyhow::Result<()> { Ok(()) } @@ -88,7 +89,7 @@ impl OutputHandler { ) -> anyhow::Result<()> { for handler in &mut self.inner { handler - .handle_l1_batch(updates_manager.clone()) + .handle_l1_batch(updates_manager.clone(), Instant::now()) .await .with_context(|| { format!( diff --git a/core/node/state_keeper/src/io/persistence.rs b/core/node/state_keeper/src/io/persistence.rs index de9ac22e1777..a046cc897b60 100644 --- a/core/node/state_keeper/src/io/persistence.rs +++ b/core/node/state_keeper/src/io/persistence.rs @@ -166,6 +166,7 @@ impl StateKeeperOutputHandler for StateKeeperPersistence { async fn handle_l1_batch( &mut self, updates_manager: Arc, + _started_at: Instant, ) -> anyhow::Result<()> { // We cannot start sealing an L1 batch until we've sealed all L2 blocks included in it. self.wait_for_all_commands().await; @@ -270,6 +271,7 @@ impl StateKeeperOutputHandler for TreeWritesPersistence { async fn handle_l1_batch( &mut self, updates_manager: Arc, + _started_at: Instant, ) -> anyhow::Result<()> { let mut connection = self.pool.connection_tagged("state_keeper").await?; let finished_batch = updates_manager diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs index 9cb701797483..c5ee2d6b0edc 100644 --- a/core/node/state_keeper/src/testonly/test_batch_executor.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -540,6 +540,7 @@ impl StateKeeperOutputHandler for TestPersistence { async fn handle_l1_batch( &mut self, updates_manager: Arc, + _started_at: Instant, ) -> anyhow::Result<()> { let action = self.pop_next_item("seal_l1_batch"); let ScenarioItem::BatchSeal(_, check_fn) = action else { diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs index 700417746f3a..f6604a875b40 100644 --- a/core/node/vm_runner/src/impls/protective_reads.rs +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -134,6 +134,7 @@ impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { async fn handle_l1_batch( &mut self, updates_manager: Arc, + _started_at: Instant, ) -> anyhow::Result<()> { let finished_batch = updates_manager .l1_batch diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index 40c65b5aea46..99f63a26ff89 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -164,6 +164,7 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { async fn handle_l1_batch( &mut self, updates_manager: Arc, + started_at: Instant, ) -> anyhow::Result<()> { let state = mem::replace(self, AsyncOutputHandler::Finished); match state { @@ -173,9 +174,9 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { } => { sender .send(tokio::task::spawn(async move { - let started_at = Instant::now(); - let result = handler.handle_l1_batch(updates_manager).await; - METRICS.output_handle_time.observe(started_at.elapsed()); + let latency = METRICS.output_handle_time.start(); + let result = handler.handle_l1_batch(updates_manager, started_at).await; + latency.observe(); result.map(|_| started_at) })) .ok(); diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index 8a9ebb4e3dc9..4bef66ca1b78 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use anyhow::Context; use tokio::{sync::watch, task::JoinHandle}; @@ -61,7 +64,7 @@ impl VmRunner { mut updates_manager: UpdatesManager, mut output_handler: Box, ) -> anyhow::Result<()> { - let latency = METRICS.run_vm_time.start(); + let started_at = Instant::now(); for (i, l2_block) in l2_blocks.into_iter().enumerate() { if i > 0 { // First L2 block in every batch is already preloaded @@ -115,9 +118,9 @@ impl VmRunner { .await .context("failed finishing L1 batch in executor")?; updates_manager.finish_batch(finished_batch); - latency.observe(); + METRICS.run_vm_time.observe(started_at.elapsed()); output_handler - .handle_l1_batch(Arc::new(updates_manager)) + .handle_l1_batch(Arc::new(updates_manager), started_at) .await .context("VM runner failed to handle L1 batch")?; Ok(()) diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs index 8111adf48c81..a8f6cfc355fc 100644 --- a/core/node/vm_runner/src/tests/mod.rs +++ b/core/node/vm_runner/src/tests/mod.rs @@ -167,6 +167,7 @@ impl OutputHandlerFactory for TestOutputFactory { async fn handle_l1_batch( &mut self, _updates_manager: Arc, + _started_at: Instant, ) -> anyhow::Result<()> { if let Some(delay) = self.delay { tokio::time::sleep(delay).await diff --git a/core/node/vm_runner/src/tests/output_handler.rs b/core/node/vm_runner/src/tests/output_handler.rs index 453507328c4f..ca68e9245f0d 100644 --- a/core/node/vm_runner/src/tests/output_handler.rs +++ b/core/node/vm_runner/src/tests/output_handler.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, +}; use tokio::{ sync::{watch, RwLock}, @@ -80,7 +84,7 @@ impl OutputHandlerTester { .await .unwrap(); output_handler - .handle_l1_batch(Arc::new(updates_manager)) + .handle_l1_batch(Arc::new(updates_manager), Instant::now()) .await .unwrap(); });