diff --git a/core/lib/dal/.sqlx/query-2482716de397893c52840eb39391ad3349e4d932d3de64b6dade97481cd171a4.json b/core/lib/dal/.sqlx/query-00c0389f4cde049078885cdf05bdb7dbe0bb23c4fc87a78be2d01b77da2ecbd3.json similarity index 64% rename from core/lib/dal/.sqlx/query-2482716de397893c52840eb39391ad3349e4d932d3de64b6dade97481cd171a4.json rename to core/lib/dal/.sqlx/query-00c0389f4cde049078885cdf05bdb7dbe0bb23c4fc87a78be2d01b77da2ecbd3.json index b5c9869d1467..d83713192cb4 100644 --- a/core/lib/dal/.sqlx/query-2482716de397893c52840eb39391ad3349e4d932d3de64b6dade97481cd171a4.json +++ b/core/lib/dal/.sqlx/query-00c0389f4cde049078885cdf05bdb7dbe0bb23c4fc87a78be2d01b77da2ecbd3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH\n available_batches AS (\n SELECT\n MAX(number) AS \"last_batch\"\n FROM\n l1_batches\n ),\n processed_batches AS (\n SELECT\n COALESCE(MAX(l1_batch_number), $1) + $2 AS \"last_ready_batch\"\n FROM\n vm_runner_bwip\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON TRUE\n ", + "query": "\n WITH\n available_batches AS (\n SELECT\n MAX(number) AS \"last_batch\"\n FROM\n l1_batches\n ),\n processed_batches AS (\n SELECT\n COALESCE(MAX(l1_batch_number), $1) + $2 AS \"last_ready_batch\"\n FROM\n vm_runner_protective_reads\n WHERE\n time_taken IS NOT NULL\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON TRUE\n ", "describe": { "columns": [ { @@ -19,5 +19,5 @@ true ] }, - "hash": "2482716de397893c52840eb39391ad3349e4d932d3de64b6dade97481cd171a4" + "hash": "00c0389f4cde049078885cdf05bdb7dbe0bb23c4fc87a78be2d01b77da2ecbd3" } diff --git a/core/lib/dal/.sqlx/query-1bbfac481c402bcb3bb888b84146d922fa1fc9c202072fbc04cae1bbf97195aa.json b/core/lib/dal/.sqlx/query-1bbfac481c402bcb3bb888b84146d922fa1fc9c202072fbc04cae1bbf97195aa.json new file mode 100644 index 000000000000..f24a28ffdc28 --- /dev/null +++ b/core/lib/dal/.sqlx/query-1bbfac481c402bcb3bb888b84146d922fa1fc9c202072fbc04cae1bbf97195aa.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n vm_runner_bwip (l1_batch_number, created_at, updated_at, processing_started_at)\n VALUES\n ($1, NOW(), NOW(), NOW())\n ON CONFLICT (l1_batch_number) DO\n UPDATE\n SET\n updated_at = NOW(),\n processing_started_at = NOW()\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "1bbfac481c402bcb3bb888b84146d922fa1fc9c202072fbc04cae1bbf97195aa" +} diff --git a/core/lib/dal/.sqlx/query-a85a15aa2e0be1c1f50d15a8354afcf939e8352e21689baf861b61a666bdc1fd.json b/core/lib/dal/.sqlx/query-1df2ddeea407a09acdabb35d3e0bfd5b1d36459ae4b720fd3ec9047e89f645ec.json similarity index 67% rename from core/lib/dal/.sqlx/query-a85a15aa2e0be1c1f50d15a8354afcf939e8352e21689baf861b61a666bdc1fd.json rename to core/lib/dal/.sqlx/query-1df2ddeea407a09acdabb35d3e0bfd5b1d36459ae4b720fd3ec9047e89f645ec.json index cf1fad78a462..316400f97401 100644 --- a/core/lib/dal/.sqlx/query-a85a15aa2e0be1c1f50d15a8354afcf939e8352e21689baf861b61a666bdc1fd.json +++ b/core/lib/dal/.sqlx/query-1df2ddeea407a09acdabb35d3e0bfd5b1d36459ae4b720fd3ec9047e89f645ec.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n MAX(l1_batch_number) AS \"last_processed_l1_batch\"\n FROM\n vm_runner_bwip\n ", + "query": "\n SELECT\n MAX(l1_batch_number) AS \"last_processed_l1_batch\"\n FROM\n vm_runner_protective_reads\n WHERE\n time_taken IS NOT NULL\n ", "describe": { "columns": [ { @@ -16,5 +16,5 @@ null ] }, - "hash": "a85a15aa2e0be1c1f50d15a8354afcf939e8352e21689baf861b61a666bdc1fd" + "hash": "1df2ddeea407a09acdabb35d3e0bfd5b1d36459ae4b720fd3ec9047e89f645ec" } diff --git a/core/lib/dal/.sqlx/query-3f0966f082e9e7cdfa18c107a1283b7955a058705093d7372726c3fc7ce506ad.json b/core/lib/dal/.sqlx/query-3f0966f082e9e7cdfa18c107a1283b7955a058705093d7372726c3fc7ce506ad.json new file mode 100644 index 000000000000..7b95614bfdff --- /dev/null +++ b/core/lib/dal/.sqlx/query-3f0966f082e9e7cdfa18c107a1283b7955a058705093d7372726c3fc7ce506ad.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE vm_runner_protective_reads\n SET\n time_taken = NOW() - processing_started_at\n WHERE\n l1_batch_number = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "3f0966f082e9e7cdfa18c107a1283b7955a058705093d7372726c3fc7ce506ad" +} diff --git a/core/lib/dal/.sqlx/query-a3f24c7f2298398517db009f7e5373c57d2dc6ec03d84f91a221ab8097e587cc.json b/core/lib/dal/.sqlx/query-a3f24c7f2298398517db009f7e5373c57d2dc6ec03d84f91a221ab8097e587cc.json deleted file mode 100644 index 617fd4e81ea1..000000000000 --- a/core/lib/dal/.sqlx/query-a3f24c7f2298398517db009f7e5373c57d2dc6ec03d84f91a221ab8097e587cc.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n vm_runner_bwip (l1_batch_number, created_at, updated_at)\n VALUES\n ($1, NOW(), NOW())\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "a3f24c7f2298398517db009f7e5373c57d2dc6ec03d84f91a221ab8097e587cc" -} diff --git a/core/lib/dal/.sqlx/query-aab0254e6bf2c109d97e84053cb08f1ce816a56308fb9fe581b8683f76cbbbc3.json b/core/lib/dal/.sqlx/query-aab0254e6bf2c109d97e84053cb08f1ce816a56308fb9fe581b8683f76cbbbc3.json new file mode 100644 index 000000000000..850dfc675743 --- /dev/null +++ b/core/lib/dal/.sqlx/query-aab0254e6bf2c109d97e84053cb08f1ce816a56308fb9fe581b8683f76cbbbc3.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE vm_runner_bwip\n SET\n time_taken = NOW() - processing_started_at\n WHERE\n l1_batch_number = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "aab0254e6bf2c109d97e84053cb08f1ce816a56308fb9fe581b8683f76cbbbc3" +} diff --git a/core/lib/dal/.sqlx/query-5f09cee144c84ea8f69d017f10ca96a8c4d88eb02b621cfa6aeb4e10c6ec0bc4.json b/core/lib/dal/.sqlx/query-c731b37e17334619d42121e2740c312512dfab93fd8f32c94461b7a85e3a410e.json similarity index 69% rename from core/lib/dal/.sqlx/query-5f09cee144c84ea8f69d017f10ca96a8c4d88eb02b621cfa6aeb4e10c6ec0bc4.json rename to core/lib/dal/.sqlx/query-c731b37e17334619d42121e2740c312512dfab93fd8f32c94461b7a85e3a410e.json index 5b793f251353..d32a9867e304 100644 --- a/core/lib/dal/.sqlx/query-5f09cee144c84ea8f69d017f10ca96a8c4d88eb02b621cfa6aeb4e10c6ec0bc4.json +++ b/core/lib/dal/.sqlx/query-c731b37e17334619d42121e2740c312512dfab93fd8f32c94461b7a85e3a410e.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n MAX(l1_batch_number) AS \"last_processed_l1_batch\"\n FROM\n vm_runner_protective_reads\n ", + "query": "\n SELECT\n MAX(l1_batch_number) AS \"last_processed_l1_batch\"\n FROM\n vm_runner_bwip\n WHERE\n time_taken IS NOT NULL\n ", "describe": { "columns": [ { @@ -16,5 +16,5 @@ null ] }, - "hash": "5f09cee144c84ea8f69d017f10ca96a8c4d88eb02b621cfa6aeb4e10c6ec0bc4" + "hash": "c731b37e17334619d42121e2740c312512dfab93fd8f32c94461b7a85e3a410e" } diff --git a/core/lib/dal/.sqlx/query-d3abe74360732659a1a35a176679411ba30ac67080552279d821d66b1b804db3.json b/core/lib/dal/.sqlx/query-d3abe74360732659a1a35a176679411ba30ac67080552279d821d66b1b804db3.json new file mode 100644 index 000000000000..2b5eeec2e638 --- /dev/null +++ b/core/lib/dal/.sqlx/query-d3abe74360732659a1a35a176679411ba30ac67080552279d821d66b1b804db3.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n vm_runner_protective_reads (l1_batch_number, created_at, updated_at, processing_started_at)\n VALUES\n ($1, NOW(), NOW(), NOW())\n ON CONFLICT (l1_batch_number) DO\n UPDATE\n SET\n updated_at = NOW(),\n processing_started_at = NOW()\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "d3abe74360732659a1a35a176679411ba30ac67080552279d821d66b1b804db3" +} diff --git a/core/lib/dal/.sqlx/query-0a2138a1cbf21546931867319ccbfe1e597151ecfaeb3cfa6624f2a1978ef23f.json b/core/lib/dal/.sqlx/query-e7d0b7c132b80195dae7cbf50355eb148aa6d1dbd69bf3fe48522101a6ea0bcb.json similarity index 64% rename from core/lib/dal/.sqlx/query-0a2138a1cbf21546931867319ccbfe1e597151ecfaeb3cfa6624f2a1978ef23f.json rename to core/lib/dal/.sqlx/query-e7d0b7c132b80195dae7cbf50355eb148aa6d1dbd69bf3fe48522101a6ea0bcb.json index eaef732751ec..576484cd4206 100644 --- a/core/lib/dal/.sqlx/query-0a2138a1cbf21546931867319ccbfe1e597151ecfaeb3cfa6624f2a1978ef23f.json +++ b/core/lib/dal/.sqlx/query-e7d0b7c132b80195dae7cbf50355eb148aa6d1dbd69bf3fe48522101a6ea0bcb.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH\n available_batches AS (\n SELECT\n MAX(number) AS \"last_batch\"\n FROM\n l1_batches\n ),\n processed_batches AS (\n SELECT\n COALESCE(MAX(l1_batch_number), $1) + $2 AS \"last_ready_batch\"\n FROM\n vm_runner_protective_reads\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON TRUE\n ", + "query": "\n WITH\n available_batches AS (\n SELECT\n MAX(number) AS \"last_batch\"\n FROM\n l1_batches\n ),\n processed_batches AS (\n SELECT\n COALESCE(MAX(l1_batch_number), $1) + $2 AS \"last_ready_batch\"\n FROM\n vm_runner_bwip\n WHERE\n time_taken IS NOT NULL\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON TRUE\n ", "describe": { "columns": [ { @@ -19,5 +19,5 @@ true ] }, - "hash": "0a2138a1cbf21546931867319ccbfe1e597151ecfaeb3cfa6624f2a1978ef23f" + "hash": "e7d0b7c132b80195dae7cbf50355eb148aa6d1dbd69bf3fe48522101a6ea0bcb" } diff --git a/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json b/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json deleted file mode 100644 index e49cc211cdcd..000000000000 --- a/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n vm_runner_protective_reads (l1_batch_number, created_at, updated_at)\n VALUES\n ($1, NOW(), NOW())\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5" -} diff --git a/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.down.sql b/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.down.sql new file mode 100644 index 000000000000..3e13998726f7 --- /dev/null +++ b/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.down.sql @@ -0,0 +1 @@ +ALTER TABLE vm_runner_protective_reads DROP COLUMN IF EXISTS processing_started_at; diff --git a/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.up.sql b/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.up.sql new file mode 100644 index 000000000000..e44b16cae441 --- /dev/null +++ b/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.up.sql @@ -0,0 +1 @@ +ALTER TABLE vm_runner_protective_reads ADD COLUMN IF NOT EXISTS processing_started_at TIME; diff --git a/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.down.sql b/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.down.sql new file mode 100644 index 000000000000..86bd163acbc4 --- /dev/null +++ b/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.down.sql @@ -0,0 +1 @@ +ALTER TABLE vm_runner_bwip DROP COLUMN IF EXISTS processing_started_at; diff --git a/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.up.sql b/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.up.sql new file mode 100644 index 000000000000..244e53b1b8c6 --- /dev/null +++ b/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.up.sql @@ -0,0 +1 @@ +ALTER TABLE vm_runner_bwip ADD COLUMN IF NOT EXISTS processing_started_at TIME; diff --git a/core/lib/dal/src/vm_runner_dal.rs b/core/lib/dal/src/vm_runner_dal.rs index b8a34069752c..64e378926573 100644 --- a/core/lib/dal/src/vm_runner_dal.rs +++ b/core/lib/dal/src/vm_runner_dal.rs @@ -18,6 +18,8 @@ impl VmRunnerDal<'_, '_> { MAX(l1_batch_number) AS "last_processed_l1_batch" FROM vm_runner_protective_reads + WHERE + time_taken IS NOT NULL "# ) .instrument("get_protective_reads_latest_processed_batch") @@ -46,6 +48,8 @@ impl VmRunnerDal<'_, '_> { COALESCE(MAX(l1_batch_number), $1) + $2 AS "last_ready_batch" FROM vm_runner_protective_reads + WHERE + time_taken IS NOT NULL ) SELECT LEAST(last_batch, last_ready_batch) AS "last_ready_batch!" @@ -63,16 +67,42 @@ impl VmRunnerDal<'_, '_> { Ok(L1BatchNumber(row.last_ready_batch as u32)) } - pub async fn mark_protective_reads_batch_as_completed( + pub async fn mark_protective_reads_batch_as_processing( &mut self, l1_batch_number: L1BatchNumber, ) -> DalResult<()> { sqlx::query!( r#" INSERT INTO - vm_runner_protective_reads (l1_batch_number, created_at, updated_at) + vm_runner_protective_reads (l1_batch_number, created_at, updated_at, processing_started_at) VALUES - ($1, NOW(), NOW()) + ($1, NOW(), NOW(), NOW()) + ON CONFLICT (l1_batch_number) DO + UPDATE + SET + updated_at = NOW(), + processing_started_at = NOW() + "#, + i64::from(l1_batch_number.0), + ) + .instrument("mark_protective_reads_batch_as_processing") + .report_latency() + .execute(self.storage) + .await?; + Ok(()) + } + + pub async fn mark_protective_reads_batch_as_completed( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + let update_result = sqlx::query!( + r#" + UPDATE vm_runner_protective_reads + SET + time_taken = NOW() - processing_started_at + WHERE + l1_batch_number = $1 "#, i64::from(l1_batch_number.0), ) @@ -80,6 +110,11 @@ impl VmRunnerDal<'_, '_> { .report_latency() .execute(self.storage) .await?; + if update_result.rows_affected() == 0 { + anyhow::bail!( + "Trying to mark an L1 batch as completed while it is not being processed" + ); + } Ok(()) } @@ -118,6 +153,8 @@ impl VmRunnerDal<'_, '_> { MAX(l1_batch_number) AS "last_processed_l1_batch" FROM vm_runner_bwip + WHERE + time_taken IS NOT NULL "#, ) .instrument("get_bwip_latest_processed_batch") @@ -146,6 +183,8 @@ impl VmRunnerDal<'_, '_> { COALESCE(MAX(l1_batch_number), $1) + $2 AS "last_ready_batch" FROM vm_runner_bwip + WHERE + time_taken IS NOT NULL ) SELECT LEAST(last_batch, last_ready_batch) AS "last_ready_batch!" @@ -163,23 +202,54 @@ impl VmRunnerDal<'_, '_> { Ok(L1BatchNumber(row.last_ready_batch as u32)) } - pub async fn mark_bwip_batch_as_completed( + pub async fn mark_bwip_batch_as_processing( &mut self, l1_batch_number: L1BatchNumber, ) -> DalResult<()> { sqlx::query!( r#" INSERT INTO - vm_runner_bwip (l1_batch_number, created_at, updated_at) + vm_runner_bwip (l1_batch_number, created_at, updated_at, processing_started_at) VALUES - ($1, NOW(), NOW()) + ($1, NOW(), NOW(), NOW()) + ON CONFLICT (l1_batch_number) DO + UPDATE + SET + updated_at = NOW(), + processing_started_at = NOW() + "#, + i64::from(l1_batch_number.0), + ) + .instrument("mark_protective_reads_batch_as_processing") + .report_latency() + .execute(self.storage) + .await?; + Ok(()) + } + + pub async fn mark_bwip_batch_as_completed( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + let update_result = sqlx::query!( + r#" + UPDATE vm_runner_bwip + SET + time_taken = NOW() - processing_started_at + WHERE + l1_batch_number = $1 "#, i64::from(l1_batch_number.0), ) - .instrument("mark_bwip_batch_as_completed") + .instrument("mark_protective_reads_batch_as_completed") .report_latency() .execute(self.storage) .await?; + if update_result.rows_affected() == 0 { + anyhow::bail!( + "Trying to mark an L1 batch as completed while it is not being processed" + ); + } Ok(()) } } diff --git a/core/node/metadata_calculator/src/tests.rs b/core/node/metadata_calculator/src/tests.rs index cd980682d2fa..b878b0c4a533 100644 --- a/core/node/metadata_calculator/src/tests.rs +++ b/core/node/metadata_calculator/src/tests.rs @@ -544,6 +544,11 @@ async fn test_postgres_backup_recovery( .insert_mock_l1_batch(batch_without_metadata) .await .unwrap(); + storage + .vm_runner_dal() + .mark_protective_reads_batch_as_processing(batch_without_metadata.number) + .await + .unwrap(); storage .vm_runner_dal() .mark_protective_reads_batch_as_completed(batch_without_metadata.number) @@ -575,6 +580,10 @@ async fn test_postgres_backup_recovery( .insert_mock_l1_batch(batch_header) .await .unwrap(); + txn.vm_runner_dal() + .mark_protective_reads_batch_as_processing(batch_header.number) + .await + .unwrap(); txn.vm_runner_dal() .mark_protective_reads_batch_as_completed(batch_header.number) .await @@ -811,6 +820,11 @@ pub(super) async fn extend_db_state_from_l1_batch( .mark_l2_blocks_as_executed_in_l1_batch(batch_number) .await .unwrap(); + storage + .vm_runner_dal() + .mark_protective_reads_batch_as_processing(batch_number) + .await + .unwrap(); storage .vm_runner_dal() .mark_protective_reads_batch_as_completed(batch_number) diff --git a/core/node/vm_runner/src/impls/bwip.rs b/core/node/vm_runner/src/impls/bwip.rs index f3bdf55400e6..c861273c964d 100644 --- a/core/node/vm_runner/src/impls/bwip.rs +++ b/core/node/vm_runner/src/impls/bwip.rs @@ -119,16 +119,26 @@ impl VmRunnerIo for BasicWitnessInputProducerIo { .await?) } - async fn mark_l1_batch_as_completed( + async fn mark_l1_batch_as_processing( &self, conn: &mut Connection<'_, Core>, l1_batch_number: L1BatchNumber, ) -> anyhow::Result<()> { Ok(conn .vm_runner_dal() - .mark_bwip_batch_as_completed(l1_batch_number) + .mark_bwip_batch_as_processing(l1_batch_number) .await?) } + + async fn mark_l1_batch_as_completed( + &self, + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + conn.vm_runner_dal() + .mark_bwip_batch_as_completed(l1_batch_number) + .await + } } #[derive(Debug)] diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs index f6bac1491806..4748789ae6d9 100644 --- a/core/node/vm_runner/src/impls/protective_reads.rs +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -108,16 +108,26 @@ impl VmRunnerIo for ProtectiveReadsIo { .await?) } - async fn mark_l1_batch_as_completed( + async fn mark_l1_batch_as_processing( &self, conn: &mut Connection<'_, Core>, l1_batch_number: L1BatchNumber, ) -> anyhow::Result<()> { Ok(conn .vm_runner_dal() - .mark_protective_reads_batch_as_completed(l1_batch_number) + .mark_protective_reads_batch_as_processing(l1_batch_number) .await?) } + + async fn mark_l1_batch_as_completed( + &self, + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + conn.vm_runner_dal() + .mark_protective_reads_batch_as_completed(l1_batch_number) + .await + } } #[derive(Debug)] diff --git a/core/node/vm_runner/src/io.rs b/core/node/vm_runner/src/io.rs index e67da0e8235c..2e118f6cfd13 100644 --- a/core/node/vm_runner/src/io.rs +++ b/core/node/vm_runner/src/io.rs @@ -31,6 +31,18 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static { conn: &mut Connection<'_, Core>, ) -> anyhow::Result; + /// Marks the specified batch as being in progress. Must be called before a batch can be marked + /// as completed. + /// + /// # Errors + /// + /// Propagates DB errors. + async fn mark_l1_batch_as_processing( + &self, + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()>; + /// Marks the specified batch as the latest completed batch. All earlier batches are considered /// to be completed too. No guarantees about later batches. /// diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index b300915cef64..e84ec76d0726 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -198,6 +198,9 @@ impl VmRunner { .create_handler(next_batch) .await?; + self.io + .mark_l1_batch_as_processing(&mut self.pool.connection().await?, next_batch) + .await?; let handle = tokio::task::spawn(Self::process_batch( batch_executor, batch_data.l2_blocks, diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs index c592122b1e09..50acba610ba5 100644 --- a/core/node/vm_runner/src/tests/mod.rs +++ b/core/node/vm_runner/src/tests/mod.rs @@ -55,6 +55,14 @@ impl VmRunnerIo for Arc> { Ok(io.current + io.max) } + async fn mark_l1_batch_as_processing( + &self, + _conn: &mut Connection<'_, Core>, + _l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn mark_l1_batch_as_completed( &self, _conn: &mut Connection<'_, Core>,