Skip to content

Commit

Permalink
make vm runner report time taken
Browse files Browse the repository at this point in the history
  • Loading branch information
itegulov committed Jul 2, 2024
1 parent 65973cc commit ab72aa2
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 33 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

13 changes: 10 additions & 3 deletions core/lib/dal/src/vm_runner_dal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt};
use std::time::Instant;

use zksync_db_connection::{
connection::Connection, error::DalResult, instrument::InstrumentExt,
utils::duration_to_naive_time,
};
use zksync_types::L1BatchNumber;

use crate::Core;
Expand Down Expand Up @@ -68,15 +73,17 @@ impl VmRunnerDal<'_, '_> {
pub async fn mark_protective_reads_batch_as_completed(
&mut self,
l1_batch_number: L1BatchNumber,
started_at: Instant,
) -> DalResult<()> {
sqlx::query!(
r#"
INSERT INTO
vm_runner_protective_reads (l1_batch_number, created_at, updated_at)
vm_runner_protective_reads (l1_batch_number, created_at, updated_at, time_taken)
VALUES
($1, NOW(), NOW())
($1, NOW(), NOW(), $2)
"#,
i64::from(l1_batch_number.0),
duration_to_naive_time(started_at.elapsed()),
)
.instrument("mark_protective_reads_batch_as_completed")
.report_latency()
Expand Down
14 changes: 10 additions & 4 deletions core/node/metadata_calculator/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//! Tests for the metadata calculator component life cycle.
use std::{future::Future, ops, panic, path::Path, sync::Arc, time::Duration};
use std::{
future::Future,
ops, panic,
path::Path,
sync::Arc,
time::{Duration, Instant},
};

use assert_matches::assert_matches;
use itertools::Itertools;
Expand Down Expand Up @@ -545,7 +551,7 @@ async fn test_postgres_backup_recovery(
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_without_metadata.number)
.mark_protective_reads_batch_as_completed(batch_without_metadata.number, Instant::now())
.await
.unwrap();
insert_initial_writes_for_batch(&mut storage, batch_without_metadata.number).await;
Expand Down Expand Up @@ -575,7 +581,7 @@ async fn test_postgres_backup_recovery(
.await
.unwrap();
txn.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_header.number)
.mark_protective_reads_batch_as_completed(batch_header.number, Instant::now())
.await
.unwrap();
insert_initial_writes_for_batch(&mut txn, batch_header.number).await;
Expand Down Expand Up @@ -812,7 +818,7 @@ pub(super) async fn extend_db_state_from_l1_batch(
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_number)
.mark_protective_reads_batch_as_completed(batch_number, Instant::now())
.await
.unwrap();
insert_initial_writes_for_batch(storage, batch_number).await;
Expand Down
5 changes: 3 additions & 2 deletions core/node/vm_runner/src/impls/protective_reads.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use anyhow::Context;
use async_trait::async_trait;
Expand Down Expand Up @@ -111,10 +111,11 @@ impl VmRunnerIo for ProtectiveReadsIo {
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
started_at: Instant,
) -> anyhow::Result<()> {
Ok(conn
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(l1_batch_number)
.mark_protective_reads_batch_as_completed(l1_batch_number, started_at)
.await?)
}
}
Expand Down
3 changes: 2 additions & 1 deletion core/node/vm_runner/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{fmt::Debug, time::Instant};

use async_trait::async_trait;
use zksync_dal::{Connection, Core};
Expand Down Expand Up @@ -41,5 +41,6 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static {
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
started_at: Instant,
) -> anyhow::Result<()>;
}
16 changes: 8 additions & 8 deletions core/node/vm_runner/src/output_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
fmt::{Debug, Formatter},
mem,
sync::Arc,
time::Duration,
time::{Duration, Instant},
};

use anyhow::Context;
Expand All @@ -18,7 +18,7 @@ use zksync_types::L1BatchNumber;

use crate::{metrics::METRICS, VmRunnerIo};

type BatchReceiver = oneshot::Receiver<JoinHandle<anyhow::Result<()>>>;
type BatchReceiver = oneshot::Receiver<JoinHandle<anyhow::Result<Instant>>>;

/// Functionality to produce a [`StateKeeperOutputHandler`] implementation for a specific L1 batch.
///
Expand Down Expand Up @@ -131,7 +131,7 @@ impl<Io: VmRunnerIo, F: OutputHandlerFactory> OutputHandlerFactory
enum AsyncOutputHandler {
Running {
handler: Box<dyn StateKeeperOutputHandler>,
sender: oneshot::Sender<JoinHandle<anyhow::Result<()>>>,
sender: oneshot::Sender<JoinHandle<anyhow::Result<Instant>>>,
},
Finished,
}
Expand Down Expand Up @@ -173,10 +173,10 @@ impl StateKeeperOutputHandler for AsyncOutputHandler {
} => {
sender
.send(tokio::task::spawn(async move {
let latency = METRICS.output_handle_time.start();
let started_at = Instant::now();
let result = handler.handle_l1_batch(updates_manager).await;
latency.observe();
result
METRICS.output_handle_time.observe(started_at.elapsed());
result.map(|_| started_at)
}))
.ok();
Ok(())
Expand Down Expand Up @@ -243,13 +243,13 @@ impl<Io: VmRunnerIo> ConcurrentOutputHandlerFactoryTask<Io> {
.context("handler was dropped before the batch was fully processed")?;
// Wait until the handle is resolved, meaning that the `handle_l1_batch`
// computation has finished, and we can consider this batch to be completed
handle
let started_at = handle
.await
.context("failed to await for batch to be processed")??;
latest_processed_batch += 1;
let mut conn = self.pool.connection_tagged(self.io.name()).await?;
self.io
.mark_l1_batch_as_completed(&mut conn, latest_processed_batch)
.mark_l1_batch_as_completed(&mut conn, latest_processed_batch, started_at)
.await?;
METRICS
.last_processed_batch
Expand Down
8 changes: 7 additions & 1 deletion core/node/vm_runner/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{collections::HashMap, ops, sync::Arc, time::Duration};
use std::{
collections::HashMap,
ops,
sync::Arc,
time::{Duration, Instant},
};

use async_trait::async_trait;
use rand::{prelude::SliceRandom, Rng};
Expand Down Expand Up @@ -59,6 +64,7 @@ impl VmRunnerIo for Arc<RwLock<IoMock>> {
&self,
_conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
_started_at: Instant,
) -> anyhow::Result<()> {
self.write().await.current = l1_batch_number;
Ok(())
Expand Down

0 comments on commit ab72aa2

Please sign in to comment.