diff --git a/.github/workflows/ci-core-reusable.yml b/.github/workflows/ci-core-reusable.yml index 9e11ab51c5ab..72e75e085b16 100644 --- a/.github/workflows/ci-core-reusable.yml +++ b/.github/workflows/ci-core-reusable.yml @@ -104,7 +104,7 @@ jobs: # `sleep 60` because we need to wait until server added all the tokens - name: Run server run: | - ci_run zk server --uring --components api,tree,eth,state_keeper,housekeeper,commitment_generator &>server.log & + ci_run zk server --uring --components api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads &>server.log & ci_run sleep 60 - name: Deploy legacy era contracts @@ -134,7 +134,7 @@ jobs: base_token: ["Eth", "Custom"] deployment_mode: ["Rollup", "Validium"] env: - SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator${{ matrix.consensus && ',consensus' || '' }}" + SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads${{ matrix.consensus && ',consensus' || '' }}" runs-on: [matterlabs-ci-runner] steps: @@ -302,7 +302,7 @@ jobs: runs-on: [matterlabs-ci-runner] env: - SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator${{ matrix.consensus && ',consensus' || '' }}" + SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads${{ matrix.consensus && ',consensus' || '' }}" EXT_NODE_FLAGS: "${{ matrix.consensus && '-- --enable-consensus' || '' }}" steps: diff --git a/Cargo.lock b/Cargo.lock index ad53e37d425f..af0d4d352203 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8959,6 +8959,7 @@ dependencies = [ "zksync_tee_verifier_input_producer", "zksync_types", "zksync_utils", + "zksync_vm_runner", "zksync_web3_decl", ] diff --git a/core/bin/zksync_server/src/main.rs b/core/bin/zksync_server/src/main.rs index 955a0232ae3b..f1eedd592386 100644 --- a/core/bin/zksync_server/src/main.rs +++ b/core/bin/zksync_server/src/main.rs @@ -13,7 +13,8 @@ use zksync_config::{ house_keeper::HouseKeeperConfig, ContractsConfig, DatabaseSecrets, FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig, FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, - L1Secrets, ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, Secrets, + L1Secrets, ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, + ProtectiveReadsWriterConfig, Secrets, }, ApiConfig, ContractVerifierConfig, DBConfig, EthConfig, EthWatchConfig, GasAdjusterConfig, GenesisConfig, ObjectStoreConfig, PostgresConfig, SnapshotsCreatorConfig, @@ -306,5 +307,6 @@ fn load_env_config() -> anyhow::Result { object_store_config: ObjectStoreConfig::from_env().ok(), observability: ObservabilityConfig::from_env().ok(), snapshot_creator: SnapshotsCreatorConfig::from_env().ok(), + protective_reads_writer_config: ProtectiveReadsWriterConfig::from_env().ok(), }) } diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs index 163835044cac..d67b898c95ca 100644 --- a/core/bin/zksync_server/src/node_builder.rs +++ b/core/bin/zksync_server/src/node_builder.rs @@ -37,6 +37,7 @@ use zksync_node_framework::{ StateKeeperLayer, }, tee_verifier_input_producer::TeeVerifierInputProducerLayer, + vm_runner::protective_reads::ProtectiveReadsWriterLayer, web3_api::{ caches::MempoolCacheLayer, server::{Web3ServerLayer, Web3ServerOptionalConfig}, @@ -399,6 +400,17 @@ impl MainNodeBuilder { Ok(self) } + fn add_vm_runner_protective_reads_layer(mut self) -> anyhow::Result { + let protective_reads_writer_config = + try_load_config!(self.configs.protective_reads_writer_config); + self.node.add_layer(ProtectiveReadsWriterLayer::new( + protective_reads_writer_config, + self.genesis_config.l2_chain_id, + )); + + Ok(self) + } + pub fn build(mut self, mut components: Vec) -> anyhow::Result { // Add "base" layers (resources and helper tasks). self = self @@ -480,6 +492,9 @@ impl MainNodeBuilder { Component::CommitmentGenerator => { self = self.add_commitment_generator_layer()?; } + Component::VmRunnerProtectiveReads => { + self = self.add_vm_runner_protective_reads_layer()?; + } } } Ok(self.node.build()?) diff --git a/core/lib/config/src/configs/general.rs b/core/lib/config/src/configs/general.rs index 69d68508a035..ef02f557bc18 100644 --- a/core/lib/config/src/configs/general.rs +++ b/core/lib/config/src/configs/general.rs @@ -3,6 +3,7 @@ use crate::{ chain::{CircuitBreakerConfig, MempoolConfig, OperationsManagerConfig, StateKeeperConfig}, fri_prover_group::FriProverGroupConfig, house_keeper::HouseKeeperConfig, + vm_runner::ProtectiveReadsWriterConfig, FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig, FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, @@ -32,4 +33,5 @@ pub struct GeneralConfig { pub eth: Option, pub snapshot_creator: Option, pub observability: Option, + pub protective_reads_writer_config: Option, } diff --git a/core/lib/config/src/configs/mod.rs b/core/lib/config/src/configs/mod.rs index 925c30976f97..b2d9571ad292 100644 --- a/core/lib/config/src/configs/mod.rs +++ b/core/lib/config/src/configs/mod.rs @@ -20,6 +20,7 @@ pub use self::{ secrets::{DatabaseSecrets, L1Secrets, Secrets}, snapshots_creator::SnapshotsCreatorConfig, utils::PrometheusConfig, + vm_runner::ProtectiveReadsWriterConfig, }; pub mod api; @@ -46,6 +47,7 @@ pub mod proof_data_handler; pub mod secrets; pub mod snapshots_creator; pub mod utils; +pub mod vm_runner; pub mod wallets; const BYTES_IN_MEGABYTE: usize = 1_024 * 1_024; diff --git a/core/lib/config/src/configs/vm_runner.rs b/core/lib/config/src/configs/vm_runner.rs new file mode 100644 index 000000000000..6250830398eb --- /dev/null +++ b/core/lib/config/src/configs/vm_runner.rs @@ -0,0 +1,16 @@ +use serde::Deserialize; + +#[derive(Debug, Deserialize, Clone, PartialEq, Default)] +pub struct ProtectiveReadsWriterConfig { + /// Path to the RocksDB data directory that serves state cache. + #[serde(default = "ProtectiveReadsWriterConfig::default_protective_reads_db_path")] + pub protective_reads_db_path: String, + /// How many max batches should be processed at the same time. + pub protective_reads_window_size: u32, +} + +impl ProtectiveReadsWriterConfig { + fn default_protective_reads_db_path() -> String { + "./db/protective_reads_writer".to_owned() + } +} diff --git a/core/lib/dal/.sqlx/query-1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6.json b/core/lib/dal/.sqlx/query-1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6.json new file mode 100644 index 000000000000..94a17c87888e --- /dev/null +++ b/core/lib/dal/.sqlx/query-1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n COALESCE(MAX(l1_batch_number), 0) AS \"last_processed_l1_batch!\"\n FROM\n vm_runner_protective_reads\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "last_processed_l1_batch!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "1f38966f65ce0ed8365b969d0a1f125cf30578457040c14fd6882c73a87fb3d6" +} diff --git a/core/lib/dal/.sqlx/query-c31632143b459ea6684908ce7a15d03a811221d4ddf26e2e0ddc34147a0d8e23.json b/core/lib/dal/.sqlx/query-c31632143b459ea6684908ce7a15d03a811221d4ddf26e2e0ddc34147a0d8e23.json new file mode 100644 index 000000000000..dcbfb1d0bd24 --- /dev/null +++ b/core/lib/dal/.sqlx/query-c31632143b459ea6684908ce7a15d03a811221d4ddf26e2e0ddc34147a0d8e23.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n available_batches AS (\n SELECT\n MAX(number) AS \"last_batch\"\n FROM\n l1_batches\n ),\n processed_batches AS (\n SELECT\n COALESCE(MAX(l1_batch_number), 0) + $1 AS \"last_ready_batch\"\n FROM\n vm_runner_protective_reads\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON TRUE\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "last_ready_batch!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + true + ] + }, + "hash": "c31632143b459ea6684908ce7a15d03a811221d4ddf26e2e0ddc34147a0d8e23" +} diff --git a/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json b/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json new file mode 100644 index 000000000000..e49cc211cdcd --- /dev/null +++ b/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n vm_runner_protective_reads (l1_batch_number, created_at, updated_at)\n VALUES\n ($1, NOW(), NOW())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5" +} diff --git a/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.down.sql b/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.down.sql new file mode 100644 index 000000000000..773b22aa4fa1 --- /dev/null +++ b/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS vm_runner_protective_reads; diff --git a/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.up.sql b/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.up.sql new file mode 100644 index 000000000000..170569508281 --- /dev/null +++ b/core/lib/dal/migrations/20240522215934_add_vm_runner_protective_reads_table.up.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS vm_runner_protective_reads +( + l1_batch_number BIGINT NOT NULL PRIMARY KEY, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + time_taken TIME +); diff --git a/core/lib/dal/src/lib.rs b/core/lib/dal/src/lib.rs index f9c585758c4d..8b048a035121 100644 --- a/core/lib/dal/src/lib.rs +++ b/core/lib/dal/src/lib.rs @@ -23,7 +23,7 @@ use crate::{ sync_dal::SyncDal, system_dal::SystemDal, tee_verifier_input_producer_dal::TeeVerifierInputProducerDal, tokens_dal::TokensDal, tokens_web3_dal::TokensWeb3Dal, transactions_dal::TransactionsDal, - transactions_web3_dal::TransactionsWeb3Dal, + transactions_web3_dal::TransactionsWeb3Dal, vm_runner_dal::VmRunnerDal, }; pub mod blocks_dal; @@ -55,6 +55,7 @@ pub mod tokens_dal; pub mod tokens_web3_dal; pub mod transactions_dal; pub mod transactions_web3_dal; +pub mod vm_runner_dal; #[cfg(test)] mod tests; @@ -119,6 +120,8 @@ where fn snapshot_recovery_dal(&mut self) -> SnapshotRecoveryDal<'_, 'a>; fn pruning_dal(&mut self) -> PruningDal<'_, 'a>; + + fn vm_runner_dal(&mut self) -> VmRunnerDal<'_, 'a>; } #[derive(Clone, Debug)] @@ -229,4 +232,8 @@ impl<'a> CoreDal<'a> for Connection<'a, Core> { fn pruning_dal(&mut self) -> PruningDal<'_, 'a> { PruningDal { storage: self } } + + fn vm_runner_dal(&mut self) -> VmRunnerDal<'_, 'a> { + VmRunnerDal { storage: self } + } } diff --git a/core/lib/dal/src/vm_runner_dal.rs b/core/lib/dal/src/vm_runner_dal.rs new file mode 100644 index 000000000000..3693f78a6a7a --- /dev/null +++ b/core/lib/dal/src/vm_runner_dal.rs @@ -0,0 +1,83 @@ +use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt}; +use zksync_types::L1BatchNumber; + +use crate::Core; + +#[derive(Debug)] +pub struct VmRunnerDal<'c, 'a> { + pub(crate) storage: &'c mut Connection<'a, Core>, +} + +impl VmRunnerDal<'_, '_> { + pub async fn get_protective_reads_latest_processed_batch( + &mut self, + ) -> DalResult { + let row = sqlx::query!( + r#" + SELECT + COALESCE(MAX(l1_batch_number), 0) AS "last_processed_l1_batch!" + FROM + vm_runner_protective_reads + "# + ) + .instrument("get_protective_reads_latest_processed_batch") + .report_latency() + .fetch_one(self.storage) + .await?; + Ok(L1BatchNumber(row.last_processed_l1_batch as u32)) + } + + pub async fn get_protective_reads_last_ready_batch( + &mut self, + window_size: u32, + ) -> DalResult { + let row = sqlx::query!( + r#" + WITH + available_batches AS ( + SELECT + MAX(number) AS "last_batch" + FROM + l1_batches + ), + processed_batches AS ( + SELECT + COALESCE(MAX(l1_batch_number), 0) + $1 AS "last_ready_batch" + FROM + vm_runner_protective_reads + ) + SELECT + LEAST(last_batch, last_ready_batch) AS "last_ready_batch!" + FROM + available_batches + FULL JOIN processed_batches ON TRUE + "#, + window_size as i32 + ) + .instrument("get_protective_reads_last_ready_batch") + .report_latency() + .fetch_one(self.storage) + .await?; + Ok(L1BatchNumber(row.last_ready_batch as u32)) + } + + pub async fn mark_protective_reads_batch_as_completed( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> DalResult<()> { + sqlx::query!( + r#" + INSERT INTO + vm_runner_protective_reads (l1_batch_number, created_at, updated_at) + VALUES + ($1, NOW(), NOW()) + "#, + i64::from(l1_batch_number.0), + ) + .instrument("mark_protective_reads_batch_as_completed") + .report_latency() + .execute(self.storage) + .await?; + Ok(()) + } +} diff --git a/core/lib/env_config/src/lib.rs b/core/lib/env_config/src/lib.rs index f6290020f38d..9218467fdaba 100644 --- a/core/lib/env_config/src/lib.rs +++ b/core/lib/env_config/src/lib.rs @@ -24,6 +24,7 @@ mod utils; mod genesis; #[cfg(test)] mod test_utils; +mod vm_runner; mod wallets; pub trait FromEnv: Sized { diff --git a/core/lib/env_config/src/vm_runner.rs b/core/lib/env_config/src/vm_runner.rs new file mode 100644 index 000000000000..8a99ea2dc8e2 --- /dev/null +++ b/core/lib/env_config/src/vm_runner.rs @@ -0,0 +1,9 @@ +use zksync_config::configs::ProtectiveReadsWriterConfig; + +use crate::{envy_load, FromEnv}; + +impl FromEnv for ProtectiveReadsWriterConfig { + fn from_env() -> anyhow::Result { + envy_load("vm_runner.protective_reads", "VM_RUNNER_PROTECTIVE_READS_") + } +} diff --git a/core/lib/protobuf_config/src/general.rs b/core/lib/protobuf_config/src/general.rs index ccd55a71c2ec..ba2076a09a14 100644 --- a/core/lib/protobuf_config/src/general.rs +++ b/core/lib/protobuf_config/src/general.rs @@ -37,6 +37,8 @@ impl ProtoRepr for proto::GeneralConfig { snapshot_creator: read_optional_repr(&self.snapshot_creator) .context("snapshot_creator")?, observability: read_optional_repr(&self.observability).context("observability")?, + protective_reads_writer_config: read_optional_repr(&self.protective_reads_writer) + .context("vm_runner")?, }) } @@ -68,6 +70,10 @@ impl ProtoRepr for proto::GeneralConfig { eth: this.eth.as_ref().map(ProtoRepr::build), snapshot_creator: this.snapshot_creator.as_ref().map(ProtoRepr::build), observability: this.observability.as_ref().map(ProtoRepr::build), + protective_reads_writer: this + .protective_reads_writer_config + .as_ref() + .map(ProtoRepr::build), } } } diff --git a/core/lib/protobuf_config/src/lib.rs b/core/lib/protobuf_config/src/lib.rs index 25d5662b9ddb..2fd9bbd9e059 100644 --- a/core/lib/protobuf_config/src/lib.rs +++ b/core/lib/protobuf_config/src/lib.rs @@ -27,6 +27,7 @@ pub mod testonly; #[cfg(test)] mod tests; mod utils; +mod vm_runner; mod wallets; use std::str::FromStr; diff --git a/core/lib/protobuf_config/src/proto/config/general.proto b/core/lib/protobuf_config/src/proto/config/general.proto index fdc60c57cfdd..b606417d129a 100644 --- a/core/lib/protobuf_config/src/proto/config/general.proto +++ b/core/lib/protobuf_config/src/proto/config/general.proto @@ -13,6 +13,7 @@ import "zksync/config/house_keeper.proto"; import "zksync/config/observability.proto"; import "zksync/config/snapshots_creator.proto"; import "zksync/config/utils.proto"; +import "zksync/config/vm_runner.proto"; message GeneralConfig { optional config.database.Postgres postgres = 1; @@ -35,4 +36,5 @@ message GeneralConfig { optional config.prover.ProverGateway prover_gateway = 30; optional config.snapshot_creator.SnapshotsCreator snapshot_creator = 31; optional config.observability.Observability observability = 32; + optional config.vm_runner.ProtectiveReadsWriter protective_reads_writer = 33; } diff --git a/core/lib/protobuf_config/src/proto/config/vm_runner.proto b/core/lib/protobuf_config/src/proto/config/vm_runner.proto new file mode 100644 index 000000000000..a7c829f05869 --- /dev/null +++ b/core/lib/protobuf_config/src/proto/config/vm_runner.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package zksync.config.vm_runner; + +message ProtectiveReadsWriter { + optional string protective_reads_db_path = 1; // required; fs path + optional uint64 protective_reads_window_size = 2; // required +} diff --git a/core/lib/protobuf_config/src/vm_runner.rs b/core/lib/protobuf_config/src/vm_runner.rs new file mode 100644 index 000000000000..227e22cd5d22 --- /dev/null +++ b/core/lib/protobuf_config/src/vm_runner.rs @@ -0,0 +1,27 @@ +use anyhow::Context; +use zksync_config::configs::{self}; +use zksync_protobuf::{required, ProtoRepr}; + +use crate::proto::vm_runner as proto; + +impl ProtoRepr for proto::ProtectiveReadsWriter { + type Type = configs::ProtectiveReadsWriterConfig; + + fn read(&self) -> anyhow::Result { + Ok(Self::Type { + protective_reads_db_path: required(&self.protective_reads_db_path) + .context("protective_reads_db_path")? + .clone(), + protective_reads_window_size: *required(&self.protective_reads_window_size) + .context("protective_reads_window_size")? + as u32, + }) + } + + fn build(this: &Self::Type) -> Self { + Self { + protective_reads_db_path: Some(this.protective_reads_db_path.clone()), + protective_reads_window_size: Some(this.protective_reads_window_size as u64), + } + } +} diff --git a/core/lib/zksync_core_leftovers/src/lib.rs b/core/lib/zksync_core_leftovers/src/lib.rs index b0104cc795e3..4f8664ab74dc 100644 --- a/core/lib/zksync_core_leftovers/src/lib.rs +++ b/core/lib/zksync_core_leftovers/src/lib.rs @@ -154,6 +154,8 @@ pub enum Component { Consensus, /// Component generating commitment for L1 batches. CommitmentGenerator, + /// VM runner-based component that saves protective reads to Postgres. + VmRunnerProtectiveReads, } #[derive(Debug)] @@ -190,6 +192,9 @@ impl FromStr for Components { "proof_data_handler" => Ok(Components(vec![Component::ProofDataHandler])), "consensus" => Ok(Components(vec![Component::Consensus])), "commitment_generator" => Ok(Components(vec![Component::CommitmentGenerator])), + "vm_runner_protective_reads" => { + Ok(Components(vec![Component::VmRunnerProtectiveReads])) + } other => Err(format!("{} is not a valid component name", other)), } } diff --git a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs index cfac1df27cd0..68389228861a 100644 --- a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs +++ b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs @@ -10,7 +10,7 @@ use zksync_config::{ wallets::{AddressWallet, EthSender, StateKeeper, Wallet, Wallets}, FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig, FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, GeneralConfig, - ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, + ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, ProtectiveReadsWriterConfig, }, ApiConfig, ContractVerifierConfig, DBConfig, EthConfig, EthWatchConfig, GasAdjusterConfig, ObjectStoreConfig, PostgresConfig, SnapshotsCreatorConfig, @@ -61,6 +61,7 @@ pub struct TempConfigStore { pub object_store_config: Option, pub observability: Option, pub snapshot_creator: Option, + pub protective_reads_writer_config: Option, } impl TempConfigStore { @@ -86,6 +87,7 @@ impl TempConfigStore { eth: self.eth_sender_config.clone(), snapshot_creator: self.snapshot_creator.clone(), observability: self.observability.clone(), + protective_reads_writer_config: self.protective_reads_writer_config.clone(), } } diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index ed7d37c876de..8e2c915d5749 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -44,6 +44,7 @@ zksync_contract_verification_server.workspace = true zksync_tee_verifier_input_producer.workspace = true zksync_queued_job_processor.workspace = true zksync_reorg_detector.workspace = true +zksync_vm_runner.workspace = true tracing.workspace = true thiserror.workspace = true diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs index 43b1f77e88c8..1c171e84b5ba 100644 --- a/core/node/node_framework/src/implementations/layers/mod.rs +++ b/core/node/node_framework/src/implementations/layers/mod.rs @@ -21,4 +21,5 @@ pub mod reorg_detector_runner; pub mod sigint; pub mod state_keeper; pub mod tee_verifier_input_producer; +pub mod vm_runner; pub mod web3_api; diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs b/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs new file mode 100644 index 000000000000..a105ad81ee60 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/vm_runner/mod.rs @@ -0,0 +1,34 @@ +use zksync_vm_runner::{ConcurrentOutputHandlerFactoryTask, StorageSyncTask, VmRunnerIo}; + +use crate::{ + service::StopReceiver, + task::{Task, TaskId}, +}; + +pub mod protective_reads; + +#[async_trait::async_trait] +impl Task for StorageSyncTask { + fn id(&self) -> TaskId { + format!("vm_runner/{}/storage_sync", self.io().name()).into() + } + + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + StorageSyncTask::run(*self, stop_receiver.0.clone()).await?; + stop_receiver.0.changed().await?; + Ok(()) + } +} + +#[async_trait::async_trait] +impl Task for ConcurrentOutputHandlerFactoryTask { + fn id(&self) -> TaskId { + format!("vm_runner/{}/output_handler", self.io().name()).into() + } + + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + ConcurrentOutputHandlerFactoryTask::run(*self, stop_receiver.0.clone()).await?; + stop_receiver.0.changed().await?; + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs new file mode 100644 index 000000000000..332793031fa5 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/vm_runner/protective_reads.rs @@ -0,0 +1,86 @@ +use zksync_config::configs::vm_runner::ProtectiveReadsWriterConfig; +use zksync_types::L2ChainId; +use zksync_vm_runner::ProtectiveReadsWriter; + +use crate::{ + implementations::resources::pools::{MasterPool, PoolResource}, + service::{ServiceContext, StopReceiver}, + task::{Task, TaskId}, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct ProtectiveReadsWriterLayer { + protective_reads_writer_config: ProtectiveReadsWriterConfig, + zksync_network_id: L2ChainId, +} + +impl ProtectiveReadsWriterLayer { + pub fn new( + protective_reads_writer_config: ProtectiveReadsWriterConfig, + zksync_network_id: L2ChainId, + ) -> Self { + Self { + protective_reads_writer_config, + zksync_network_id, + } + } +} + +#[async_trait::async_trait] +impl WiringLayer for ProtectiveReadsWriterLayer { + fn layer_name(&self) -> &'static str { + "vm_runner_protective_reads" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let master_pool = context.get_resource::>().await?; + + let (protective_reads_writer, tasks) = ProtectiveReadsWriter::new( + // One for `StorageSyncTask` which can hold a long-term connection in case it needs to + // catch up cache. + // + // One for `ConcurrentOutputHandlerFactoryTask`/`VmRunner` as they need occasional access + // to DB for querying last processed batch and last ready to be loaded batch. + // + // `self.protective_reads_writer_config` connections for `ProtectiveReadsOutputHandlerFactory` + // as there can be multiple output handlers holding multi-second connections to write + // large amount of protective reads. + master_pool + .get_custom( + self.protective_reads_writer_config + .protective_reads_window_size + + 2, + ) + .await?, + self.protective_reads_writer_config.protective_reads_db_path, + self.zksync_network_id, + self.protective_reads_writer_config + .protective_reads_window_size, + ) + .await?; + + context.add_task(Box::new(tasks.loader_task)); + context.add_task(Box::new(tasks.output_handler_factory_task)); + context.add_task(Box::new(ProtectiveReadsWriterTask { + protective_reads_writer, + })); + Ok(()) + } +} + +#[derive(Debug)] +struct ProtectiveReadsWriterTask { + protective_reads_writer: ProtectiveReadsWriter, +} + +#[async_trait::async_trait] +impl Task for ProtectiveReadsWriterTask { + fn id(&self) -> TaskId { + "vm_runner/protective_reads_writer".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + self.protective_reads_writer.run(&stop_receiver.0).await + } +} diff --git a/core/node/node_framework/src/task.rs b/core/node/node_framework/src/task.rs index a72d640731ea..8ff73d75d8fa 100644 --- a/core/node/node_framework/src/task.rs +++ b/core/node/node_framework/src/task.rs @@ -60,6 +60,12 @@ impl From<&str> for TaskId { } } +impl From for TaskId { + fn from(value: String) -> Self { + TaskId(value) + } +} + impl Deref for TaskId { type Target = str; diff --git a/core/node/state_keeper/src/updates/mod.rs b/core/node/state_keeper/src/updates/mod.rs index bb33a6f58678..772ee71641a0 100644 --- a/core/node/state_keeper/src/updates/mod.rs +++ b/core/node/state_keeper/src/updates/mod.rs @@ -123,7 +123,7 @@ impl UpdatesManager { ); } - pub(crate) fn finish_batch(&mut self, finished_batch: FinishedL1Batch) { + pub fn finish_batch(&mut self, finished_batch: FinishedL1Batch) { assert!( self.l1_batch.finished.is_none(), "Cannot finish already finished batch" diff --git a/core/node/vm_runner/Cargo.toml b/core/node/vm_runner/Cargo.toml index 67de95f60cb0..b3ede5a796be 100644 --- a/core/node/vm_runner/Cargo.toml +++ b/core/node/vm_runner/Cargo.toml @@ -17,6 +17,7 @@ zksync_contracts.workspace = true zksync_state.workspace = true zksync_storage.workspace = true zksync_state_keeper.workspace = true +zksync_utils.workspace = true vm_utils.workspace = true tokio = { workspace = true, features = ["time"] } @@ -30,7 +31,6 @@ dashmap.workspace = true zksync_node_test_utils.workspace = true zksync_node_genesis.workspace = true zksync_test_account.workspace = true -zksync_utils.workspace = true backon.workspace = true futures = { workspace = true, features = ["compat"] } rand.workspace = true diff --git a/core/node/vm_runner/src/impls/mod.rs b/core/node/vm_runner/src/impls/mod.rs new file mode 100644 index 000000000000..70d01f6932ef --- /dev/null +++ b/core/node/vm_runner/src/impls/mod.rs @@ -0,0 +1,3 @@ +mod protective_reads; + +pub use protective_reads::{ProtectiveReadsWriter, ProtectiveReadsWriterTasks}; diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs new file mode 100644 index 000000000000..03a5f1254aa6 --- /dev/null +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -0,0 +1,193 @@ +use std::sync::Arc; + +use anyhow::Context; +use async_trait::async_trait; +use tokio::sync::watch; +use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; +use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager}; +use zksync_types::{zk_evm_types::LogQuery, AccountTreeId, L1BatchNumber, L2ChainId, StorageKey}; +use zksync_utils::u256_to_h256; + +use crate::{ + storage::StorageSyncTask, ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, + OutputHandlerFactory, VmRunner, VmRunnerIo, VmRunnerStorage, +}; + +/// A standalone component that writes protective reads asynchronously to state keeper. +#[derive(Debug)] +pub struct ProtectiveReadsWriter { + vm_runner: VmRunner, +} + +impl ProtectiveReadsWriter { + /// Create a new protective reads writer from the provided DB parameters and window size which + /// regulates how many batches this component can handle at the same time. + pub async fn new( + pool: ConnectionPool, + rocksdb_path: String, + chain_id: L2ChainId, + window_size: u32, + ) -> anyhow::Result<(Self, ProtectiveReadsWriterTasks)> { + let io = ProtectiveReadsIo { window_size }; + let (loader, loader_task) = + VmRunnerStorage::new(pool.clone(), rocksdb_path, io.clone(), chain_id).await?; + let output_handler_factory = ProtectiveReadsOutputHandlerFactory { pool: pool.clone() }; + let (output_handler_factory, output_handler_factory_task) = + ConcurrentOutputHandlerFactory::new(pool.clone(), io.clone(), output_handler_factory); + let batch_processor = MainBatchExecutor::new(false, false); + let vm_runner = VmRunner::new( + pool, + Box::new(io), + Arc::new(loader), + Box::new(output_handler_factory), + Box::new(batch_processor), + ); + Ok(( + Self { vm_runner }, + ProtectiveReadsWriterTasks { + loader_task, + output_handler_factory_task, + }, + )) + } + + /// Continuously loads new available batches and writes the corresponding protective reads + /// produced by that batch. + /// + /// # Errors + /// + /// Propagates RocksDB and Postgres errors. + pub async fn run(self, stop_receiver: &watch::Receiver) -> anyhow::Result<()> { + self.vm_runner.run(stop_receiver).await + } +} + +/// A collections of tasks that need to be run in order for protective reads writer to work as +/// intended. +#[derive(Debug)] +pub struct ProtectiveReadsWriterTasks { + /// Task that synchronizes storage with new available batches. + pub loader_task: StorageSyncTask, + /// Task that handles output from processed batches. + pub output_handler_factory_task: ConcurrentOutputHandlerFactoryTask, +} + +#[derive(Debug, Clone)] +pub struct ProtectiveReadsIo { + window_size: u32, +} + +#[async_trait] +impl VmRunnerIo for ProtectiveReadsIo { + fn name(&self) -> &'static str { + "protective_reads_writer" + } + + async fn latest_processed_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result { + Ok(conn + .vm_runner_dal() + .get_protective_reads_latest_processed_batch() + .await?) + } + + async fn last_ready_to_be_loaded_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result { + Ok(conn + .vm_runner_dal() + .get_protective_reads_last_ready_batch(self.window_size) + .await?) + } + + async fn mark_l1_batch_as_completed( + &self, + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + Ok(conn + .vm_runner_dal() + .mark_protective_reads_batch_as_completed(l1_batch_number) + .await?) + } +} + +#[derive(Debug)] +struct ProtectiveReadsOutputHandler { + pool: ConnectionPool, +} + +#[async_trait] +impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { + async fn handle_l2_block(&mut self, _updates_manager: &UpdatesManager) -> anyhow::Result<()> { + Ok(()) + } + + async fn handle_l1_batch( + &mut self, + updates_manager: Arc, + ) -> anyhow::Result<()> { + let finished_batch = updates_manager + .l1_batch + .finished + .as_ref() + .context("L1 batch is not actually finished")?; + let (_, protective_reads): (Vec, Vec) = finished_batch + .final_execution_state + .deduplicated_storage_log_queries + .iter() + .partition(|log_query| log_query.rw_flag); + + let mut connection = self + .pool + .connection_tagged("protective_reads_writer") + .await?; + let mut expected_protective_reads = connection + .storage_logs_dedup_dal() + .get_protective_reads_for_l1_batch(updates_manager.l1_batch.number) + .await?; + + for protective_read in protective_reads { + let address = AccountTreeId::new(protective_read.address); + let key = u256_to_h256(protective_read.key); + if !expected_protective_reads.remove(&StorageKey::new(address, key)) { + tracing::error!( + l1_batch_number = %updates_manager.l1_batch.number, + address = %protective_read.address, + key = %key, + "VM runner produced a protective read that did not happen in state keeper" + ); + } + } + for remaining_read in expected_protective_reads { + tracing::error!( + l1_batch_number = %updates_manager.l1_batch.number, + address = %remaining_read.address(), + key = %remaining_read.key(), + "State keeper produced a protective read that did not happen in VM runner" + ); + } + + Ok(()) + } +} + +#[derive(Debug)] +struct ProtectiveReadsOutputHandlerFactory { + pool: ConnectionPool, +} + +#[async_trait] +impl OutputHandlerFactory for ProtectiveReadsOutputHandlerFactory { + async fn create_handler( + &mut self, + _l1_batch_number: L1BatchNumber, + ) -> anyhow::Result> { + Ok(Box::new(ProtectiveReadsOutputHandler { + pool: self.pool.clone(), + })) + } +} diff --git a/core/node/vm_runner/src/lib.rs b/core/node/vm_runner/src/lib.rs index 4664d4eb8e11..ca9f8bdc0eb4 100644 --- a/core/node/vm_runner/src/lib.rs +++ b/core/node/vm_runner/src/lib.rs @@ -3,6 +3,7 @@ #![warn(missing_debug_implementations, missing_docs)] +mod impls; mod io; mod output_handler; mod process; @@ -11,9 +12,10 @@ mod storage; #[cfg(test)] mod tests; +pub use impls::{ProtectiveReadsWriter, ProtectiveReadsWriterTasks}; pub use io::VmRunnerIo; pub use output_handler::{ ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory, }; pub use process::VmRunner; -pub use storage::{BatchExecuteData, VmRunnerStorage}; +pub use storage::{BatchExecuteData, StorageSyncTask, VmRunnerStorage}; diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index 30fe9e0c9010..49bed83cd96e 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -203,6 +203,11 @@ impl Debug for ConcurrentOutputHandlerFactoryTask { } impl ConcurrentOutputHandlerFactoryTask { + /// Access the underlying [`VmRunnerIo`]. + pub fn io(&self) -> &Io { + &self.io + } + /// Starts running the task which is supposed to last until the end of the node's lifetime. /// /// # Errors diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index 5ff7d7cc0b87..5e51b5e658f7 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -109,10 +109,11 @@ impl VmRunner { .await .context("VM runner failed to handle L2 block")?; } - batch_executor + let finished_batch = batch_executor .finish_batch() .await .context("failed finishing L1 batch in executor")?; + updates_manager.finish_batch(finished_batch); output_handler .handle_l1_batch(Arc::new(updates_manager)) .await diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index 5ffd1d11e70d..e7a8b147c76f 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -271,6 +271,17 @@ impl StorageSyncTask { }) } + /// Access the underlying [`VmRunnerIo`]. + pub fn io(&self) -> &Io { + &self.io + } + + /// Block until RocksDB cache instance is caught up with Postgres and then continuously makes + /// sure that the new ready batches are loaded into the cache. + /// + /// # Errors + /// + /// Propagates RocksDB and Postgres errors. pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { const SLEEP_INTERVAL: Duration = Duration::from_millis(50); @@ -289,10 +300,10 @@ impl StorageSyncTask { if rocksdb_builder.l1_batch_number().await == Some(latest_processed_batch + 1) { // RocksDB is already caught up, we might not need to do anything. // Just need to check that the memory diff is up-to-date in case this is a fresh start. + let last_ready_batch = self.io.last_ready_to_be_loaded_batch(&mut conn).await?; let state = self.state.read().await; - if state - .storage - .contains_key(&self.io.last_ready_to_be_loaded_batch(&mut conn).await?) + if last_ready_batch == latest_processed_batch + || state.storage.contains_key(&last_ready_batch) { // No need to do anything, killing time until last processed batch is updated. drop(conn); diff --git a/etc/env/base/vm_runner.toml b/etc/env/base/vm_runner.toml new file mode 100644 index 000000000000..d9e10e8b357d --- /dev/null +++ b/etc/env/base/vm_runner.toml @@ -0,0 +1,9 @@ +# Configuration for the VM runner crate + +[vm_runner] + +[vm_runner.protective_reads] +# Path to the directory that contains RocksDB with protective reads writer cache. +protective_reads_db_path = "./db/main/protective_reads" +# Amount of batches that can be processed in parallel. +protective_reads_window_size = 3 diff --git a/etc/env/file_based/general.yaml b/etc/env/file_based/general.yaml index d59da18d1266..fdccdf03b5f7 100644 --- a/etc/env/file_based/general.yaml +++ b/etc/env/file_based/general.yaml @@ -321,3 +321,7 @@ observability: opentelemetry: endpoint: unset level: debug + +protective_reads_writer: + protective_reads_db_path: "./db/main/protective_reads" + protective_reads_window_size: 3