Skip to content

Commit

Permalink
adapt bwip to the new trait method
Browse files Browse the repository at this point in the history
  • Loading branch information
itegulov committed Jul 8, 2024
1 parent a83f829 commit 816bbf9
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 24 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE vm_runner_bwip DROP COLUMN IF EXISTS processing_started_at;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE vm_runner_bwip ADD COLUMN IF NOT EXISTS processing_started_at TIME;
43 changes: 39 additions & 4 deletions core/lib/dal/src/vm_runner_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ impl VmRunnerDal<'_, '_> {
MAX(l1_batch_number) AS "last_processed_l1_batch"
FROM
vm_runner_bwip
WHERE
time_taken IS NOT NULL
"#,
)
.instrument("get_bwip_latest_processed_batch")
Expand Down Expand Up @@ -181,6 +183,8 @@ impl VmRunnerDal<'_, '_> {
COALESCE(MAX(l1_batch_number), $1) + $2 AS "last_ready_batch"
FROM
vm_runner_bwip
WHERE
time_taken IS NOT NULL
)
SELECT
LEAST(last_batch, last_ready_batch) AS "last_ready_batch!"
Expand All @@ -198,23 +202,54 @@ impl VmRunnerDal<'_, '_> {
Ok(L1BatchNumber(row.last_ready_batch as u32))
}

pub async fn mark_bwip_batch_as_completed(
pub async fn mark_bwip_batch_as_processing(
&mut self,
l1_batch_number: L1BatchNumber,
) -> DalResult<()> {
sqlx::query!(
r#"
INSERT INTO
vm_runner_bwip (l1_batch_number, created_at, updated_at)
vm_runner_bwip (l1_batch_number, created_at, updated_at, processing_started_at)
VALUES
($1, NOW(), NOW())
($1, NOW(), NOW(), NOW())
ON CONFLICT (l1_batch_number) DO
UPDATE
SET
updated_at = NOW(),
processing_started_at = NOW()
"#,
i64::from(l1_batch_number.0),
)
.instrument("mark_bwip_batch_as_completed")
.instrument("mark_protective_reads_batch_as_processing")
.report_latency()
.execute(self.storage)
.await?;
Ok(())
}

pub async fn mark_bwip_batch_as_completed(
&mut self,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
let update_result = sqlx::query!(
r#"
UPDATE vm_runner_bwip
SET
time_taken = NOW() - processing_started_at
WHERE
l1_batch_number = $1
"#,
i64::from(l1_batch_number.0),
)
.instrument("mark_protective_reads_batch_as_completed")
.report_latency()
.execute(self.storage)
.await?;
if update_result.rows_affected() == 0 {
anyhow::bail!(
"Trying to mark an L1 batch as completed while it is not being processed"
);
}
Ok(())
}
}
14 changes: 12 additions & 2 deletions core/node/vm_runner/src/impls/bwip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,26 @@ impl VmRunnerIo for BasicWitnessInputProducerIo {
.await?)
}

async fn mark_l1_batch_as_completed(
async fn mark_l1_batch_as_processing(
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
Ok(conn
.vm_runner_dal()
.mark_bwip_batch_as_completed(l1_batch_number)
.mark_bwip_batch_as_processing(l1_batch_number)
.await?)
}

async fn mark_l1_batch_as_completed(
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
conn.vm_runner_dal()
.mark_bwip_batch_as_completed(l1_batch_number)
.await
}
}

#[derive(Debug)]
Expand Down

0 comments on commit 816bbf9

Please sign in to comment.