diff --git a/core/lib/config/src/configs/vm_runner.rs b/core/lib/config/src/configs/vm_runner.rs index 6250830398eb..eb3d4a9d4b24 100644 --- a/core/lib/config/src/configs/vm_runner.rs +++ b/core/lib/config/src/configs/vm_runner.rs @@ -1,16 +1,19 @@ use serde::Deserialize; +use zksync_basic_types::L1BatchNumber; #[derive(Debug, Deserialize, Clone, PartialEq, Default)] pub struct ProtectiveReadsWriterConfig { /// Path to the RocksDB data directory that serves state cache. - #[serde(default = "ProtectiveReadsWriterConfig::default_protective_reads_db_path")] - pub protective_reads_db_path: String, + #[serde(default = "ProtectiveReadsWriterConfig::default_db_path")] + pub db_path: String, /// How many max batches should be processed at the same time. - pub protective_reads_window_size: u32, + pub window_size: u32, + /// All batches before this one (inclusive) are always considered to be processed. + pub first_processed_batch: L1BatchNumber, } impl ProtectiveReadsWriterConfig { - fn default_protective_reads_db_path() -> String { + fn default_db_path() -> String { "./db/protective_reads_writer".to_owned() } } diff --git a/core/lib/dal/.sqlx/query-1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6.json b/core/lib/dal/.sqlx/query-decbf1c9c344253f692d0eae57323edbf31a923e7a45a431267e1bd9fc67b47b.json similarity index 55% rename from core/lib/dal/.sqlx/query-1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6.json rename to core/lib/dal/.sqlx/query-decbf1c9c344253f692d0eae57323edbf31a923e7a45a431267e1bd9fc67b47b.json index 94a17c87888e..b2a1ae0eb956 100644 --- a/core/lib/dal/.sqlx/query-1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6.json +++ b/core/lib/dal/.sqlx/query-decbf1c9c344253f692d0eae57323edbf31a923e7a45a431267e1bd9fc67b47b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n COALESCE(MAX(l1_batch_number), 0) AS \"last_processed_l1_batch!\"\n FROM\n vm_runner_protective_reads\n ", + "query": "\n SELECT\n 
COALESCE(MAX(l1_batch_number), $1) AS \"last_processed_l1_batch!\"\n FROM\n vm_runner_protective_reads\n ", "describe": { "columns": [ { @@ -10,11 +10,13 @@ } ], "parameters": { - "Left": [] + "Left": [ + "Int8" + ] }, "nullable": [ null ] }, - "hash": "1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6" + "hash": "decbf1c9c344253f692d0eae57323edbf31a923e7a45a431267e1bd9fc67b47b" } diff --git a/core/lib/dal/src/vm_runner_dal.rs b/core/lib/dal/src/vm_runner_dal.rs index 3693f78a6a7a..39e0f89630ee 100644 --- a/core/lib/dal/src/vm_runner_dal.rs +++ b/core/lib/dal/src/vm_runner_dal.rs @@ -11,14 +11,16 @@ pub struct VmRunnerDal<'c, 'a> { impl VmRunnerDal<'_, '_> { pub async fn get_protective_reads_latest_processed_batch( &mut self, + default_batch: L1BatchNumber, ) -> DalResult<L1BatchNumber> { let row = sqlx::query!( r#" SELECT - COALESCE(MAX(l1_batch_number), 0) AS "last_processed_l1_batch!" + COALESCE(MAX(l1_batch_number), $1) AS "last_processed_l1_batch!" FROM vm_runner_protective_reads - "# + "#, + i64::from(default_batch.0) ) .instrument("get_protective_reads_latest_processed_batch") .report_latency() diff --git a/core/lib/protobuf_config/src/proto/config/vm_runner.proto b/core/lib/protobuf_config/src/proto/config/vm_runner.proto index a7c829f05869..c0c82d4d415f 100644 --- a/core/lib/protobuf_config/src/proto/config/vm_runner.proto +++ b/core/lib/protobuf_config/src/proto/config/vm_runner.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package zksync.config.vm_runner; message ProtectiveReadsWriter { - optional string protective_reads_db_path = 1; // required; fs path - optional uint64 protective_reads_window_size = 2; // required + optional string db_path = 1; // required; fs path + optional uint64 window_size = 2; // required + optional uint64 first_processed_batch = 3; // required } diff --git a/core/lib/protobuf_config/src/vm_runner.rs b/core/lib/protobuf_config/src/vm_runner.rs index 227e22cd5d22..78bfee750521 100644 --- a/core/lib/protobuf_config/src/vm_runner.rs +++ 
b/core/lib/protobuf_config/src/vm_runner.rs @@ -1,4 +1,5 @@ use anyhow::Context; +use zksync_basic_types::L1BatchNumber; use zksync_config::configs::{self}; use zksync_protobuf::{required, ProtoRepr}; @@ -9,19 +10,19 @@ impl ProtoRepr for proto::ProtectiveReadsWriter { fn read(&self) -> anyhow::Result<Self::Type> { Ok(Self::Type { - protective_reads_db_path: required(&self.protective_reads_db_path) - .context("protective_reads_db_path")? - .clone(), - protective_reads_window_size: *required(&self.protective_reads_window_size) - .context("protective_reads_window_size")? - as u32, + db_path: required(&self.db_path).context("db_path")?.clone(), + window_size: *required(&self.window_size).context("window_size")? as u32, + first_processed_batch: L1BatchNumber( + *required(&self.first_processed_batch).context("first_processed_batch")? as u32, + ), }) } fn build(this: &Self::Type) -> Self { Self { - protective_reads_db_path: Some(this.protective_reads_db_path.clone()), - protective_reads_window_size: Some(this.protective_reads_window_size as u64), + db_path: Some(this.db_path.clone()), + window_size: Some(this.window_size as u64), + first_processed_batch: Some(this.first_processed_batch.0 as u64), } } } diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs index 332793031fa5..a55f8dd7ac85 100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs @@ -43,20 +43,16 @@ impl WiringLayer for ProtectiveReadsWriterLayer { // One for `ConcurrentOutputHandlerFactoryTask`/`VmRunner` as they need occasional access // to DB for querying last processed batch and last ready to be loaded batch. 
// - // `self.protective_reads_writer_config` connections for `ProtectiveReadsOutputHandlerFactory` + // `window_size` connections for `ProtectiveReadsOutputHandlerFactory` // as there can be multiple output handlers holding multi-second connections to write // large amount of protective reads. master_pool - .get_custom( - self.protective_reads_writer_config - .protective_reads_window_size - + 2, - ) + .get_custom(self.protective_reads_writer_config.window_size + 2) .await?, - self.protective_reads_writer_config.protective_reads_db_path, + self.protective_reads_writer_config.db_path, self.zksync_network_id, - self.protective_reads_writer_config - .protective_reads_window_size, + self.protective_reads_writer_config.first_processed_batch, + self.protective_reads_writer_config.window_size, ) .await?; diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs index 03a5f1254aa6..e47e54541f5a 100644 --- a/core/node/vm_runner/src/impls/protective_reads.rs +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -26,9 +26,13 @@ impl ProtectiveReadsWriter { pool: ConnectionPool<Core>, rocksdb_path: String, chain_id: L2ChainId, + first_processed_batch: L1BatchNumber, window_size: u32, ) -> anyhow::Result<(Self, ProtectiveReadsWriterTasks)> { - let io = ProtectiveReadsIo { window_size }; + let io = ProtectiveReadsIo { + first_processed_batch, + window_size, + }; let (loader, loader_task) = VmRunnerStorage::new(pool.clone(), rocksdb_path, io.clone(), chain_id).await?; let output_handler_factory = ProtectiveReadsOutputHandlerFactory { pool: pool.clone() }; @@ -74,6 +78,7 @@ pub struct ProtectiveReadsWriterTasks { #[derive(Debug, Clone)] pub struct ProtectiveReadsIo { + first_processed_batch: L1BatchNumber, window_size: u32, } @@ -89,7 +94,7 @@ impl VmRunnerIo for ProtectiveReadsIo { ) -> anyhow::Result<L1BatchNumber> { Ok(conn .vm_runner_dal() - .get_protective_reads_latest_processed_batch() + 
.get_protective_reads_latest_processed_batch(self.first_processed_batch) .await?) } diff --git a/etc/env/base/vm_runner.toml b/etc/env/base/vm_runner.toml index d9e10e8b357d..c8f259efc3b7 100644 --- a/etc/env/base/vm_runner.toml +++ b/etc/env/base/vm_runner.toml @@ -4,6 +4,8 @@ [vm_runner.protective_reads] # Path to the directory that contains RocksDB with protective reads writer cache. -protective_reads_db_path = "./db/main/protective_reads" +db_path = "./db/main/protective_reads" # Amount of batches that can be processed in parallel. -protective_reads_window_size = 3 +window_size = 3 +# All batches before this one (inclusive) are always considered to be processed. +first_processed_batch = 0 diff --git a/etc/env/file_based/general.yaml b/etc/env/file_based/general.yaml index fdccdf03b5f7..c6b9288a1f12 100644 --- a/etc/env/file_based/general.yaml +++ b/etc/env/file_based/general.yaml @@ -323,5 +323,6 @@ observability: level: debug protective_reads_writer: - protective_reads_db_path: "./db/main/protective_reads" - protective_reads_window_size: 3 + db_path: "./db/main/protective_reads" + window_size: 3 + first_processed_batch: 0