diff --git a/core/lib/zksync_core_leftovers/src/lib.rs b/core/lib/zksync_core_leftovers/src/lib.rs
index 34bc282ad7cd..4f8664ab74dc 100644
--- a/core/lib/zksync_core_leftovers/src/lib.rs
+++ b/core/lib/zksync_core_leftovers/src/lib.rs
@@ -192,6 +192,9 @@ impl FromStr for Components {
             "proof_data_handler" => Ok(Components(vec![Component::ProofDataHandler])),
             "consensus" => Ok(Components(vec![Component::Consensus])),
             "commitment_generator" => Ok(Components(vec![Component::CommitmentGenerator])),
+            "vm_runner_protective_reads" => {
+                Ok(Components(vec![Component::VmRunnerProtectiveReads]))
+            }
             other => Err(format!("{} is not a valid component name", other)),
         }
     }
diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs
index 8962c0aeb7cc..1b7867cdef1f 100644
--- a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs
+++ b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs
@@ -37,7 +37,7 @@ impl WiringLayer for ProtectiveReadsWriterLayer {
         let master_pool = context.get_resource::<PoolResource<MasterPool>>().await?;
 
         let (protective_reads_writer, tasks) = ProtectiveReadsWriter::new(
-            master_pool.get_singleton().await?, // TODO: check pool size
+            master_pool.get_custom(3).await?,
             self.protective_reads_writer_config.protective_reads_db_path,
             self.zksync_network_id,
             self.protective_reads_writer_config
diff --git a/core/node/state_keeper/src/updates/mod.rs b/core/node/state_keeper/src/updates/mod.rs
index bb33a6f58678..772ee71641a0 100644
--- a/core/node/state_keeper/src/updates/mod.rs
+++ b/core/node/state_keeper/src/updates/mod.rs
@@ -123,7 +123,7 @@ impl UpdatesManager {
         );
     }
 
-    pub(crate) fn finish_batch(&mut self, finished_batch: FinishedL1Batch) {
+    pub fn finish_batch(&mut self, finished_batch: FinishedL1Batch) {
         assert!(
             self.l1_batch.finished.is_none(),
             "Cannot finish already finished batch"
diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs
index 5ff7d7cc0b87..5e51b5e658f7 100644
--- a/core/node/vm_runner/src/process.rs
+++ b/core/node/vm_runner/src/process.rs
@@ -109,10 +109,11 @@ impl VmRunner {
                 .await
                 .context("VM runner failed to handle L2 block")?;
         }
-        batch_executor
+        let finished_batch = batch_executor
             .finish_batch()
             .await
             .context("failed finishing L1 batch in executor")?;
+        updates_manager.finish_batch(finished_batch);
         output_handler
             .handle_l1_batch(Arc::new(updates_manager))
             .await
diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs
index 3a8552701178..e7a8b147c76f 100644
--- a/core/node/vm_runner/src/storage.rs
+++ b/core/node/vm_runner/src/storage.rs
@@ -300,10 +300,10 @@ impl<Io: VmRunnerIo> StorageSyncTask<Io> {
             if rocksdb_builder.l1_batch_number().await == Some(latest_processed_batch + 1) {
                 // RocksDB is already caught up, we might not need to do anything.
                 // Just need to check that the memory diff is up-to-date in case this is a fresh start.
+                let last_ready_batch = self.io.last_ready_to_be_loaded_batch(&mut conn).await?;
                 let state = self.state.read().await;
-                if state
-                    .storage
-                    .contains_key(&self.io.last_ready_to_be_loaded_batch(&mut conn).await?)
+                if last_ready_batch == latest_processed_batch
+                    || state.storage.contains_key(&last_ready_batch)
                 {
                     // No need to do anything, killing time until last processed batch is updated.
                     drop(conn);
diff --git a/etc/env/base/vm_runner.toml b/etc/env/base/vm_runner.toml
new file mode 100644
index 000000000000..d9e10e8b357d
--- /dev/null
+++ b/etc/env/base/vm_runner.toml
@@ -0,0 +1,9 @@
+# Configuration for the VM runner crate
+
+[vm_runner]
+
+[vm_runner.protective_reads]
+# Path to the directory that contains the RocksDB cache for the protective reads writer.
+protective_reads_db_path = "./db/main/protective_reads"
+# Number of batches that can be processed in parallel.
+protective_reads_window_size = 3
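
Note (not part of the patch): the new match arm means the protective reads writer can now be selected by component name at startup. A minimal sketch of the round trip, assuming `Components`, `Component`, and the `FromStr` impl from `zksync_core_leftovers` are in scope; the assertion is illustrative only:

    use std::str::FromStr;

    // Parse the name the same way the server's component list is parsed.
    let components = Components::from_str("vm_runner_protective_reads")
        .expect("name should parse after this patch");
    assert!(matches!(
        components.0.as_slice(),
        [Component::VmRunnerProtectiveReads]
    ));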
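
The keys in the new `vm_runner.toml` mirror the fields the wiring layer reads (`protective_reads_db_path` and, via the trailing context above, `protective_reads_window_size`). A hedged sketch of a config struct those keys could deserialize into; the actual `ProtectiveReadsWriterConfig` in the repo may differ in derives and defaults:

    use serde::Deserialize;

    /// Sketch of the `[vm_runner.protective_reads]` section; field names
    /// mirror the TOML keys introduced in this patch.
    #[derive(Debug, Deserialize)]
    pub struct ProtectiveReadsWriterConfig {
        /// Path to the RocksDB cache for the protective reads writer.
        pub protective_reads_db_path: String,
        /// Number of L1 batches that may be processed in parallel.
        pub protective_reads_window_size: u32,
    }

The pool size passed to `get_custom(3)` matches the default window size of 3, presumably so each in-flight batch gets its own DB connection.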