Skip to content

Commit

Permalink
propagate started_at from the start of vm execution
Browse files Browse the repository at this point in the history
  • Loading branch information
itegulov committed Jul 3, 2024
1 parent ab72aa2 commit 522e471
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 8 deletions.
2 changes: 2 additions & 0 deletions core/node/node_sync/src/sync_state.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::Instant;
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
Expand Down Expand Up @@ -124,6 +125,7 @@ impl StateKeeperOutputHandler for SyncState {
async fn handle_l1_batch(
&mut self,
updates_manager: Arc<UpdatesManager>,
_started_at: Instant,
) -> anyhow::Result<()> {
let sealed_block_number = updates_manager.l2_block.number;
self.set_local_block(sealed_block_number);
Expand Down
4 changes: 3 additions & 1 deletion core/node/state_keeper/src/io/output_handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Handling outputs produced by the state keeper.
use std::time::Instant;
use std::{fmt, sync::Arc};

use anyhow::Context as _;
Expand All @@ -23,6 +24,7 @@ pub trait StateKeeperOutputHandler: 'static + Send + fmt::Debug {
async fn handle_l1_batch(
&mut self,
_updates_manager: Arc<UpdatesManager>,
_started_at: Instant,
) -> anyhow::Result<()> {
Ok(())
}
Expand Down Expand Up @@ -88,7 +90,7 @@ impl OutputHandler {
) -> anyhow::Result<()> {
for handler in &mut self.inner {
handler
.handle_l1_batch(updates_manager.clone())
.handle_l1_batch(updates_manager.clone(), Instant::now())
.await
.with_context(|| {
format!(
Expand Down
2 changes: 2 additions & 0 deletions core/node/state_keeper/src/io/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl StateKeeperOutputHandler for StateKeeperPersistence {
async fn handle_l1_batch(
&mut self,
updates_manager: Arc<UpdatesManager>,
_started_at: Instant,
) -> anyhow::Result<()> {
// We cannot start sealing an L1 batch until we've sealed all L2 blocks included in it.
self.wait_for_all_commands().await;
Expand Down Expand Up @@ -270,6 +271,7 @@ impl StateKeeperOutputHandler for TreeWritesPersistence {
async fn handle_l1_batch(
&mut self,
updates_manager: Arc<UpdatesManager>,
_started_at: Instant,
) -> anyhow::Result<()> {
let mut connection = self.pool.connection_tagged("state_keeper").await?;
let finished_batch = updates_manager
Expand Down
1 change: 1 addition & 0 deletions core/node/state_keeper/src/testonly/test_batch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ impl StateKeeperOutputHandler for TestPersistence {
async fn handle_l1_batch(
&mut self,
updates_manager: Arc<UpdatesManager>,
_started_at: Instant,
) -> anyhow::Result<()> {
let action = self.pop_next_item("seal_l1_batch");
let ScenarioItem::BatchSeal(_, check_fn) = action else {
Expand Down
1 change: 1 addition & 0 deletions core/node/vm_runner/src/impls/protective_reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler {
async fn handle_l1_batch(
&mut self,
updates_manager: Arc<UpdatesManager>,
_started_at: Instant,
) -> anyhow::Result<()> {
let finished_batch = updates_manager
.l1_batch
Expand Down
7 changes: 4 additions & 3 deletions core/node/vm_runner/src/output_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ impl StateKeeperOutputHandler for AsyncOutputHandler {
async fn handle_l1_batch(
&mut self,
updates_manager: Arc<UpdatesManager>,
started_at: Instant,
) -> anyhow::Result<()> {
let state = mem::replace(self, AsyncOutputHandler::Finished);
match state {
Expand All @@ -173,9 +174,9 @@ impl StateKeeperOutputHandler for AsyncOutputHandler {
} => {
sender
.send(tokio::task::spawn(async move {
let started_at = Instant::now();
let result = handler.handle_l1_batch(updates_manager).await;
METRICS.output_handle_time.observe(started_at.elapsed());
let latency = METRICS.output_handle_time.start();
let result = handler.handle_l1_batch(updates_manager, started_at).await;
latency.observe();
result.map(|_| started_at)
}))
.ok();
Expand Down
7 changes: 4 additions & 3 deletions core/node/vm_runner/src/process.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::Instant;
use std::{sync::Arc, time::Duration};

use anyhow::Context;
Expand Down Expand Up @@ -61,7 +62,7 @@ impl VmRunner {
mut updates_manager: UpdatesManager,
mut output_handler: Box<dyn StateKeeperOutputHandler>,
) -> anyhow::Result<()> {
let latency = METRICS.run_vm_time.start();
let started_at = Instant::now();
for (i, l2_block) in l2_blocks.into_iter().enumerate() {
if i > 0 {
// First L2 block in every batch is already preloaded
Expand Down Expand Up @@ -115,9 +116,9 @@ impl VmRunner {
.await
.context("failed finishing L1 batch in executor")?;
updates_manager.finish_batch(finished_batch);
latency.observe();
METRICS.run_vm_time.observe(started_at.elapsed());
output_handler
.handle_l1_batch(Arc::new(updates_manager))
.handle_l1_batch(Arc::new(updates_manager), started_at)
.await
.context("VM runner failed to handle L1 batch")?;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions core/node/vm_runner/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ impl OutputHandlerFactory for TestOutputFactory {
async fn handle_l1_batch(
&mut self,
_updates_manager: Arc<UpdatesManager>,
_started_at: Instant,
) -> anyhow::Result<()> {
if let Some(delay) = self.delay {
tokio::time::sleep(delay).await
Expand Down
3 changes: 2 additions & 1 deletion core/node/vm_runner/src/tests/output_handler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::Instant;
use std::{collections::HashMap, sync::Arc, time::Duration};

use tokio::{
Expand Down Expand Up @@ -80,7 +81,7 @@ impl OutputHandlerTester {
.await
.unwrap();
output_handler
.handle_l1_batch(Arc::new(updates_manager))
.handle_l1_batch(Arc::new(updates_manager), Instant::now())
.await
.unwrap();
});
Expand Down

0 comments on commit 522e471

Please sign in to comment.