Skip to content

Commit

Permalink
feat(vm-runner): make vm runner report time taken (#2369)
Browse files Browse the repository at this point in the history
## What ❔

A long overdue addition of `time_taken` to VM runner

## Why ❔

Observability

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
itegulov authored Jul 8, 2024
1 parent c6c3f96 commit 275a333
Show file tree
Hide file tree
Showing 21 changed files with 206 additions and 47 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE vm_runner_protective_reads DROP COLUMN IF EXISTS processing_started_at;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE vm_runner_protective_reads ADD COLUMN IF NOT EXISTS processing_started_at TIME;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE vm_runner_bwip DROP COLUMN IF EXISTS processing_started_at;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE vm_runner_bwip ADD COLUMN IF NOT EXISTS processing_started_at TIME;
84 changes: 77 additions & 7 deletions core/lib/dal/src/vm_runner_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ impl VmRunnerDal<'_, '_> {
MAX(l1_batch_number) AS "last_processed_l1_batch"
FROM
vm_runner_protective_reads
WHERE
time_taken IS NOT NULL
"#
)
.instrument("get_protective_reads_latest_processed_batch")
Expand Down Expand Up @@ -46,6 +48,8 @@ impl VmRunnerDal<'_, '_> {
COALESCE(MAX(l1_batch_number), $1) + $2 AS "last_ready_batch"
FROM
vm_runner_protective_reads
WHERE
time_taken IS NOT NULL
)
SELECT
LEAST(last_batch, last_ready_batch) AS "last_ready_batch!"
Expand All @@ -63,23 +67,54 @@ impl VmRunnerDal<'_, '_> {
Ok(L1BatchNumber(row.last_ready_batch as u32))
}

pub async fn mark_protective_reads_batch_as_completed(
pub async fn mark_protective_reads_batch_as_processing(
&mut self,
l1_batch_number: L1BatchNumber,
) -> DalResult<()> {
sqlx::query!(
r#"
INSERT INTO
vm_runner_protective_reads (l1_batch_number, created_at, updated_at)
vm_runner_protective_reads (l1_batch_number, created_at, updated_at, processing_started_at)
VALUES
($1, NOW(), NOW())
($1, NOW(), NOW(), NOW())
ON CONFLICT (l1_batch_number) DO
UPDATE
SET
updated_at = NOW(),
processing_started_at = NOW()
"#,
i64::from(l1_batch_number.0),
)
.instrument("mark_protective_reads_batch_as_processing")
.report_latency()
.execute(self.storage)
.await?;
Ok(())
}

pub async fn mark_protective_reads_batch_as_completed(
&mut self,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
let update_result = sqlx::query!(
r#"
UPDATE vm_runner_protective_reads
SET
time_taken = NOW() - processing_started_at
WHERE
l1_batch_number = $1
"#,
i64::from(l1_batch_number.0),
)
.instrument("mark_protective_reads_batch_as_completed")
.report_latency()
.execute(self.storage)
.await?;
if update_result.rows_affected() == 0 {
anyhow::bail!(
"Trying to mark an L1 batch as completed while it is not being processed"
);
}
Ok(())
}

Expand Down Expand Up @@ -118,6 +153,8 @@ impl VmRunnerDal<'_, '_> {
MAX(l1_batch_number) AS "last_processed_l1_batch"
FROM
vm_runner_bwip
WHERE
time_taken IS NOT NULL
"#,
)
.instrument("get_bwip_latest_processed_batch")
Expand Down Expand Up @@ -146,6 +183,8 @@ impl VmRunnerDal<'_, '_> {
COALESCE(MAX(l1_batch_number), $1) + $2 AS "last_ready_batch"
FROM
vm_runner_bwip
WHERE
time_taken IS NOT NULL
)
SELECT
LEAST(last_batch, last_ready_batch) AS "last_ready_batch!"
Expand All @@ -163,23 +202,54 @@ impl VmRunnerDal<'_, '_> {
Ok(L1BatchNumber(row.last_ready_batch as u32))
}

pub async fn mark_bwip_batch_as_completed(
pub async fn mark_bwip_batch_as_processing(
&mut self,
l1_batch_number: L1BatchNumber,
) -> DalResult<()> {
sqlx::query!(
r#"
INSERT INTO
vm_runner_bwip (l1_batch_number, created_at, updated_at)
vm_runner_bwip (l1_batch_number, created_at, updated_at, processing_started_at)
VALUES
($1, NOW(), NOW())
($1, NOW(), NOW(), NOW())
ON CONFLICT (l1_batch_number) DO
UPDATE
SET
updated_at = NOW(),
processing_started_at = NOW()
"#,
i64::from(l1_batch_number.0),
)
.instrument("mark_protective_reads_batch_as_processing")
.report_latency()
.execute(self.storage)
.await?;
Ok(())
}

pub async fn mark_bwip_batch_as_completed(
&mut self,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
let update_result = sqlx::query!(
r#"
UPDATE vm_runner_bwip
SET
time_taken = NOW() - processing_started_at
WHERE
l1_batch_number = $1
"#,
i64::from(l1_batch_number.0),
)
.instrument("mark_bwip_batch_as_completed")
.instrument("mark_protective_reads_batch_as_completed")
.report_latency()
.execute(self.storage)
.await?;
if update_result.rows_affected() == 0 {
anyhow::bail!(
"Trying to mark an L1 batch as completed while it is not being processed"
);
}
Ok(())
}
}
14 changes: 14 additions & 0 deletions core/node/metadata_calculator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,11 @@ async fn test_postgres_backup_recovery(
.insert_mock_l1_batch(batch_without_metadata)
.await
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_processing(batch_without_metadata.number)
.await
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_without_metadata.number)
Expand Down Expand Up @@ -575,6 +580,10 @@ async fn test_postgres_backup_recovery(
.insert_mock_l1_batch(batch_header)
.await
.unwrap();
txn.vm_runner_dal()
.mark_protective_reads_batch_as_processing(batch_header.number)
.await
.unwrap();
txn.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_header.number)
.await
Expand Down Expand Up @@ -811,6 +820,11 @@ pub(super) async fn extend_db_state_from_l1_batch(
.mark_l2_blocks_as_executed_in_l1_batch(batch_number)
.await
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_processing(batch_number)
.await
.unwrap();
storage
.vm_runner_dal()
.mark_protective_reads_batch_as_completed(batch_number)
Expand Down
14 changes: 12 additions & 2 deletions core/node/vm_runner/src/impls/bwip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,26 @@ impl VmRunnerIo for BasicWitnessInputProducerIo {
.await?)
}

async fn mark_l1_batch_as_completed(
async fn mark_l1_batch_as_processing(
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
Ok(conn
.vm_runner_dal()
.mark_bwip_batch_as_completed(l1_batch_number)
.mark_bwip_batch_as_processing(l1_batch_number)
.await?)
}

async fn mark_l1_batch_as_completed(
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
conn.vm_runner_dal()
.mark_bwip_batch_as_completed(l1_batch_number)
.await
}
}

#[derive(Debug)]
Expand Down
Loading

0 comments on commit 275a333

Please sign in to comment.