From d63098db3f177aeab2cf57678a67dcb46f3111d0 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Thu, 23 May 2024 00:17:35 +1000 Subject: [PATCH 01/16] basic implementation of protective reads vm runner --- ...ab68c996a7ef2bd24e79163b064c25bffc82a.json | 14 ++ ...f125cf30578457040c14fd6882c73a87fb3d6.json | 20 +++ ...d0584968c93eecf9cf97e6c05a63b88a79503.json | 22 +++ ..._vm_runner_protective_reads_table.down.sql | 1 + ...dd_vm_runner_protective_reads_table.up.sql | 7 + core/lib/dal/src/lib.rs | 8 + core/lib/dal/src/vm_runner_dal.rs | 88 +++++++++++ core/node/vm_runner/src/impls/mod.rs | 3 + .../vm_runner/src/impls/protective_reads.rs | 149 ++++++++++++++++++ core/node/vm_runner/src/lib.rs | 2 + 10 files changed, 314 insertions(+) create mode 100644 core/lib/dal/.sqlx/query-08688d7e6bcfaad2bf8ff58c447ab68c996a7ef2bd24e79163b064c25bffc82a.json create mode 100644 core/lib/dal/.sqlx/query-1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6.json create mode 100644 core/lib/dal/.sqlx/query-5c4d3e33075b6f9d53431369e03d0584968c93eecf9cf97e6c05a63b88a79503.json create mode 100644 core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.down.sql create mode 100644 core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.up.sql create mode 100644 core/lib/dal/src/vm_runner_dal.rs create mode 100644 core/node/vm_runner/src/impls/mod.rs create mode 100644 core/node/vm_runner/src/impls/protective_reads.rs diff --git a/core/lib/dal/.sqlx/query-08688d7e6bcfaad2bf8ff58c447ab68c996a7ef2bd24e79163b064c25bffc82a.json b/core/lib/dal/.sqlx/query-08688d7e6bcfaad2bf8ff58c447ab68c996a7ef2bd24e79163b064c25bffc82a.json new file mode 100644 index 000000000000..ecbd8cffde5f --- /dev/null +++ b/core/lib/dal/.sqlx/query-08688d7e6bcfaad2bf8ff58c447ab68c996a7ef2bd24e79163b064c25bffc82a.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n vm_runner_protective_reads (\n l1_batch_number,\n created_at,\n updated_at\n 
)\n VALUES\n (\n $1,\n NOW(),\n NOW()\n )\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "08688d7e6bcfaad2bf8ff58c447ab68c996a7ef2bd24e79163b064c25bffc82a" +} diff --git a/core/lib/dal/.sqlx/query-1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6.json b/core/lib/dal/.sqlx/query-1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6.json new file mode 100644 index 000000000000..94a17c87888e --- /dev/null +++ b/core/lib/dal/.sqlx/query-1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n COALESCE(MAX(l1_batch_number), 0) AS \"last_processed_l1_batch!\"\n FROM\n vm_runner_protective_reads\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "last_processed_l1_batch!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6" +} diff --git a/core/lib/dal/.sqlx/query-5c4d3e33075b6f9d53431369e03d0584968c93eecf9cf97e6c05a63b88a79503.json b/core/lib/dal/.sqlx/query-5c4d3e33075b6f9d53431369e03d0584968c93eecf9cf97e6c05a63b88a79503.json new file mode 100644 index 000000000000..a1d16f629df3 --- /dev/null +++ b/core/lib/dal/.sqlx/query-5c4d3e33075b6f9d53431369e03d0584968c93eecf9cf97e6c05a63b88a79503.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n available_batches AS (\n SELECT MAX(number) as \"last_batch\"\n FROM l1_batches\n ),\n processed_batches AS (\n SELECT COALESCE(MAX(l1_batch_number), 0) + $1 AS \"last_ready_batch\"\n FROM vm_runner_protective_reads\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON true\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "last_ready_batch!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + 
"Int8" + ] + }, + "nullable": [ + true + ] + }, + "hash": "5c4d3e33075b6f9d53431369e03d0584968c93eecf9cf97e6c05a63b88a79503" +} diff --git a/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.down.sql b/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.down.sql new file mode 100644 index 000000000000..773b22aa4fa1 --- /dev/null +++ b/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS vm_runner_protective_reads; diff --git a/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.up.sql b/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.up.sql new file mode 100644 index 000000000000..170569508281 --- /dev/null +++ b/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.up.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS vm_runner_protective_reads +( + l1_batch_number BIGINT NOT NULL PRIMARY KEY, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + time_taken TIME +); diff --git a/core/lib/dal/src/lib.rs b/core/lib/dal/src/lib.rs index f9c585758c4d..b0c4601cd9f6 100644 --- a/core/lib/dal/src/lib.rs +++ b/core/lib/dal/src/lib.rs @@ -11,6 +11,7 @@ pub use zksync_db_connection::{ error::{DalError, DalResult}, }; +use crate::vm_runner_dal::VmRunnerDal; use crate::{ blocks_dal::BlocksDal, blocks_web3_dal::BlocksWeb3Dal, consensus_dal::ConsensusDal, contract_verification_dal::ContractVerificationDal, eth_sender_dal::EthSenderDal, @@ -55,6 +56,7 @@ pub mod tokens_dal; pub mod tokens_web3_dal; pub mod transactions_dal; pub mod transactions_web3_dal; +pub mod vm_runner_dal; #[cfg(test)] mod tests; @@ -119,6 +121,8 @@ where fn snapshot_recovery_dal(&mut self) -> SnapshotRecoveryDal<'_, 'a>; fn pruning_dal(&mut self) -> PruningDal<'_, 'a>; + + fn vm_runner_dal(&mut self) -> VmRunnerDal<'_, 'a>; } #[derive(Clone, Debug)] @@ -229,4 +233,8 @@ impl<'a> CoreDal<'a> for 
Connection<'a, Core> { fn pruning_dal(&mut self) -> PruningDal<'_, 'a> { PruningDal { storage: self } } + + fn vm_runner_dal(&mut self) -> VmRunnerDal<'_, 'a> { + VmRunnerDal { storage: self } + } } diff --git a/core/lib/dal/src/vm_runner_dal.rs b/core/lib/dal/src/vm_runner_dal.rs new file mode 100644 index 000000000000..8c19a674894a --- /dev/null +++ b/core/lib/dal/src/vm_runner_dal.rs @@ -0,0 +1,88 @@ +use crate::Core; +use zksync_db_connection::connection::Connection; +use zksync_db_connection::error::DalResult; +use zksync_db_connection::instrument::InstrumentExt; +use zksync_types::L1BatchNumber; + +#[derive(Debug)] +pub struct VmRunnerDal<'c, 'a> { + pub(crate) storage: &'c mut Connection<'a, Core>, +} + +impl VmRunnerDal<'_, '_> { + pub async fn get_protective_reads_latest_processed_batch( + &mut self, + ) -> DalResult { + let row = sqlx::query!( + r#" + SELECT + COALESCE(MAX(l1_batch_number), 0) AS "last_processed_l1_batch!" + FROM + vm_runner_protective_reads + "# + ) + .instrument("get_protective_reads_latest_processed_batch") + .report_latency() + .fetch_one(self.storage) + .await?; + Ok(L1BatchNumber(row.last_processed_l1_batch as u32)) + } + + pub async fn get_protective_reads_last_ready_batch( + &mut self, + window_size: u32, + ) -> DalResult { + let row = sqlx::query!( + r#" + WITH + available_batches AS ( + SELECT MAX(number) as "last_batch" + FROM l1_batches + ), + processed_batches AS ( + SELECT COALESCE(MAX(l1_batch_number), 0) + $1 AS "last_ready_batch" + FROM vm_runner_protective_reads + ) + SELECT + LEAST(last_batch, last_ready_batch) AS "last_ready_batch!" 
+ FROM + available_batches + FULL JOIN processed_batches ON true + "#, + window_size as i32 + ) + .instrument("get_protective_reads_last_ready_batch") + .report_latency() + .fetch_one(self.storage) + .await?; + Ok(L1BatchNumber(row.last_ready_batch as u32)) + } + + pub async fn mark_protective_reads_batch_as_completed( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> DalResult<()> { + sqlx::query!( + r#" + INSERT INTO + vm_runner_protective_reads ( + l1_batch_number, + created_at, + updated_at + ) + VALUES + ( + $1, + NOW(), + NOW() + ) + "#, + i64::from(l1_batch_number.0), + ) + .instrument("mark_protective_reads_batch_as_completed") + .report_latency() + .execute(self.storage) + .await?; + Ok(()) + } +} diff --git a/core/node/vm_runner/src/impls/mod.rs b/core/node/vm_runner/src/impls/mod.rs new file mode 100644 index 000000000000..70d01f6932ef --- /dev/null +++ b/core/node/vm_runner/src/impls/mod.rs @@ -0,0 +1,3 @@ +mod protective_reads; + +pub use protective_reads::{ProtectiveReadsWriter, ProtectiveReadsWriterTasks}; diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs new file mode 100644 index 000000000000..b83b73d63e08 --- /dev/null +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -0,0 +1,149 @@ +use crate::storage::StorageSyncTask; +use crate::{ + ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory, + VmRunner, VmRunnerIo, VmRunnerStorage, +}; +use anyhow::Context; +use async_trait::async_trait; +use std::sync::Arc; +use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; +use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager}; +use zksync_types::{L1BatchNumber, L2ChainId}; + +pub struct ProtectiveReadsWriter { + vm_runner: VmRunner, +} + +impl ProtectiveReadsWriter { + pub async fn new( + pool: ConnectionPool, + rocksdb_path: String, + chain_id: L2ChainId, + window_size: u32, + ) -> anyhow::Result<(Self, 
ProtectiveReadsWriterTasks)> { + let io = ProtectiveReadsIo { window_size }; + let (loader, loader_task) = + VmRunnerStorage::new(pool.clone(), rocksdb_path, io.clone(), chain_id).await?; + let output_handler_factory = ProtectiveReadsOutputHandlerFactory { pool: pool.clone() }; + let (output_handler_factory, output_handler_factory_task) = + ConcurrentOutputHandlerFactory::new(pool.clone(), io.clone(), output_handler_factory); + let batch_processor = MainBatchExecutor::new(false, false); + let vm_runner = VmRunner::new( + pool, + Box::new(io), + Arc::new(loader), + Box::new(output_handler_factory), + Box::new(batch_processor), + ); + Ok(( + Self { vm_runner }, + ProtectiveReadsWriterTasks { + loader_task, + output_handler_factory_task, + }, + )) + } +} + +pub struct ProtectiveReadsWriterTasks { + pub loader_task: StorageSyncTask, + pub output_handler_factory_task: ConcurrentOutputHandlerFactoryTask, +} + +#[derive(Debug, Clone)] +pub struct ProtectiveReadsIo { + window_size: u32, +} + +#[async_trait] +impl VmRunnerIo for ProtectiveReadsIo { + fn name(&self) -> &'static str { + "protective_reads_writer" + } + + async fn latest_processed_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result { + Ok(conn + .vm_runner_dal() + .get_protective_reads_latest_processed_batch() + .await?) + } + + async fn last_ready_to_be_loaded_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result { + Ok(conn + .vm_runner_dal() + .get_protective_reads_last_ready_batch(self.window_size) + .await?) + } + + async fn mark_l1_batch_as_completed( + &self, + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + Ok(conn + .vm_runner_dal() + .mark_protective_reads_batch_as_completed(l1_batch_number) + .await?) 
+ } +} + +#[derive(Debug)] +struct ProtectiveReadsOutputHandler { + pool: ConnectionPool, +} + +#[async_trait] +impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { + async fn handle_l2_block(&mut self, _updates_manager: &UpdatesManager) -> anyhow::Result<()> { + Ok(()) + } + + async fn handle_l1_batch( + &mut self, + updates_manager: Arc, + ) -> anyhow::Result<()> { + let finished_batch = updates_manager + .l1_batch + .finished + .as_ref() + .context("L1 batch is not actually finished")?; + let (_, protective_reads): (Vec<_>, Vec<_>) = finished_batch + .final_execution_state + .deduplicated_storage_log_queries + .iter() + .partition(|log_query| log_query.rw_flag); + + let mut connection = self + .pool + .connection_tagged("protective_reads_writer") + .await?; + connection + .storage_logs_dedup_dal() + .insert_protective_reads(updates_manager.l1_batch.number, &protective_reads) + .await?; + Ok(()) + } +} + +#[derive(Debug)] +struct ProtectiveReadsOutputHandlerFactory { + pool: ConnectionPool, +} + +#[async_trait] +impl OutputHandlerFactory for ProtectiveReadsOutputHandlerFactory { + async fn create_handler( + &mut self, + _l1_batch_number: L1BatchNumber, + ) -> anyhow::Result> { + Ok(Box::new(ProtectiveReadsOutputHandler { + pool: self.pool.clone(), + })) + } +} diff --git a/core/node/vm_runner/src/lib.rs b/core/node/vm_runner/src/lib.rs index 4664d4eb8e11..00af898f2253 100644 --- a/core/node/vm_runner/src/lib.rs +++ b/core/node/vm_runner/src/lib.rs @@ -3,6 +3,7 @@ #![warn(missing_debug_implementations, missing_docs)] +mod impls; mod io; mod output_handler; mod process; @@ -11,6 +12,7 @@ mod storage; #[cfg(test)] mod tests; +pub use impls::{ProtectiveReadsWriter, ProtectiveReadsWriterTasks}; pub use io::VmRunnerIo; pub use output_handler::{ ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory, From 86929d6f9f07582355bb08dafe8596df8acaa8ff Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Thu, 23 May 2024 00:21:28 
+1000 Subject: [PATCH 02/16] fmt --- core/lib/dal/src/lib.rs | 3 +- core/lib/dal/src/vm_runner_dal.rs | 33 ++++++++----------- .../vm_runner/src/impls/protective_reads.rs | 13 ++++---- 3 files changed, 22 insertions(+), 27 deletions(-) diff --git a/core/lib/dal/src/lib.rs b/core/lib/dal/src/lib.rs index b0c4601cd9f6..8b048a035121 100644 --- a/core/lib/dal/src/lib.rs +++ b/core/lib/dal/src/lib.rs @@ -11,7 +11,6 @@ pub use zksync_db_connection::{ error::{DalError, DalResult}, }; -use crate::vm_runner_dal::VmRunnerDal; use crate::{ blocks_dal::BlocksDal, blocks_web3_dal::BlocksWeb3Dal, consensus_dal::ConsensusDal, contract_verification_dal::ContractVerificationDal, eth_sender_dal::EthSenderDal, @@ -24,7 +23,7 @@ use crate::{ sync_dal::SyncDal, system_dal::SystemDal, tee_verifier_input_producer_dal::TeeVerifierInputProducerDal, tokens_dal::TokensDal, tokens_web3_dal::TokensWeb3Dal, transactions_dal::TransactionsDal, - transactions_web3_dal::TransactionsWeb3Dal, + transactions_web3_dal::TransactionsWeb3Dal, vm_runner_dal::VmRunnerDal, }; pub mod blocks_dal; diff --git a/core/lib/dal/src/vm_runner_dal.rs b/core/lib/dal/src/vm_runner_dal.rs index 8c19a674894a..3693f78a6a7a 100644 --- a/core/lib/dal/src/vm_runner_dal.rs +++ b/core/lib/dal/src/vm_runner_dal.rs @@ -1,9 +1,8 @@ -use crate::Core; -use zksync_db_connection::connection::Connection; -use zksync_db_connection::error::DalResult; -use zksync_db_connection::instrument::InstrumentExt; +use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt}; use zksync_types::L1BatchNumber; +use crate::Core; + #[derive(Debug)] pub struct VmRunnerDal<'c, 'a> { pub(crate) storage: &'c mut Connection<'a, Core>, @@ -36,18 +35,22 @@ impl VmRunnerDal<'_, '_> { r#" WITH available_batches AS ( - SELECT MAX(number) as "last_batch" - FROM l1_batches + SELECT + MAX(number) AS "last_batch" + FROM + l1_batches ), processed_batches AS ( - SELECT COALESCE(MAX(l1_batch_number), 0) + $1 AS "last_ready_batch" - 
FROM vm_runner_protective_reads + SELECT + COALESCE(MAX(l1_batch_number), 0) + $1 AS "last_ready_batch" + FROM + vm_runner_protective_reads ) SELECT LEAST(last_batch, last_ready_batch) AS "last_ready_batch!" FROM available_batches - FULL JOIN processed_batches ON true + FULL JOIN processed_batches ON TRUE "#, window_size as i32 ) @@ -65,17 +68,9 @@ impl VmRunnerDal<'_, '_> { sqlx::query!( r#" INSERT INTO - vm_runner_protective_reads ( - l1_batch_number, - created_at, - updated_at - ) + vm_runner_protective_reads (l1_batch_number, created_at, updated_at) VALUES - ( - $1, - NOW(), - NOW() - ) + ($1, NOW(), NOW()) "#, i64::from(l1_batch_number.0), ) diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs index b83b73d63e08..f8e3a24dab64 100644 --- a/core/node/vm_runner/src/impls/protective_reads.rs +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -1,15 +1,16 @@ -use crate::storage::StorageSyncTask; -use crate::{ - ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory, - VmRunner, VmRunnerIo, VmRunnerStorage, -}; +use std::sync::Arc; + use anyhow::Context; use async_trait::async_trait; -use std::sync::Arc; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager}; use zksync_types::{L1BatchNumber, L2ChainId}; +use crate::{ + storage::StorageSyncTask, ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, + OutputHandlerFactory, VmRunner, VmRunnerIo, VmRunnerStorage, +}; + pub struct ProtectiveReadsWriter { vm_runner: VmRunner, } From f489cc990202aa48a40a9bbf10aa1ee3761167cc Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Thu, 23 May 2024 14:49:12 +1000 Subject: [PATCH 03/16] db prepare --- ...ab68c996a7ef2bd24e79163b064c25bffc82a.json | 14 ------------ ...d0584968c93eecf9cf97e6c05a63b88a79503.json | 22 ------------------- 
...5d03a811221d4ddf26e2e0ddc34147a0d8e23.json | 22 +++++++++++++++++++ ...1687e91d8367347b3830830a4c76407d60bc5.json | 14 ++++++++++++ 4 files changed, 36 insertions(+), 36 deletions(-) delete mode 100644 core/lib/dal/.sqlx/query-08688d7e6bcfaad2bf8ff58c447ab68c996a7ef2bd24e79163b064c25bffc82a.json delete mode 100644 core/lib/dal/.sqlx/query-5c4d3e33075b6f9d53431369e03d0584968c93eecf9cf97e6c05a63b88a79503.json create mode 100644 core/lib/dal/.sqlx/query-c31632143b459ea6684908ce7a15d03a811221d4ddf26e2e0ddc34147a0d8e23.json create mode 100644 core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json diff --git a/core/lib/dal/.sqlx/query-08688d7e6bcfaad2bf8ff58c447ab68c996a7ef2bd24e79163b064c25bffc82a.json b/core/lib/dal/.sqlx/query-08688d7e6bcfaad2bf8ff58c447ab68c996a7ef2bd24e79163b064c25bffc82a.json deleted file mode 100644 index ecbd8cffde5f..000000000000 --- a/core/lib/dal/.sqlx/query-08688d7e6bcfaad2bf8ff58c447ab68c996a7ef2bd24e79163b064c25bffc82a.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n vm_runner_protective_reads (\n l1_batch_number,\n created_at,\n updated_at\n )\n VALUES\n (\n $1,\n NOW(),\n NOW()\n )\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "08688d7e6bcfaad2bf8ff58c447ab68c996a7ef2bd24e79163b064c25bffc82a" -} diff --git a/core/lib/dal/.sqlx/query-5c4d3e33075b6f9d53431369e03d0584968c93eecf9cf97e6c05a63b88a79503.json b/core/lib/dal/.sqlx/query-5c4d3e33075b6f9d53431369e03d0584968c93eecf9cf97e6c05a63b88a79503.json deleted file mode 100644 index a1d16f629df3..000000000000 --- a/core/lib/dal/.sqlx/query-5c4d3e33075b6f9d53431369e03d0584968c93eecf9cf97e6c05a63b88a79503.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH\n available_batches AS (\n SELECT MAX(number) as \"last_batch\"\n FROM l1_batches\n ),\n processed_batches AS (\n SELECT COALESCE(MAX(l1_batch_number), 
0) + $1 AS \"last_ready_batch\"\n FROM vm_runner_protective_reads\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON true\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "last_ready_batch!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - true - ] - }, - "hash": "5c4d3e33075b6f9d53431369e03d0584968c93eecf9cf97e6c05a63b88a79503" -} diff --git a/core/lib/dal/.sqlx/query-c31632143b459ea6684908ce7a15d03a811221d4ddf26e2e0ddc34147a0d8e23.json b/core/lib/dal/.sqlx/query-c31632143b459ea6684908ce7a15d03a811221d4ddf26e2e0ddc34147a0d8e23.json new file mode 100644 index 000000000000..dcbfb1d0bd24 --- /dev/null +++ b/core/lib/dal/.sqlx/query-c31632143b459ea6684908ce7a15d03a811221d4ddf26e2e0ddc34147a0d8e23.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n available_batches AS (\n SELECT\n MAX(number) AS \"last_batch\"\n FROM\n l1_batches\n ),\n processed_batches AS (\n SELECT\n COALESCE(MAX(l1_batch_number), 0) + $1 AS \"last_ready_batch\"\n FROM\n vm_runner_protective_reads\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON TRUE\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "last_ready_batch!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + true + ] + }, + "hash": "c31632143b459ea6684908ce7a15d03a811221d4ddf26e2e0ddc34147a0d8e23" +} diff --git a/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json b/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json new file mode 100644 index 000000000000..e49cc211cdcd --- /dev/null +++ b/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + 
"query": "\n INSERT INTO\n vm_runner_protective_reads (l1_batch_number, created_at, updated_at)\n VALUES\n ($1, NOW(), NOW())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5" +} From 026ee0968873ec734dbe9d40fdb772fba64b6d41 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Thu, 23 May 2024 15:04:21 +1000 Subject: [PATCH 04/16] lint --- core/node/vm_runner/src/impls/protective_reads.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs index f8e3a24dab64..c07c246ccd34 100644 --- a/core/node/vm_runner/src/impls/protective_reads.rs +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use anyhow::Context; use async_trait::async_trait; +use tokio::sync::watch; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager}; use zksync_types::{L1BatchNumber, L2ChainId}; @@ -11,6 +12,7 @@ use crate::{ OutputHandlerFactory, VmRunner, VmRunnerIo, VmRunnerStorage, }; +#[derive(Debug)] pub struct ProtectiveReadsWriter { vm_runner: VmRunner, } @@ -44,8 +46,13 @@ impl ProtectiveReadsWriter { }, )) } + + pub async fn run(self, stop_receiver: &watch::Receiver) -> anyhow::Result<()> { + self.vm_runner.run(stop_receiver).await + } } +#[derive(Debug)] pub struct ProtectiveReadsWriterTasks { pub loader_task: StorageSyncTask, pub output_handler_factory_task: ConcurrentOutputHandlerFactoryTask, From 06e183940492d52ff2acd22f1fc6d5003de4ad02 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 27 May 2024 20:03:31 +1000 Subject: [PATCH 05/16] add vm runner to node framework --- Cargo.lock | 1 + core/lib/config/src/configs/mod.rs | 1 + core/lib/config/src/configs/vm_runner.rs | 16 +++++ 
core/node/node_framework/Cargo.toml | 1 + .../src/implementations/layers/mod.rs | 1 + .../implementations/layers/vm_runner/mod.rs | 31 +++++++++ .../layers/vm_runner/protective_reads.rs | 67 +++++++++++++++++++ core/node/vm_runner/src/lib.rs | 2 +- core/node/vm_runner/src/output_handler.rs | 4 ++ core/node/vm_runner/src/storage.rs | 4 ++ 10 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 core/lib/config/src/configs/vm_runner.rs create mode 100644 core/node/node_framework/src/implementations/layers/vm_runner/mod.rs create mode 100644 core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs diff --git a/Cargo.lock b/Cargo.lock index 58f83030c7c3..9a0cd3c4e0a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8955,6 +8955,7 @@ dependencies = [ "zksync_tee_verifier_input_producer", "zksync_types", "zksync_utils", + "zksync_vm_runner", "zksync_web3_decl", ] diff --git a/core/lib/config/src/configs/mod.rs b/core/lib/config/src/configs/mod.rs index 925c30976f97..2709b84c2f5c 100644 --- a/core/lib/config/src/configs/mod.rs +++ b/core/lib/config/src/configs/mod.rs @@ -46,6 +46,7 @@ pub mod proof_data_handler; pub mod secrets; pub mod snapshots_creator; pub mod utils; +pub mod vm_runner; pub mod wallets; const BYTES_IN_MEGABYTE: usize = 1_024 * 1_024; diff --git a/core/lib/config/src/configs/vm_runner.rs b/core/lib/config/src/configs/vm_runner.rs new file mode 100644 index 000000000000..2c79b0a93bc8 --- /dev/null +++ b/core/lib/config/src/configs/vm_runner.rs @@ -0,0 +1,16 @@ +use serde::Deserialize; + +#[derive(Debug, Deserialize, Clone, PartialEq, Default)] +pub struct VmRunnerConfig { + /// Path to the RocksDB data directory that serves state cache. + #[serde(default = "VmRunnerConfig::default_protective_reads_db_path")] + pub protective_reads_db_path: String, + /// How many max batches should be processed at the same time. 
+ pub protective_reads_window_size: u32, +} + +impl VmRunnerConfig { + fn default_protective_reads_db_path() -> String { + "./db/protective_reads".to_owned() + } +} diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index f95500a3836d..4e7bcceb5dde 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -42,6 +42,7 @@ zksync_node_consensus.workspace = true zksync_contract_verification_server.workspace = true zksync_tee_verifier_input_producer.workspace = true zksync_queued_job_processor.workspace = true +zksync_vm_runner.workspace = true tracing.workspace = true thiserror.workspace = true diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs index cee9a0b6906d..66ecdf18c3a5 100644 --- a/core/node/node_framework/src/implementations/layers/mod.rs +++ b/core/node/node_framework/src/implementations/layers/mod.rs @@ -18,4 +18,5 @@ pub mod query_eth_client; pub mod sigint; pub mod state_keeper; pub mod tee_verifier_input_producer; +pub mod vm_runner; pub mod web3_api; diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs b/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs new file mode 100644 index 000000000000..adb752ff3179 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs @@ -0,0 +1,31 @@ +use crate::service::StopReceiver; +use crate::task::{Task, TaskId}; +use zksync_vm_runner::{ConcurrentOutputHandlerFactoryTask, StorageSyncTask, VmRunnerIo}; + +pub mod protective_reads; + +#[async_trait::async_trait] +impl Task for StorageSyncTask { + fn id(&self) -> TaskId { + TaskId(format!("vm_runner/{}/storage_sync", self.io().name())) + } + + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + StorageSyncTask::run(*self, stop_receiver.0.clone()).await?; + stop_receiver.0.changed().await?; + Ok(()) + } +} + 
+#[async_trait::async_trait] +impl Task for ConcurrentOutputHandlerFactoryTask { + fn id(&self) -> TaskId { + TaskId(format!("vm_runner/{}/output_handler", self.io().name())) + } + + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + ConcurrentOutputHandlerFactoryTask::run(*self, stop_receiver.0.clone()).await?; + stop_receiver.0.changed().await?; + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs new file mode 100644 index 000000000000..9117e394a915 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs @@ -0,0 +1,67 @@ +use crate::implementations::resources::pools::{MasterPool, PoolResource}; +use crate::service::StopReceiver; +use crate::task::{Task, TaskId}; +use crate::{ + service::ServiceContext, + wiring_layer::{WiringError, WiringLayer}, +}; +use zksync_config::configs::chain::NetworkConfig; +use zksync_config::configs::vm_runner::VmRunnerConfig; +use zksync_vm_runner::ProtectiveReadsWriter; + +#[derive(Debug)] +pub struct ProtectiveReadsWriterLayer { + vm_runner_config: VmRunnerConfig, + network_config: NetworkConfig, +} + +impl ProtectiveReadsWriterLayer { + pub fn new(vm_runner_config: VmRunnerConfig, network_config: NetworkConfig) -> Self { + Self { + vm_runner_config, + network_config, + } + } +} + +#[async_trait::async_trait] +impl WiringLayer for ProtectiveReadsWriterLayer { + fn layer_name(&self) -> &'static str { + "vm_runner_protective_reads" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let master_pool = context.get_resource::>().await?; + + let (protective_reads_writer, tasks) = ProtectiveReadsWriter::new( + master_pool.get_singleton().await?, // TODO: check pool size + self.vm_runner_config.protective_reads_db_path, + self.network_config.zksync_network_id, + 
self.vm_runner_config.protective_reads_window_size, + ) + .await?; + + context.add_task(Box::new(tasks.loader_task)); + context.add_task(Box::new(tasks.output_handler_factory_task)); + context.add_task(Box::new(ProtectiveReadsWriterTask { + protective_reads_writer, + })); + Ok(()) + } +} + +#[derive(Debug)] +struct ProtectiveReadsWriterTask { + protective_reads_writer: ProtectiveReadsWriter, +} + +#[async_trait::async_trait] +impl Task for ProtectiveReadsWriterTask { + fn id(&self) -> TaskId { + TaskId("vm_runner/protective_reads_writer".to_owned()) + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + self.protective_reads_writer.run(&stop_receiver.0).await + } +} diff --git a/core/node/vm_runner/src/lib.rs b/core/node/vm_runner/src/lib.rs index 00af898f2253..ca9f8bdc0eb4 100644 --- a/core/node/vm_runner/src/lib.rs +++ b/core/node/vm_runner/src/lib.rs @@ -18,4 +18,4 @@ pub use output_handler::{ ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory, }; pub use process::VmRunner; -pub use storage::{BatchExecuteData, VmRunnerStorage}; +pub use storage::{BatchExecuteData, StorageSyncTask, VmRunnerStorage}; diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index 30fe9e0c9010..61ab54dc1208 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -203,6 +203,10 @@ impl Debug for ConcurrentOutputHandlerFactoryTask { } impl ConcurrentOutputHandlerFactoryTask { + pub fn io(&self) -> &Io { + &self.io + } + /// Starts running the task which is supposed to last until the end of the node's lifetime. 
/// /// # Errors diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index 5ffd1d11e70d..d60a8c921d3e 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -407,4 +407,8 @@ impl StorageSyncTask { l2_blocks, })) } + + pub fn io(&self) -> &Io { + self.io() + } } From dae4fc40b6801db836d9f7314dce98b1fabff1f1 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 27 May 2024 20:04:29 +1000 Subject: [PATCH 06/16] fmt --- .../src/implementations/layers/vm_runner/mod.rs | 7 +++++-- .../layers/vm_runner/protective_reads.rs | 13 ++++++------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs b/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs index adb752ff3179..6e84d96d894a 100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs @@ -1,7 +1,10 @@ -use crate::service::StopReceiver; -use crate::task::{Task, TaskId}; use zksync_vm_runner::{ConcurrentOutputHandlerFactoryTask, StorageSyncTask, VmRunnerIo}; +use crate::{ + service::StopReceiver, + task::{Task, TaskId}, +}; + pub mod protective_reads; #[async_trait::async_trait] diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs index 9117e394a915..2f0aa9efb4f7 100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs @@ -1,13 +1,12 @@ -use crate::implementations::resources::pools::{MasterPool, PoolResource}; -use crate::service::StopReceiver; -use crate::task::{Task, TaskId}; +use zksync_config::configs::{chain::NetworkConfig, vm_runner::VmRunnerConfig}; +use zksync_vm_runner::ProtectiveReadsWriter; + use 
crate::{ - service::ServiceContext, + implementations::resources::pools::{MasterPool, PoolResource}, + service::{ServiceContext, StopReceiver}, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; -use zksync_config::configs::chain::NetworkConfig; -use zksync_config::configs::vm_runner::VmRunnerConfig; -use zksync_vm_runner::ProtectiveReadsWriter; #[derive(Debug)] pub struct ProtectiveReadsWriterLayer { From b56b3674043e90e1182c6d423d36e02c093b32f2 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 3 Jun 2024 13:21:32 +1000 Subject: [PATCH 07/16] properly integrate protective reads writer with node framework --- core/bin/zksync_server/src/main.rs | 4 ++- core/bin/zksync_server/src/node_builder.rs | 15 +++++++++++ core/lib/config/src/configs/general.rs | 2 ++ core/lib/config/src/configs/mod.rs | 1 + core/lib/config/src/configs/vm_runner.rs | 8 +++--- core/lib/env_config/src/lib.rs | 1 + core/lib/env_config/src/vm_runner.rs | 8 ++++++ core/lib/protobuf_config/src/general.rs | 6 +++++ core/lib/protobuf_config/src/lib.rs | 1 + .../src/proto/config/general.proto | 2 ++ .../src/proto/config/vm_runner.proto | 8 ++++++ core/lib/protobuf_config/src/vm_runner.rs | 27 +++++++++++++++++++ core/lib/zksync_core_leftovers/src/lib.rs | 2 ++ .../src/temp_config_store/mod.rs | 3 +++ .../layers/vm_runner/protective_reads.rs | 23 +++++++++------- 15 files changed, 97 insertions(+), 14 deletions(-) create mode 100644 core/lib/env_config/src/vm_runner.rs create mode 100644 core/lib/protobuf_config/src/proto/config/vm_runner.proto create mode 100644 core/lib/protobuf_config/src/vm_runner.rs diff --git a/core/bin/zksync_server/src/main.rs b/core/bin/zksync_server/src/main.rs index 955a0232ae3b..f1eedd592386 100644 --- a/core/bin/zksync_server/src/main.rs +++ b/core/bin/zksync_server/src/main.rs @@ -13,7 +13,8 @@ use zksync_config::{ house_keeper::HouseKeeperConfig, ContractsConfig, DatabaseSecrets, FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig, 
FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, - L1Secrets, ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, Secrets, + L1Secrets, ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, + ProtectiveReadsWriterConfig, Secrets, }, ApiConfig, ContractVerifierConfig, DBConfig, EthConfig, EthWatchConfig, GasAdjusterConfig, GenesisConfig, ObjectStoreConfig, PostgresConfig, SnapshotsCreatorConfig, @@ -306,5 +307,6 @@ fn load_env_config() -> anyhow::Result { object_store_config: ObjectStoreConfig::from_env().ok(), observability: ObservabilityConfig::from_env().ok(), snapshot_creator: SnapshotsCreatorConfig::from_env().ok(), + protective_reads_writer_config: ProtectiveReadsWriterConfig::from_env().ok(), }) } diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs index 163835044cac..23a82778e47a 100644 --- a/core/bin/zksync_server/src/node_builder.rs +++ b/core/bin/zksync_server/src/node_builder.rs @@ -13,6 +13,7 @@ use zksync_node_api_server::{ tx_sender::{ApiContracts, TxSenderConfig}, web3::{state::InternalApiConfig, Namespace}, }; +use zksync_node_framework::implementations::layers::vm_runner::protective_reads::ProtectiveReadsWriterLayer; use zksync_node_framework::{ implementations::layers::{ circuit_breaker_checker::CircuitBreakerCheckerLayer, @@ -399,6 +400,17 @@ impl MainNodeBuilder { Ok(self) } + fn add_vm_runner_protective_reads_layer(mut self) -> anyhow::Result { + let protective_reads_writer_config = + try_load_config!(self.configs.protective_reads_writer_config); + self.node.add_layer(ProtectiveReadsWriterLayer::new( + protective_reads_writer_config, + self.genesis_config.l2_chain_id, + )); + + Ok(self) + } + pub fn build(mut self, mut components: Vec) -> anyhow::Result { // Add "base" layers (resources and helper tasks). 
self = self @@ -480,6 +492,9 @@ impl MainNodeBuilder { Component::CommitmentGenerator => { self = self.add_commitment_generator_layer()?; } + Component::VmRunnerProtectiveReads => { + self = self.add_vm_runner_protective_reads_layer()?; + } } } Ok(self.node.build()?) diff --git a/core/lib/config/src/configs/general.rs b/core/lib/config/src/configs/general.rs index 69d68508a035..4fdaa82b87e0 100644 --- a/core/lib/config/src/configs/general.rs +++ b/core/lib/config/src/configs/general.rs @@ -1,3 +1,4 @@ +use crate::configs::vm_runner::ProtectiveReadsWriterConfig; use crate::{ configs::{ chain::{CircuitBreakerConfig, MempoolConfig, OperationsManagerConfig, StateKeeperConfig}, @@ -32,4 +33,5 @@ pub struct GeneralConfig { pub eth: Option, pub snapshot_creator: Option, pub observability: Option, + pub protective_reads_writer_config: Option, } diff --git a/core/lib/config/src/configs/mod.rs b/core/lib/config/src/configs/mod.rs index 2709b84c2f5c..b2d9571ad292 100644 --- a/core/lib/config/src/configs/mod.rs +++ b/core/lib/config/src/configs/mod.rs @@ -20,6 +20,7 @@ pub use self::{ secrets::{DatabaseSecrets, L1Secrets, Secrets}, snapshots_creator::SnapshotsCreatorConfig, utils::PrometheusConfig, + vm_runner::ProtectiveReadsWriterConfig, }; pub mod api; diff --git a/core/lib/config/src/configs/vm_runner.rs b/core/lib/config/src/configs/vm_runner.rs index 2c79b0a93bc8..6250830398eb 100644 --- a/core/lib/config/src/configs/vm_runner.rs +++ b/core/lib/config/src/configs/vm_runner.rs @@ -1,16 +1,16 @@ use serde::Deserialize; #[derive(Debug, Deserialize, Clone, PartialEq, Default)] -pub struct VmRunnerConfig { +pub struct ProtectiveReadsWriterConfig { /// Path to the RocksDB data directory that serves state cache. - #[serde(default = "VmRunnerConfig::default_protective_reads_db_path")] + #[serde(default = "ProtectiveReadsWriterConfig::default_protective_reads_db_path")] pub protective_reads_db_path: String, /// How many max batches should be processed at the same time. 
pub protective_reads_window_size: u32, } -impl VmRunnerConfig { +impl ProtectiveReadsWriterConfig { fn default_protective_reads_db_path() -> String { - "./db/protective_reads".to_owned() + "./db/protective_reads_writer".to_owned() } } diff --git a/core/lib/env_config/src/lib.rs b/core/lib/env_config/src/lib.rs index f6290020f38d..9218467fdaba 100644 --- a/core/lib/env_config/src/lib.rs +++ b/core/lib/env_config/src/lib.rs @@ -24,6 +24,7 @@ mod utils; mod genesis; #[cfg(test)] mod test_utils; +mod vm_runner; mod wallets; pub trait FromEnv: Sized { diff --git a/core/lib/env_config/src/vm_runner.rs b/core/lib/env_config/src/vm_runner.rs new file mode 100644 index 000000000000..651d77a9cf08 --- /dev/null +++ b/core/lib/env_config/src/vm_runner.rs @@ -0,0 +1,8 @@ +use crate::{envy_load, FromEnv}; +use zksync_config::configs::ProtectiveReadsWriterConfig; + +impl FromEnv for ProtectiveReadsWriterConfig { + fn from_env() -> anyhow::Result { + envy_load("vm_runner.protective_reads", "VM_RUNNER_PROTECTIVE_READS_") + } +} diff --git a/core/lib/protobuf_config/src/general.rs b/core/lib/protobuf_config/src/general.rs index ccd55a71c2ec..ba2076a09a14 100644 --- a/core/lib/protobuf_config/src/general.rs +++ b/core/lib/protobuf_config/src/general.rs @@ -37,6 +37,8 @@ impl ProtoRepr for proto::GeneralConfig { snapshot_creator: read_optional_repr(&self.snapshot_creator) .context("snapshot_creator")?, observability: read_optional_repr(&self.observability).context("observability")?, + protective_reads_writer_config: read_optional_repr(&self.protective_reads_writer) + .context("vm_runner")?, }) } @@ -68,6 +70,10 @@ impl ProtoRepr for proto::GeneralConfig { eth: this.eth.as_ref().map(ProtoRepr::build), snapshot_creator: this.snapshot_creator.as_ref().map(ProtoRepr::build), observability: this.observability.as_ref().map(ProtoRepr::build), + protective_reads_writer: this + .protective_reads_writer_config + .as_ref() + .map(ProtoRepr::build), } } } diff --git 
a/core/lib/protobuf_config/src/lib.rs b/core/lib/protobuf_config/src/lib.rs index 25d5662b9ddb..2fd9bbd9e059 100644 --- a/core/lib/protobuf_config/src/lib.rs +++ b/core/lib/protobuf_config/src/lib.rs @@ -27,6 +27,7 @@ pub mod testonly; #[cfg(test)] mod tests; mod utils; +mod vm_runner; mod wallets; use std::str::FromStr; diff --git a/core/lib/protobuf_config/src/proto/config/general.proto b/core/lib/protobuf_config/src/proto/config/general.proto index fdc60c57cfdd..b606417d129a 100644 --- a/core/lib/protobuf_config/src/proto/config/general.proto +++ b/core/lib/protobuf_config/src/proto/config/general.proto @@ -13,6 +13,7 @@ import "zksync/config/house_keeper.proto"; import "zksync/config/observability.proto"; import "zksync/config/snapshots_creator.proto"; import "zksync/config/utils.proto"; +import "zksync/config/vm_runner.proto"; message GeneralConfig { optional config.database.Postgres postgres = 1; @@ -35,4 +36,5 @@ message GeneralConfig { optional config.prover.ProverGateway prover_gateway = 30; optional config.snapshot_creator.SnapshotsCreator snapshot_creator = 31; optional config.observability.Observability observability = 32; + optional config.vm_runner.ProtectiveReadsWriter protective_reads_writer = 33; } diff --git a/core/lib/protobuf_config/src/proto/config/vm_runner.proto b/core/lib/protobuf_config/src/proto/config/vm_runner.proto new file mode 100644 index 000000000000..a7c829f05869 --- /dev/null +++ b/core/lib/protobuf_config/src/proto/config/vm_runner.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package zksync.config.vm_runner; + +message ProtectiveReadsWriter { + optional string protective_reads_db_path = 1; // required; fs path + optional uint64 protective_reads_window_size = 2; // required +} diff --git a/core/lib/protobuf_config/src/vm_runner.rs b/core/lib/protobuf_config/src/vm_runner.rs new file mode 100644 index 000000000000..227e22cd5d22 --- /dev/null +++ b/core/lib/protobuf_config/src/vm_runner.rs @@ -0,0 +1,27 @@ +use anyhow::Context; +use 
zksync_config::configs::{self}; +use zksync_protobuf::{required, ProtoRepr}; + +use crate::proto::vm_runner as proto; + +impl ProtoRepr for proto::ProtectiveReadsWriter { + type Type = configs::ProtectiveReadsWriterConfig; + + fn read(&self) -> anyhow::Result { + Ok(Self::Type { + protective_reads_db_path: required(&self.protective_reads_db_path) + .context("protective_reads_db_path")? + .clone(), + protective_reads_window_size: *required(&self.protective_reads_window_size) + .context("protective_reads_window_size")? + as u32, + }) + } + + fn build(this: &Self::Type) -> Self { + Self { + protective_reads_db_path: Some(this.protective_reads_db_path.clone()), + protective_reads_window_size: Some(this.protective_reads_window_size as u64), + } + } +} diff --git a/core/lib/zksync_core_leftovers/src/lib.rs b/core/lib/zksync_core_leftovers/src/lib.rs index b0104cc795e3..34bc282ad7cd 100644 --- a/core/lib/zksync_core_leftovers/src/lib.rs +++ b/core/lib/zksync_core_leftovers/src/lib.rs @@ -154,6 +154,8 @@ pub enum Component { Consensus, /// Component generating commitment for L1 batches. CommitmentGenerator, + /// VM runner-based component that saves protective reads to Postgres. 
+ VmRunnerProtectiveReads, } #[derive(Debug)] diff --git a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs index cfac1df27cd0..5bc5d12ec8ca 100644 --- a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs +++ b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs @@ -1,3 +1,4 @@ +use zksync_config::configs::ProtectiveReadsWriterConfig; use zksync_config::{ configs::{ api::{HealthCheckConfig, MerkleTreeApiConfig, Web3JsonRpcConfig}, @@ -61,6 +62,7 @@ pub struct TempConfigStore { pub object_store_config: Option, pub observability: Option, pub snapshot_creator: Option, + pub protective_reads_writer_config: Option, } impl TempConfigStore { @@ -86,6 +88,7 @@ impl TempConfigStore { eth: self.eth_sender_config.clone(), snapshot_creator: self.snapshot_creator.clone(), observability: self.observability.clone(), + protective_reads_writer_config: self.protective_reads_writer_config.clone(), } } diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs index 2f0aa9efb4f7..8962c0aeb7cc 100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs @@ -1,4 +1,5 @@ -use zksync_config::configs::{chain::NetworkConfig, vm_runner::VmRunnerConfig}; +use zksync_config::configs::vm_runner::ProtectiveReadsWriterConfig; +use zksync_types::L2ChainId; use zksync_vm_runner::ProtectiveReadsWriter; use crate::{ @@ -10,15 +11,18 @@ use crate::{ #[derive(Debug)] pub struct ProtectiveReadsWriterLayer { - vm_runner_config: VmRunnerConfig, - network_config: NetworkConfig, + protective_reads_writer_config: ProtectiveReadsWriterConfig, + zksync_network_id: L2ChainId, } impl ProtectiveReadsWriterLayer { - pub fn new(vm_runner_config: VmRunnerConfig, network_config: 
NetworkConfig) -> Self { + pub fn new( + protective_reads_writer_config: ProtectiveReadsWriterConfig, + zksync_network_id: L2ChainId, + ) -> Self { Self { - vm_runner_config, - network_config, + protective_reads_writer_config, + zksync_network_id, } } } @@ -34,9 +38,10 @@ impl WiringLayer for ProtectiveReadsWriterLayer { let (protective_reads_writer, tasks) = ProtectiveReadsWriter::new( master_pool.get_singleton().await?, // TODO: check pool size - self.vm_runner_config.protective_reads_db_path, - self.network_config.zksync_network_id, - self.vm_runner_config.protective_reads_window_size, + self.protective_reads_writer_config.protective_reads_db_path, + self.zksync_network_id, + self.protective_reads_writer_config + .protective_reads_window_size, ) .await?; From 9a49802a1094e7bf5b33b46024a3dddc0f57848b Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 3 Jun 2024 13:22:41 +1000 Subject: [PATCH 08/16] fmt --- core/bin/zksync_server/src/node_builder.rs | 2 +- core/lib/config/src/configs/general.rs | 2 +- core/lib/env_config/src/vm_runner.rs | 3 ++- core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs | 3 +-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs index 23a82778e47a..d67b898c95ca 100644 --- a/core/bin/zksync_server/src/node_builder.rs +++ b/core/bin/zksync_server/src/node_builder.rs @@ -13,7 +13,6 @@ use zksync_node_api_server::{ tx_sender::{ApiContracts, TxSenderConfig}, web3::{state::InternalApiConfig, Namespace}, }; -use zksync_node_framework::implementations::layers::vm_runner::protective_reads::ProtectiveReadsWriterLayer; use zksync_node_framework::{ implementations::layers::{ circuit_breaker_checker::CircuitBreakerCheckerLayer, @@ -38,6 +37,7 @@ use zksync_node_framework::{ StateKeeperLayer, }, tee_verifier_input_producer::TeeVerifierInputProducerLayer, + vm_runner::protective_reads::ProtectiveReadsWriterLayer, web3_api::{ 
caches::MempoolCacheLayer, server::{Web3ServerLayer, Web3ServerOptionalConfig}, diff --git a/core/lib/config/src/configs/general.rs b/core/lib/config/src/configs/general.rs index 4fdaa82b87e0..ef02f557bc18 100644 --- a/core/lib/config/src/configs/general.rs +++ b/core/lib/config/src/configs/general.rs @@ -1,9 +1,9 @@ -use crate::configs::vm_runner::ProtectiveReadsWriterConfig; use crate::{ configs::{ chain::{CircuitBreakerConfig, MempoolConfig, OperationsManagerConfig, StateKeeperConfig}, fri_prover_group::FriProverGroupConfig, house_keeper::HouseKeeperConfig, + vm_runner::ProtectiveReadsWriterConfig, FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig, FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, diff --git a/core/lib/env_config/src/vm_runner.rs b/core/lib/env_config/src/vm_runner.rs index 651d77a9cf08..8a99ea2dc8e2 100644 --- a/core/lib/env_config/src/vm_runner.rs +++ b/core/lib/env_config/src/vm_runner.rs @@ -1,6 +1,7 @@ -use crate::{envy_load, FromEnv}; use zksync_config::configs::ProtectiveReadsWriterConfig; +use crate::{envy_load, FromEnv}; + impl FromEnv for ProtectiveReadsWriterConfig { fn from_env() -> anyhow::Result { envy_load("vm_runner.protective_reads", "VM_RUNNER_PROTECTIVE_READS_") diff --git a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs index 5bc5d12ec8ca..68389228861a 100644 --- a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs +++ b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs @@ -1,4 +1,3 @@ -use zksync_config::configs::ProtectiveReadsWriterConfig; use zksync_config::{ configs::{ api::{HealthCheckConfig, MerkleTreeApiConfig, Web3JsonRpcConfig}, @@ -11,7 +10,7 @@ use zksync_config::{ wallets::{AddressWallet, EthSender, StateKeeper, Wallet, Wallets}, FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig, FriWitnessGeneratorConfig, 
FriWitnessVectorGeneratorConfig, GeneralConfig, - ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, + ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, ProtectiveReadsWriterConfig, }, ApiConfig, ContractVerifierConfig, DBConfig, EthConfig, EthWatchConfig, GasAdjusterConfig, ObjectStoreConfig, PostgresConfig, SnapshotsCreatorConfig, From 910da89a753c50dd57366f9ff826942b75edab7f Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 3 Jun 2024 14:29:08 +1000 Subject: [PATCH 09/16] write docs --- core/node/vm_runner/src/impls/protective_reads.rs | 13 +++++++++++++ core/node/vm_runner/src/output_handler.rs | 1 + core/node/vm_runner/src/storage.rs | 15 +++++++++++---- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs index c07c246ccd34..37f2baa54383 100644 --- a/core/node/vm_runner/src/impls/protective_reads.rs +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -12,12 +12,15 @@ use crate::{ OutputHandlerFactory, VmRunner, VmRunnerIo, VmRunnerStorage, }; +/// A standalone component that writes protective reads asynchronously to state keeper. #[derive(Debug)] pub struct ProtectiveReadsWriter { vm_runner: VmRunner, } impl ProtectiveReadsWriter { + /// Create a new protective reads writer from the provided DB parameters and window size which + /// regulates how many batches this component can handle at the same time. pub async fn new( pool: ConnectionPool, rocksdb_path: String, @@ -47,14 +50,24 @@ impl ProtectiveReadsWriter { )) } + /// Continuously loads new available batches and writes the corresponding protective reads + /// produced by that batch. + /// + /// # Errors + /// + /// Propagates RocksDB and Postgres errors. 
pub async fn run(self, stop_receiver: &watch::Receiver) -> anyhow::Result<()> { self.vm_runner.run(stop_receiver).await } } +/// A collection of tasks that need to be run in order for protective reads writer to work as +/// intended. #[derive(Debug)] pub struct ProtectiveReadsWriterTasks { + /// Task that synchronizes storage with new available batches. pub loader_task: StorageSyncTask, + /// Task that handles output from processed batches. pub output_handler_factory_task: ConcurrentOutputHandlerFactoryTask, } diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index 61ab54dc1208..49bed83cd96e 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -203,6 +203,7 @@ impl Debug for ConcurrentOutputHandlerFactoryTask { } impl ConcurrentOutputHandlerFactoryTask { + /// Access the underlying [`VmRunnerIo`]. pub fn io(&self) -> &Io { &self.io } diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index d60a8c921d3e..3a8552701178 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -271,6 +271,17 @@ impl StorageSyncTask { }) } + /// Access the underlying [`VmRunnerIo`]. + pub fn io(&self) -> &Io { + &self.io + } + + /// Blocks until RocksDB cache instance is caught up with Postgres and then continuously makes + /// sure that the new ready batches are loaded into the cache. + /// + /// # Errors + /// + /// Propagates RocksDB and Postgres errors. 
pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { const SLEEP_INTERVAL: Duration = Duration::from_millis(50); @@ -407,8 +418,4 @@ impl StorageSyncTask { l2_blocks, })) } - - pub fn io(&self) -> &Io { - self.io() - } } From e9464bac175f45c34028ca39b1e21e1cac28a680 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 3 Jun 2024 14:55:22 +1000 Subject: [PATCH 10/16] make protective reads writer shadow state keeper --- core/node/vm_runner/Cargo.toml | 2 +- .../vm_runner/src/impls/protective_reads.rs | 31 ++++++++++++++++--- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/core/node/vm_runner/Cargo.toml b/core/node/vm_runner/Cargo.toml index 67de95f60cb0..b3ede5a796be 100644 --- a/core/node/vm_runner/Cargo.toml +++ b/core/node/vm_runner/Cargo.toml @@ -17,6 +17,7 @@ zksync_contracts.workspace = true zksync_state.workspace = true zksync_storage.workspace = true zksync_state_keeper.workspace = true +zksync_utils.workspace = true vm_utils.workspace = true tokio = { workspace = true, features = ["time"] } @@ -30,7 +31,6 @@ dashmap.workspace = true zksync_node_test_utils.workspace = true zksync_node_genesis.workspace = true zksync_test_account.workspace = true -zksync_utils.workspace = true backon.workspace = true futures = { workspace = true, features = ["compat"] } rand.workspace = true diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs index 37f2baa54383..03a5f1254aa6 100644 --- a/core/node/vm_runner/src/impls/protective_reads.rs +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -5,7 +5,8 @@ use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager}; -use zksync_types::{L1BatchNumber, L2ChainId}; +use zksync_types::{zk_evm_types::LogQuery, AccountTreeId, L1BatchNumber, L2ChainId, StorageKey}; +use 
zksync_utils::u256_to_h256; use crate::{ storage::StorageSyncTask, ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, @@ -134,7 +135,7 @@ impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { .finished .as_ref() .context("L1 batch is not actually finished")?; - let (_, protective_reads): (Vec<_>, Vec<_>) = finished_batch + let (_, protective_reads): (Vec, Vec) = finished_batch .final_execution_state .deduplicated_storage_log_queries .iter() @@ -144,10 +145,32 @@ impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { .pool .connection_tagged("protective_reads_writer") .await?; - connection + let mut expected_protective_reads = connection .storage_logs_dedup_dal() - .insert_protective_reads(updates_manager.l1_batch.number, &protective_reads) + .get_protective_reads_for_l1_batch(updates_manager.l1_batch.number) .await?; + + for protective_read in protective_reads { + let address = AccountTreeId::new(protective_read.address); + let key = u256_to_h256(protective_read.key); + if !expected_protective_reads.remove(&StorageKey::new(address, key)) { + tracing::error!( + l1_batch_number = %updates_manager.l1_batch.number, + address = %protective_read.address, + key = %key, + "VM runner produced a protective read that did not happen in state keeper" + ); + } + } + for remaining_read in expected_protective_reads { + tracing::error!( + l1_batch_number = %updates_manager.l1_batch.number, + address = %remaining_read.address(), + key = %remaining_read.key(), + "State keeper produced a protective read that did not happen in VM runner" + ); + } + Ok(()) } } From b0f57c703d169b1566b01d06f783a01d80f16917 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 3 Jun 2024 15:35:11 +1000 Subject: [PATCH 11/16] add protective reads writer to CI --- .github/workflows/ci-core-reusable.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-core-reusable.yml b/.github/workflows/ci-core-reusable.yml index 
02069c4259f4..1a47e4965360 100644 --- a/.github/workflows/ci-core-reusable.yml +++ b/.github/workflows/ci-core-reusable.yml @@ -134,7 +134,7 @@ jobs: base_token: ["Eth", "Custom"] deployment_mode: ["Rollup", "Validium"] env: - SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator${{ matrix.consensus && ',consensus' || '' }}" + SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads${{ matrix.consensus && ',consensus' || '' }}" runs-on: [matterlabs-ci-runner] steps: From da78d77660056f520869aa5b491e72f8434766a8 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 3 Jun 2024 17:02:17 +1000 Subject: [PATCH 12/16] fix remaining issues --- core/lib/zksync_core_leftovers/src/lib.rs | 3 +++ .../implementations/layers/vm_runner/protective_reads.rs | 2 +- core/node/state_keeper/src/updates/mod.rs | 2 +- core/node/vm_runner/src/process.rs | 3 ++- core/node/vm_runner/src/storage.rs | 6 +++--- etc/env/base/vm_runner.toml | 9 +++++++++ 6 files changed, 19 insertions(+), 6 deletions(-) create mode 100644 etc/env/base/vm_runner.toml diff --git a/core/lib/zksync_core_leftovers/src/lib.rs b/core/lib/zksync_core_leftovers/src/lib.rs index 34bc282ad7cd..4f8664ab74dc 100644 --- a/core/lib/zksync_core_leftovers/src/lib.rs +++ b/core/lib/zksync_core_leftovers/src/lib.rs @@ -192,6 +192,9 @@ impl FromStr for Components { "proof_data_handler" => Ok(Components(vec![Component::ProofDataHandler])), "consensus" => Ok(Components(vec![Component::Consensus])), "commitment_generator" => Ok(Components(vec![Component::CommitmentGenerator])), + "vm_runner_protective_reads" => { + Ok(Components(vec![Component::VmRunnerProtectiveReads])) + } other => Err(format!("{} is not a valid component name", other)), } } diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs index 8962c0aeb7cc..1b7867cdef1f 
100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs @@ -37,7 +37,7 @@ impl WiringLayer for ProtectiveReadsWriterLayer { let master_pool = context.get_resource::>().await?; let (protective_reads_writer, tasks) = ProtectiveReadsWriter::new( - master_pool.get_singleton().await?, // TODO: check pool size + master_pool.get_custom(3).await?, self.protective_reads_writer_config.protective_reads_db_path, self.zksync_network_id, self.protective_reads_writer_config diff --git a/core/node/state_keeper/src/updates/mod.rs b/core/node/state_keeper/src/updates/mod.rs index bb33a6f58678..772ee71641a0 100644 --- a/core/node/state_keeper/src/updates/mod.rs +++ b/core/node/state_keeper/src/updates/mod.rs @@ -123,7 +123,7 @@ impl UpdatesManager { ); } - pub(crate) fn finish_batch(&mut self, finished_batch: FinishedL1Batch) { + pub fn finish_batch(&mut self, finished_batch: FinishedL1Batch) { assert!( self.l1_batch.finished.is_none(), "Cannot finish already finished batch" diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index 5ff7d7cc0b87..5e51b5e658f7 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -109,10 +109,11 @@ impl VmRunner { .await .context("VM runner failed to handle L2 block")?; } - batch_executor + let finished_batch = batch_executor .finish_batch() .await .context("failed finishing L1 batch in executor")?; + updates_manager.finish_batch(finished_batch); output_handler .handle_l1_batch(Arc::new(updates_manager)) .await diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index 3a8552701178..e7a8b147c76f 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -300,10 +300,10 @@ impl StorageSyncTask { if rocksdb_builder.l1_batch_number().await == Some(latest_processed_batch + 1) { // RocksDB is 
already caught up, we might not need to do anything. // Just need to check that the memory diff is up-to-date in case this is a fresh start. + let last_ready_batch = self.io.last_ready_to_be_loaded_batch(&mut conn).await?; let state = self.state.read().await; - if state - .storage - .contains_key(&self.io.last_ready_to_be_loaded_batch(&mut conn).await?) + if last_ready_batch == latest_processed_batch + || state.storage.contains_key(&last_ready_batch) { // No need to do anything, killing time until last processed batch is updated. drop(conn); diff --git a/etc/env/base/vm_runner.toml b/etc/env/base/vm_runner.toml new file mode 100644 index 000000000000..d9e10e8b357d --- /dev/null +++ b/etc/env/base/vm_runner.toml @@ -0,0 +1,9 @@ +# Configuration for the VM runner crate + +[vm_runner] + +[vm_runner.protective_reads] +# Path to the directory that contains RocksDB with protective reads writer cache. +protective_reads_db_path = "./db/main/protective_reads" +# Amount of batches that can be processed in parallel. 
+protective_reads_window_size = 3 From 84e12055e9b6b2710ee995a61dfc322eb0f87aec Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 3 Jun 2024 17:17:12 +1000 Subject: [PATCH 13/16] fix compilation issues --- .../src/implementations/layers/vm_runner/mod.rs | 4 ++-- .../implementations/layers/vm_runner/protective_reads.rs | 2 +- core/node/node_framework/src/task.rs | 6 ++++++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs b/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs index 6e84d96d894a..a105ad81ee60 100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs @@ -10,7 +10,7 @@ pub mod protective_reads; #[async_trait::async_trait] impl Task for StorageSyncTask { fn id(&self) -> TaskId { - TaskId(format!("vm_runner/{}/storage_sync", self.io().name())) + format!("vm_runner/{}/storage_sync", self.io().name()).into() } async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -23,7 +23,7 @@ impl Task for StorageSyncTask { #[async_trait::async_trait] impl Task for ConcurrentOutputHandlerFactoryTask { fn id(&self) -> TaskId { - TaskId(format!("vm_runner/{}/output_handler", self.io().name())) + format!("vm_runner/{}/output_handler", self.io().name()).into() } async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs index 1b7867cdef1f..bb98f04992ed 100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs @@ -62,7 +62,7 @@ struct ProtectiveReadsWriterTask { #[async_trait::async_trait] impl Task for 
ProtectiveReadsWriterTask { fn id(&self) -> TaskId { - TaskId("vm_runner/protective_reads_writer".to_owned()) + "vm_runner/protective_reads_writer".into() } async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/task.rs b/core/node/node_framework/src/task.rs index a72d640731ea..8ff73d75d8fa 100644 --- a/core/node/node_framework/src/task.rs +++ b/core/node/node_framework/src/task.rs @@ -60,6 +60,12 @@ impl From<&str> for TaskId { } } +impl From<String> for TaskId { + fn from(value: String) -> Self { + TaskId(value) + } +} + impl Deref for TaskId { type Target = str; From 52cf4caeeda58c5f037ff8e65b8bd7656d72dbe0 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 4 Jun 2024 12:51:34 +1000 Subject: [PATCH 14/16] add vm runner to EN int tests and loadtests --- .github/workflows/ci-core-reusable.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-core-reusable.yml b/.github/workflows/ci-core-reusable.yml index 1a47e4965360..74a0bed17dae 100644 --- a/.github/workflows/ci-core-reusable.yml +++ b/.github/workflows/ci-core-reusable.yml @@ -104,7 +104,7 @@ jobs: # `sleep 60` because we need to wait until server added all the tokens - name: Run server run: | - ci_run zk server --uring --components api,tree,eth,state_keeper,housekeeper,commitment_generator &>server.log & + ci_run zk server --uring --components api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads &>server.log & ci_run sleep 60 - name: Deploy legacy era contracts @@ -301,7 +301,7 @@ jobs: runs-on: [matterlabs-ci-runner] env: - SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator${{ matrix.consensus && ',consensus' || '' }}" + SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads${{ matrix.consensus && ',consensus' || '' }}" EXT_NODE_FLAGS: "${{ matrix.consensus && '-- --enable-consensus' || '' }}" 
steps: From aa85c80a63f20f6c493cc48bb76c1eb56a4e6739 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 4 Jun 2024 12:58:50 +1000 Subject: [PATCH 15/16] add yaml config for vm runner --- etc/env/file_based/general.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/etc/env/file_based/general.yaml b/etc/env/file_based/general.yaml index d59da18d1266..fdccdf03b5f7 100644 --- a/etc/env/file_based/general.yaml +++ b/etc/env/file_based/general.yaml @@ -321,3 +321,7 @@ observability: opentelemetry: endpoint: unset level: debug + +protective_reads_writer: + protective_reads_db_path: "./db/main/protective_reads" + protective_reads_window_size: 3 From 00ddb09566d090b6b10c825f592e5b2597132029 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 4 Jun 2024 13:32:25 +1000 Subject: [PATCH 16/16] rethink the pool size --- .../layers/vm_runner/protective_reads.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs index bb98f04992ed..332793031fa5 100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs @@ -37,7 +37,22 @@ impl WiringLayer for ProtectiveReadsWriterLayer { let master_pool = context.get_resource::<PoolResource<MasterPool>>().await?; let (protective_reads_writer, tasks) = ProtectiveReadsWriter::new( - master_pool.get_custom(3).await?, + // One for `StorageSyncTask` which can hold a long-term connection in case it needs to + // catch up cache. + // + // One for `ConcurrentOutputHandlerFactoryTask`/`VmRunner` as they need occasional access + // to DB for querying last processed batch and last ready to be loaded batch. 
+ // + // `self.protective_reads_writer_config` connections for `ProtectiveReadsOutputHandlerFactory` + // as there can be multiple output handlers holding multi-second connections to write + // large amount of protective reads. + master_pool + .get_custom( + self.protective_reads_writer_config + .protective_reads_window_size + + 2, + ) + .await?, self.protective_reads_writer_config.protective_reads_db_path, self.zksync_network_id, self.protective_reads_writer_config