diff --git a/.github/workflows/ci-core-reusable.yml b/.github/workflows/ci-core-reusable.yml index 0e61d7b5a99d..e03608a931f7 100644 --- a/.github/workflows/ci-core-reusable.yml +++ b/.github/workflows/ci-core-reusable.yml @@ -135,7 +135,7 @@ jobs: base_token: ["Eth", "Custom"] deployment_mode: ["Rollup", "Validium"] env: - SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads${{ matrix.consensus && ',consensus' || '' }}" + SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads,da_dispatcher${{ matrix.consensus && ',consensus' || '' }}" runs-on: [matterlabs-ci-runner] steps: @@ -288,6 +288,10 @@ jobs: if: always() run: ci_run cat core/tests/upgrade-test/upgrade.log || true + - name: Show fee-projection.log logs + if: always() + run: ci_run cat core/tests/ts-integration/fees.log || true + - name: Show sccache logs if: always() run: | @@ -305,7 +309,7 @@ jobs: runs-on: [matterlabs-ci-runner] env: - SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads${{ matrix.consensus && ',consensus' || '' }}" + SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads,da_dispatcher${{ matrix.consensus && ',consensus' || '' }}" EXT_NODE_FLAGS: "${{ matrix.consensus && '-- --enable-consensus' || '' }}" steps: diff --git a/Cargo.lock b/Cargo.lock index 30dae0d1f987..dd57e952ea22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8382,6 +8382,36 @@ dependencies = [ "zksync_utils", ] +[[package]] +name = "zksync_da_client" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "serde", + "tracing", + "zksync_config", + "zksync_types", +] + +[[package]] +name = "zksync_da_dispatcher" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "futures 0.3.28", + "rand 0.8.5", + "tokio", + "tracing", + "vise", + "zksync_config", + "zksync_da_client", + "zksync_dal", + 
"zksync_types", + "zksync_utils", +] + [[package]] name = "zksync_dal" version = "0.1.0" @@ -8434,6 +8464,23 @@ dependencies = [ "zksync_health_check", ] +[[package]] +name = "zksync_default_da_clients" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "flate2", + "serde", + "tracing", + "zksync_config", + "zksync_da_client", + "zksync_env_config", + "zksync_node_framework", + "zksync_object_store", + "zksync_types", +] + [[package]] name = "zksync_env_config" version = "0.1.0" @@ -8900,6 +8947,8 @@ dependencies = [ "zksync_consistency_checker", "zksync_contract_verification_server", "zksync_contracts", + "zksync_da_client", + "zksync_da_dispatcher", "zksync_dal", "zksync_db_connection", "zksync_env_config", @@ -9188,6 +9237,7 @@ dependencies = [ "zksync_consensus_executor", "zksync_consensus_roles", "zksync_core_leftovers", + "zksync_default_da_clients", "zksync_env_config", "zksync_eth_client", "zksync_metadata_calculator", diff --git a/Cargo.toml b/Cargo.toml index 665f7ff06564..b1ec4a864856 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ members = [ "core/node/shared_metrics", "core/node/db_pruner", "core/node/fee_model", + "core/node/da_dispatcher", "core/node/eth_sender", "core/node/vm_runner", "core/node/test_utils", @@ -44,6 +45,8 @@ members = [ "core/lib/circuit_breaker", "core/lib/dal", "core/lib/env_config", + "core/lib/da_client", + "core/lib/default_da_clients", "core/lib/eth_client", "core/lib/eth_signer", "core/lib/l1_contract_interface", @@ -223,6 +226,8 @@ zksync_dal = { path = "core/lib/dal" } zksync_db_connection = { path = "core/lib/db_connection" } zksync_env_config = { path = "core/lib/env_config" } zksync_eth_client = { path = "core/lib/eth_client" } +zksync_da_client = { path = "core/lib/da_client" } +zksync_default_da_clients = { path = "core/lib/default_da_clients" } zksync_eth_signer = { path = "core/lib/eth_signer" } zksync_health_check = { path = "core/lib/health_check" } zksync_l1_contract_interface = { 
path = "core/lib/l1_contract_interface" } @@ -254,6 +259,7 @@ zksync_block_reverter = { path = "core/node/block_reverter" } zksync_commitment_generator = { path = "core/node/commitment_generator" } zksync_house_keeper = { path = "core/node/house_keeper" } zksync_node_genesis = { path = "core/node/genesis" } +zksync_da_dispatcher = { path = "core/node/da_dispatcher" } zksync_eth_sender = { path = "core/node/eth_sender" } zksync_node_db_pruner = { path = "core/node/db_pruner" } zksync_node_fee_model = { path = "core/node/fee_model" } diff --git a/core/bin/zksync_server/Cargo.toml b/core/bin/zksync_server/Cargo.toml index e3fd6752b5e0..d9b8b530247d 100644 --- a/core/bin/zksync_server/Cargo.toml +++ b/core/bin/zksync_server/Cargo.toml @@ -20,6 +20,7 @@ zksync_utils.workspace = true zksync_types.workspace = true zksync_core_leftovers.workspace = true zksync_node_genesis.workspace = true +zksync_default_da_clients.workspace = true # Consensus dependenices zksync_consensus_crypto.workspace = true diff --git a/core/bin/zksync_server/src/main.rs b/core/bin/zksync_server/src/main.rs index dcd9f3718352..dae87e016636 100644 --- a/core/bin/zksync_server/src/main.rs +++ b/core/bin/zksync_server/src/main.rs @@ -16,8 +16,8 @@ use zksync_config::{ L1Secrets, ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, ProtectiveReadsWriterConfig, Secrets, }, - ApiConfig, ContractVerifierConfig, DBConfig, EthConfig, EthWatchConfig, GasAdjusterConfig, - GenesisConfig, ObjectStoreConfig, PostgresConfig, SnapshotsCreatorConfig, + ApiConfig, ContractVerifierConfig, DADispatcherConfig, DBConfig, EthConfig, EthWatchConfig, + GasAdjusterConfig, GenesisConfig, ObjectStoreConfig, PostgresConfig, SnapshotsCreatorConfig, }; use zksync_core_leftovers::{ genesis_init, is_genesis_needed, @@ -47,7 +47,7 @@ struct Cli { /// Comma-separated list of components to launch. 
#[arg( long, - default_value = "api,tree,eth,state_keeper,housekeeper,tee_verifier_input_producer,commitment_generator" + default_value = "api,tree,eth,state_keeper,housekeeper,tee_verifier_input_producer,commitment_generator,da_dispatcher" )] components: ComponentsToRun, /// Path to the yaml config. If set, it will be used instead of env vars. @@ -268,6 +268,7 @@ fn load_env_config() -> anyhow::Result { gas_adjuster_config: GasAdjusterConfig::from_env().ok(), observability: ObservabilityConfig::from_env().ok(), snapshot_creator: SnapshotsCreatorConfig::from_env().ok(), + da_dispatcher_config: DADispatcherConfig::from_env().ok(), protective_reads_writer_config: ProtectiveReadsWriterConfig::from_env().ok(), core_object_store: ObjectStoreConfig::from_env().ok(), commitment_generator: None, diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs index 2e5a70011b8d..b7ceadaaee6d 100644 --- a/core/bin/zksync_server/src/node_builder.rs +++ b/core/bin/zksync_server/src/node_builder.rs @@ -3,10 +3,17 @@ use anyhow::Context; use zksync_config::{ - configs::{consensus::ConsensusConfig, wallets::Wallets, GeneralConfig, Secrets}, + configs::{ + consensus::ConsensusConfig, eth_sender::PubdataSendingMode, wallets::Wallets, + GeneralConfig, Secrets, + }, ContractsConfig, GenesisConfig, }; use zksync_core_leftovers::Component; +use zksync_default_da_clients::{ + no_da::wiring_layer::NoDAClientWiringLayer, + object_store::{config::DAObjectStoreConfig, wiring_layer::ObjectStorageClientWiringLayer}, +}; use zksync_metadata_calculator::MetadataCalculatorConfig; use zksync_node_api_server::{ tx_sender::{ApiContracts, TxSenderConfig}, @@ -18,6 +25,7 @@ use zksync_node_framework::{ commitment_generator::CommitmentGeneratorLayer, consensus::{ConsensusLayer, Mode as ConsensusMode}, contract_verification_api::ContractVerificationApiLayer, + da_dispatcher::DataAvailabilityDispatcherLayer, eth_sender::{EthTxAggregatorLayer, EthTxManagerLayer}, 
eth_watch::EthWatchLayer, healtcheck_server::HealthCheckLayer, @@ -444,6 +452,38 @@ impl MainNodeBuilder { Ok(self) } + fn add_no_da_client_layer(mut self) -> anyhow::Result { + self.node.add_layer(NoDAClientWiringLayer); + Ok(self) + } + + #[allow(dead_code)] + fn add_object_storage_da_client_layer(mut self) -> anyhow::Result { + let object_store_config = DAObjectStoreConfig::from_env()?; + self.node + .add_layer(ObjectStorageClientWiringLayer::new(object_store_config.0)); + Ok(self) + } + + fn add_da_dispatcher_layer(mut self) -> anyhow::Result { + let eth_sender_config = try_load_config!(self.configs.eth); + if let Some(sender_config) = eth_sender_config.sender { + if sender_config.pubdata_sending_mode != PubdataSendingMode::Custom { + tracing::warn!("DA dispatcher is enabled, but the pubdata sending mode is not `Custom`. DA dispatcher will not be started."); + return Ok(self); + } + } + + let state_keeper_config = try_load_config!(self.configs.state_keeper_config); + let da_config = try_load_config!(self.configs.da_dispatcher_config); + self.node.add_layer(DataAvailabilityDispatcherLayer::new( + state_keeper_config, + da_config, + )); + + Ok(self) + } + fn add_vm_runner_protective_reads_layer(mut self) -> anyhow::Result { let protective_reads_writer_config = try_load_config!(self.configs.protective_reads_writer_config); @@ -539,6 +579,9 @@ impl MainNodeBuilder { Component::CommitmentGenerator => { self = self.add_commitment_generator_layer()?; } + Component::DADispatcher => { + self = self.add_no_da_client_layer()?.add_da_dispatcher_layer()?; + } Component::VmRunnerProtectiveReads => { self = self.add_vm_runner_protective_reads_layer()?; } diff --git a/core/lib/config/src/configs/chain.rs b/core/lib/config/src/configs/chain.rs index 868b5046edbf..53884c4a7227 100644 --- a/core/lib/config/src/configs/chain.rs +++ b/core/lib/config/src/configs/chain.rs @@ -105,7 +105,12 @@ pub struct StateKeeperConfig { pub batch_overhead_l1_gas: u64, /// The maximum amount of 
gas that can be used by the batch. This value is derived from the circuits limitation per batch. pub max_gas_per_batch: u64, - /// The maximum amount of pubdata that can be used by the batch. Note that if the calldata is used as pubdata, this variable should not exceed 128kb. + /// The maximum amount of pubdata that can be used by the batch. + /// This variable should not exceed: + /// - 128kb for calldata-based rollups + /// - 120kb * n, where `n` is a number of blobs for blob-based rollups + /// - the DA layer's blob size limit for the DA layer-based validiums + /// - 100 MB for the object store-based or no-da validiums pub max_pubdata_per_batch: u64, /// The version of the fee model to use. diff --git a/core/lib/config/src/configs/da_dispatcher.rs b/core/lib/config/src/configs/da_dispatcher.rs new file mode 100644 index 000000000000..303a2c0b54c1 --- /dev/null +++ b/core/lib/config/src/configs/da_dispatcher.rs @@ -0,0 +1,43 @@ +use std::time::Duration; + +use serde::Deserialize; + +pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000; +pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100; +pub const DEFAULT_MAX_RETRIES: u16 = 5; + +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct DADispatcherConfig { + /// The interval between the `da_dispatcher's` iterations. + pub polling_interval_ms: Option, + /// The maximum number of rows to query from the database in a single query. + pub max_rows_to_dispatch: Option, + /// The maximum number of retries for the dispatch of a blob. 
+ pub max_retries: Option, +} + +impl DADispatcherConfig { + pub fn for_tests() -> Self { + Self { + polling_interval_ms: Some(DEFAULT_POLLING_INTERVAL_MS), + max_rows_to_dispatch: Some(DEFAULT_MAX_ROWS_TO_DISPATCH), + max_retries: Some(DEFAULT_MAX_RETRIES), + } + } + + pub fn polling_interval(&self) -> Duration { + match self.polling_interval_ms { + Some(interval) => Duration::from_millis(interval as u64), + None => Duration::from_millis(DEFAULT_POLLING_INTERVAL_MS as u64), + } + } + + pub fn max_rows_to_dispatch(&self) -> u32 { + self.max_rows_to_dispatch + .unwrap_or(DEFAULT_MAX_ROWS_TO_DISPATCH) + } + + pub fn max_retries(&self) -> u16 { + self.max_retries.unwrap_or(DEFAULT_MAX_RETRIES) + } +} diff --git a/core/lib/config/src/configs/eth_sender.rs b/core/lib/config/src/configs/eth_sender.rs index 58b81fa0a145..92836c74b1c6 100644 --- a/core/lib/config/src/configs/eth_sender.rs +++ b/core/lib/config/src/configs/eth_sender.rs @@ -81,6 +81,7 @@ pub enum PubdataSendingMode { #[default] Calldata, Blobs, + Custom, } #[derive(Debug, Deserialize, Clone, PartialEq)] @@ -114,7 +115,7 @@ pub struct SenderConfig { // Max acceptable fee for sending tx it acts as a safeguard to prevent sending tx with very high fees. pub max_acceptable_priority_fee_in_gwei: u64, - /// The mode in which we send pubdata, either Calldata or Blobs + /// The mode in which we send pubdata: Calldata, Blobs or Custom (DA layers, Object Store, etc.) 
pub pubdata_sending_mode: PubdataSendingMode, } diff --git a/core/lib/config/src/configs/general.rs b/core/lib/config/src/configs/general.rs index 312f404225cb..25aaa442c950 100644 --- a/core/lib/config/src/configs/general.rs +++ b/core/lib/config/src/configs/general.rs @@ -1,6 +1,7 @@ use crate::{ configs::{ chain::{CircuitBreakerConfig, MempoolConfig, OperationsManagerConfig, StateKeeperConfig}, + da_dispatcher::DADispatcherConfig, fri_prover_group::FriProverGroupConfig, house_keeper::HouseKeeperConfig, pruning::PruningConfig, @@ -36,6 +37,7 @@ pub struct GeneralConfig { pub eth: Option, pub snapshot_creator: Option, pub observability: Option, + pub da_dispatcher_config: Option, pub protective_reads_writer_config: Option, pub commitment_generator: Option, pub snapshot_recovery: Option, diff --git a/core/lib/config/src/configs/mod.rs b/core/lib/config/src/configs/mod.rs index 9e04f483357f..6bfa874d951d 100644 --- a/core/lib/config/src/configs/mod.rs +++ b/core/lib/config/src/configs/mod.rs @@ -4,6 +4,7 @@ pub use self::{ commitment_generator::CommitmentGeneratorConfig, contract_verifier::ContractVerifierConfig, contracts::{ContractsConfig, EcosystemContracts}, + da_dispatcher::DADispatcherConfig, database::{DBConfig, PostgresConfig}, eth_sender::{EthConfig, GasAdjusterConfig}, eth_watch::EthWatchConfig, @@ -32,6 +33,7 @@ mod commitment_generator; pub mod consensus; pub mod contract_verifier; pub mod contracts; +pub mod da_dispatcher; pub mod database; pub mod en_config; pub mod eth_sender; diff --git a/core/lib/config/src/lib.rs b/core/lib/config/src/lib.rs index 66656e60b702..1d74e51b6728 100644 --- a/core/lib/config/src/lib.rs +++ b/core/lib/config/src/lib.rs @@ -1,8 +1,9 @@ #![allow(clippy::upper_case_acronyms, clippy::derive_partial_eq_without_eq)] pub use crate::configs::{ - ApiConfig, ContractVerifierConfig, ContractsConfig, DBConfig, EthConfig, EthWatchConfig, - GasAdjusterConfig, GenesisConfig, ObjectStoreConfig, PostgresConfig, SnapshotsCreatorConfig, + 
ApiConfig, ContractVerifierConfig, ContractsConfig, DADispatcherConfig, DBConfig, EthConfig, + EthWatchConfig, GasAdjusterConfig, GenesisConfig, ObjectStoreConfig, PostgresConfig, + SnapshotsCreatorConfig, }; pub mod configs; diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index a05b3d096253..8db71e2c8e7f 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -350,9 +350,10 @@ impl Distribution for EncodeDist { impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> configs::eth_sender::PubdataSendingMode { type T = configs::eth_sender::PubdataSendingMode; - match rng.gen_range(0..2) { + match rng.gen_range(0..3) { 0 => T::Calldata, - _ => T::Blobs, + 1 => T::Blobs, + _ => T::Custom, } } } diff --git a/core/lib/da_client/Cargo.toml b/core/lib/da_client/Cargo.toml new file mode 100644 index 000000000000..da118058eab5 --- /dev/null +++ b/core/lib/da_client/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "zksync_da_client" +version.workspace = true +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true + +[dependencies] +serde = { workspace = true, features = ["derive"] } +tracing.workspace = true +async-trait.workspace = true +anyhow.workspace = true + +zksync_config.workspace = true +zksync_types.workspace = true diff --git a/core/lib/da_client/README.md b/core/lib/da_client/README.md new file mode 100644 index 000000000000..9c890498467d --- /dev/null +++ b/core/lib/da_client/README.md @@ -0,0 +1,16 @@ +# Data Availability Client + +This crate contains a trait that has to be implemented by all the DA clients. + +## Overview + +This trait assumes that every implementation follows these logical assumptions: + +- The DA client is only serving as a connector between the ZK chain's sequencer and the DA layer. 
+- The DA client is not supposed to be a standalone application, but rather a library that is used by the + `da_dispatcher`. +- The logic of the retries is implemented in the `da_dispatcher`, not in the DA clients. +- The `dispatch_blob` is supposed to be idempotent, and work correctly even if called multiple times with the same + params. +- The `get_inclusion_data` has to return the data only when the state roots are relayed to the L1 verification contract + (if the DA solution has one). diff --git a/core/lib/da_client/src/lib.rs b/core/lib/da_client/src/lib.rs new file mode 100644 index 000000000000..7e4a2643a259 --- /dev/null +++ b/core/lib/da_client/src/lib.rs @@ -0,0 +1,32 @@ +pub mod types; + +use std::fmt; + +use async_trait::async_trait; +use types::{DAError, DispatchResponse, InclusionData}; + +/// Trait that defines the interface for the data availability layer clients. +#[async_trait] +pub trait DataAvailabilityClient: Sync + Send + fmt::Debug { + /// Dispatches a blob to the data availability layer. + async fn dispatch_blob( + &self, + batch_number: u32, + data: Vec, + ) -> Result; + + /// Fetches the inclusion data for a given blob_id. + async fn get_inclusion_data(&self, blob_id: &str) -> Result, DAError>; + + /// Clones the client and wraps it in a Box. + fn clone_boxed(&self) -> Box; + + /// Returns the maximum size of the blob (in bytes) that can be dispatched. None means no limit. + fn blob_size_limit(&self) -> Option; +} + +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_boxed() + } +} diff --git a/core/lib/da_client/src/types.rs b/core/lib/da_client/src/types.rs new file mode 100644 index 000000000000..e339111bb51a --- /dev/null +++ b/core/lib/da_client/src/types.rs @@ -0,0 +1,44 @@ +use std::{error, fmt::Display}; + +use serde::Serialize; + +/// `DAError` is the error type returned by the DA clients. 
+#[derive(Debug)] +pub struct DAError { + pub error: anyhow::Error, + pub is_transient: bool, +} + +impl DAError { + pub fn is_transient(&self) -> bool { + self.is_transient + } +} + +impl Display for DAError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let kind = if self.is_transient { + "transient" + } else { + "fatal" + }; + write!(f, "{kind} data availability client error: {}", self.error) + } +} + +impl error::Error for DAError {} + +/// `DispatchResponse` is the response received from the DA layer after dispatching a blob. +#[derive(Default)] +pub struct DispatchResponse { + /// The blob_id is needed to fetch the inclusion data. + pub blob_id: String, +} + +/// `InclusionData` is the data needed to verify on L1 that a blob is included in the DA layer. +#[derive(Default, Serialize)] +pub struct InclusionData { + /// The inclusion data serialized by the DA client. Serialization is done in a way that allows + /// the deserialization of the data in Solidity contracts. 
+ pub data: Vec, +} diff --git a/core/lib/dal/.sqlx/.query-05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6.json.nPBbNl b/core/lib/dal/.sqlx/.query-05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6.json.nPBbNl deleted file mode 100644 index 69a1077452dd..000000000000 --- a/core/lib/dal/.sqlx/.query-05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6.json.nPBbNl +++ /dev/null @@ -1,119 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n transactions.hash AS tx_hash,\n transactions.index_in_block AS index_in_block,\n miniblocks.number AS block_number,\n transactions.nonce AS nonce,\n transactions.signature AS signature,\n transactions.initiator_address AS initiator_address,\n transactions.tx_format AS tx_format,\n transactions.value AS value,\n transactions.gas_limit AS gas_limit,\n transactions.max_fee_per_gas AS max_fee_per_gas,\n transactions.max_priority_fee_per_gas AS max_priority_fee_per_gas,\n transactions.effective_gas_price AS effective_gas_price,\n transactions.l1_batch_number AS l1_batch_number,\n transactions.l1_batch_tx_index AS l1_batch_tx_index,\n transactions.data->'contractAddress' AS \"execute_contract_address\",\n transactions.data->'calldata' AS \"calldata\",\n miniblocks.hash AS \"block_hash\"\n FROM transactions\n LEFT JOIN miniblocks ON miniblocks.number = transactions.miniblock_number\n WHERE\n miniblocks.number = $1 AND transactions.index_in_block = $2 AND transactions.data != '{}'::jsonb", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "tx_hash", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "index_in_block", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "block_number", - "type_info": "Int8" - }, - { - "ordinal": 3, - "name": "nonce", - "type_info": "Int8" - }, - { - "ordinal": 4, - "name": "signature", - "type_info": "Bytea" - }, - { - "ordinal": 5, - "name": "initiator_address", - "type_info": "Bytea" - }, - { - "ordinal": 6, - "name": "tx_format", - 
"type_info": "Int4" - }, - { - "ordinal": 7, - "name": "value", - "type_info": "Numeric" - }, - { - "ordinal": 8, - "name": "gas_limit", - "type_info": "Numeric" - }, - { - "ordinal": 9, - "name": "max_fee_per_gas", - "type_info": "Numeric" - }, - { - "ordinal": 10, - "name": "max_priority_fee_per_gas", - "type_info": "Numeric" - }, - { - "ordinal": 11, - "name": "effective_gas_price", - "type_info": "Numeric" - }, - { - "ordinal": 12, - "name": "l1_batch_number", - "type_info": "Int8" - }, - { - "ordinal": 13, - "name": "l1_batch_tx_index", - "type_info": "Int4" - }, - { - "ordinal": 14, - "name": "execute_contract_address", - "type_info": "Jsonb" - }, - { - "ordinal": 15, - "name": "calldata", - "type_info": "Jsonb" - }, - { - "ordinal": 16, - "name": "block_hash", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Int8", - "Int4" - ] - }, - "nullable": [ - false, - true, - false, - true, - true, - false, - true, - false, - true, - true, - true, - true, - true, - true, - null, - null, - false - ] - }, - "hash": "05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6" -} diff --git a/core/lib/dal/.sqlx/query-0b5d5efeac95d429cf6a5be22153897edf8c868094ad029e2e8fcf286d44fd55.json b/core/lib/dal/.sqlx/query-0b5d5efeac95d429cf6a5be22153897edf8c868094ad029e2e8fcf286d44fd55.json new file mode 100644 index 000000000000..822a6967f6db --- /dev/null +++ b/core/lib/dal/.sqlx/query-0b5d5efeac95d429cf6a5be22153897edf8c868094ad029e2e8fcf286d44fd55.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n data_availability (l1_batch_number, blob_id, sent_at, created_at, updated_at)\n VALUES\n ($1, $2, $3, NOW(), NOW())\n ON CONFLICT DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + "Timestamp" + ] + }, + "nullable": [] + }, + "hash": "0b5d5efeac95d429cf6a5be22153897edf8c868094ad029e2e8fcf286d44fd55" +} diff --git 
a/core/lib/dal/.sqlx/query-0ccfbde0df7c74b489bae4799177b9a22283340a8c9fb4c28d2d76de921ca77b.json b/core/lib/dal/.sqlx/query-0ccfbde0df7c74b489bae4799177b9a22283340a8c9fb4c28d2d76de921ca77b.json new file mode 100644 index 000000000000..f4bd9fdfb765 --- /dev/null +++ b/core/lib/dal/.sqlx/query-0ccfbde0df7c74b489bae4799177b9a22283340a8c9fb4c28d2d76de921ca77b.json @@ -0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n l1_batch_number,\n blob_id,\n inclusion_data,\n sent_at\n FROM\n data_availability\n WHERE\n inclusion_data IS NULL\n ORDER BY\n l1_batch_number\n LIMIT\n 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "l1_batch_number", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "blob_id", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "inclusion_data", + "type_info": "Bytea" + }, + { + "ordinal": 3, + "name": "sent_at", + "type_info": "Timestamp" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + true, + false + ] + }, + "hash": "0ccfbde0df7c74b489bae4799177b9a22283340a8c9fb4c28d2d76de921ca77b" +} diff --git a/core/lib/dal/.sqlx/query-3ecd408294c93a5ee7dbbe128c52c62033a7f690353f01b2978ef9b30d52c94e.json b/core/lib/dal/.sqlx/query-3ecd408294c93a5ee7dbbe128c52c62033a7f690353f01b2978ef9b30d52c94e.json new file mode 100644 index 000000000000..a64b8e06628f --- /dev/null +++ b/core/lib/dal/.sqlx/query-3ecd408294c93a5ee7dbbe128c52c62033a7f690353f01b2978ef9b30d52c94e.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n inclusion_data\n FROM\n data_availability\n WHERE\n l1_batch_number = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "inclusion_data", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + true + ] + }, + "hash": "3ecd408294c93a5ee7dbbe128c52c62033a7f690353f01b2978ef9b30d52c94e" +} diff --git 
a/core/lib/dal/.sqlx/query-5c99342c4fbf36ccc8e9c9dafc76de37201091bfccd3caf922e766896c5a542b.json b/core/lib/dal/.sqlx/query-5c99342c4fbf36ccc8e9c9dafc76de37201091bfccd3caf922e766896c5a542b.json new file mode 100644 index 000000000000..5d09a9c37f7a --- /dev/null +++ b/core/lib/dal/.sqlx/query-5c99342c4fbf36ccc8e9c9dafc76de37201091bfccd3caf922e766896c5a542b.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE data_availability\n SET\n inclusion_data = $1,\n updated_at = NOW()\n WHERE\n l1_batch_number = $2\n AND inclusion_data IS NULL\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bytea", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "5c99342c4fbf36ccc8e9c9dafc76de37201091bfccd3caf922e766896c5a542b" +} diff --git a/core/lib/dal/.sqlx/query-cc4c740ec24e6845343adc3ce43588448fb534a75d2da0f54999f1befa17facc.json b/core/lib/dal/.sqlx/query-63f95c6cdcfd933e2cf8f62c0d408f2dce89f7b700896fcc0f242e0e15ba058e.json similarity index 80% rename from core/lib/dal/.sqlx/query-cc4c740ec24e6845343adc3ce43588448fb534a75d2da0f54999f1befa17facc.json rename to core/lib/dal/.sqlx/query-63f95c6cdcfd933e2cf8f62c0d408f2dce89f7b700896fcc0f242e0e15ba058e.json index 5fdf9363a0f9..cb68e7622524 100644 --- a/core/lib/dal/.sqlx/query-cc4c740ec24e6845343adc3ce43588448fb534a75d2da0f54999f1befa17facc.json +++ b/core/lib/dal/.sqlx/query-63f95c6cdcfd933e2cf8f62c0d408f2dce89f7b700896fcc0f242e0e15ba058e.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n number,\n l1_batches.timestamp,\n l1_tx_count,\n l2_tx_count,\n bloom,\n priority_ops_onchain_data,\n hash,\n commitment,\n l2_to_l1_messages,\n used_contract_hashes,\n compressed_initial_writes,\n compressed_repeated_writes,\n l2_l1_merkle_root,\n rollup_last_leaf_index,\n zkporter_is_available,\n l1_batches.bootloader_code_hash,\n l1_batches.default_aa_code_hash,\n aux_data_hash,\n pass_through_data_hash,\n meta_parameters_hash,\n protocol_version,\n compressed_state_diffs,\n 
system_logs,\n events_queue_commitment,\n bootloader_initial_content_commitment,\n pubdata_input\n FROM\n l1_batches\n LEFT JOIN commitments ON commitments.l1_batch_number = l1_batches.number\n JOIN protocol_versions ON protocol_versions.id = l1_batches.protocol_version\n WHERE\n eth_commit_tx_id IS NULL\n AND number != 0\n AND protocol_versions.bootloader_code_hash = $1\n AND protocol_versions.default_account_code_hash = $2\n AND commitment IS NOT NULL\n AND (\n protocol_versions.id = $3\n OR protocol_versions.upgrade_tx_hash IS NULL\n )\n AND events_queue_commitment IS NOT NULL\n AND bootloader_initial_content_commitment IS NOT NULL\n ORDER BY\n number\n LIMIT\n $4\n ", + "query": "\n SELECT\n number,\n l1_batches.timestamp,\n l1_tx_count,\n l2_tx_count,\n bloom,\n priority_ops_onchain_data,\n hash,\n commitment,\n l2_to_l1_messages,\n used_contract_hashes,\n compressed_initial_writes,\n compressed_repeated_writes,\n l2_l1_merkle_root,\n rollup_last_leaf_index,\n zkporter_is_available,\n l1_batches.bootloader_code_hash,\n l1_batches.default_aa_code_hash,\n aux_data_hash,\n pass_through_data_hash,\n meta_parameters_hash,\n protocol_version,\n compressed_state_diffs,\n system_logs,\n events_queue_commitment,\n bootloader_initial_content_commitment,\n pubdata_input\n FROM\n l1_batches\n LEFT JOIN commitments ON commitments.l1_batch_number = l1_batches.number\n LEFT JOIN data_availability ON data_availability.l1_batch_number = l1_batches.number\n JOIN protocol_versions ON protocol_versions.id = l1_batches.protocol_version\n WHERE\n eth_commit_tx_id IS NULL\n AND number != 0\n AND protocol_versions.bootloader_code_hash = $1\n AND protocol_versions.default_account_code_hash = $2\n AND commitment IS NOT NULL\n AND (\n protocol_versions.id = $3\n OR protocol_versions.upgrade_tx_hash IS NULL\n )\n AND events_queue_commitment IS NOT NULL\n AND bootloader_initial_content_commitment IS NOT NULL\n AND (\n data_availability.inclusion_data IS NOT NULL\n OR $4 IS FALSE\n )\n 
ORDER BY\n number\n LIMIT\n $5\n ", "describe": { "columns": [ { @@ -139,6 +139,7 @@ "Bytea", "Bytea", "Int4", + "Bool", "Int8" ] }, @@ -171,5 +172,5 @@ true ] }, - "hash": "cc4c740ec24e6845343adc3ce43588448fb534a75d2da0f54999f1befa17facc" + "hash": "63f95c6cdcfd933e2cf8f62c0d408f2dce89f7b700896fcc0f242e0e15ba058e" } diff --git a/core/lib/dal/.sqlx/query-6f003ee0311b9ff1f42d3a74587670ab55ca94647e0caa92adab7c18260f18ff.json b/core/lib/dal/.sqlx/query-6f003ee0311b9ff1f42d3a74587670ab55ca94647e0caa92adab7c18260f18ff.json new file mode 100644 index 000000000000..768089b083a1 --- /dev/null +++ b/core/lib/dal/.sqlx/query-6f003ee0311b9ff1f42d3a74587670ab55ca94647e0caa92adab7c18260f18ff.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n blob_id\n FROM\n data_availability\n WHERE\n l1_batch_number = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "blob_id", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "6f003ee0311b9ff1f42d3a74587670ab55ca94647e0caa92adab7c18260f18ff" +} diff --git a/core/lib/dal/.sqlx/query-928139bf23bd0d57b8dbdb3283b139300ad3b80ac9e70c00864c3d9f6521b028.json b/core/lib/dal/.sqlx/query-928139bf23bd0d57b8dbdb3283b139300ad3b80ac9e70c00864c3d9f6521b028.json new file mode 100644 index 000000000000..e192763b189b --- /dev/null +++ b/core/lib/dal/.sqlx/query-928139bf23bd0d57b8dbdb3283b139300ad3b80ac9e70c00864c3d9f6521b028.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n number,\n pubdata_input\n FROM\n l1_batches\n LEFT JOIN data_availability ON data_availability.l1_batch_number = l1_batches.number\n WHERE\n eth_commit_tx_id IS NULL\n AND number != 0\n AND data_availability.blob_id IS NULL\n AND pubdata_input IS NOT NULL\n ORDER BY\n number\n LIMIT\n $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "number", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "pubdata_input", + "type_info": 
"Bytea" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + true + ] + }, + "hash": "928139bf23bd0d57b8dbdb3283b139300ad3b80ac9e70c00864c3d9f6521b028" +} diff --git a/core/lib/dal/migrations/20240522081114_create_data_availability_table.down.sql b/core/lib/dal/migrations/20240522081114_create_data_availability_table.down.sql new file mode 100644 index 000000000000..b6993d850ea5 --- /dev/null +++ b/core/lib/dal/migrations/20240522081114_create_data_availability_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS data_availability; diff --git a/core/lib/dal/migrations/20240522081114_create_data_availability_table.up.sql b/core/lib/dal/migrations/20240522081114_create_data_availability_table.up.sql new file mode 100644 index 000000000000..037398021da6 --- /dev/null +++ b/core/lib/dal/migrations/20240522081114_create_data_availability_table.up.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS data_availability +( + l1_batch_number BIGINT PRIMARY KEY REFERENCES l1_batches (number) ON DELETE CASCADE, + + blob_id TEXT NOT NULL, -- blob here is an abstract term, unrelated to any DA implementation + -- the BYTEA used for this column as the most generic type + -- the actual format of blob identifier and inclusion data is defined by the DA client implementation + inclusion_data BYTEA, + sent_at TIMESTAMP NOT NULL, + + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL +); diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 6062dcefe89c..4f4b3e99ff7b 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -1578,12 +1578,16 @@ impl BlocksDal<'_, '_> { .context("map_l1_batches()") } + /// When `with_da_inclusion_info` is true, only batches for which custom DA inclusion + /// information has already been provided will be included pub async fn get_ready_for_commit_l1_batches( &mut self, limit: usize, bootloader_hash: H256, default_aa_hash: H256, protocol_version_id: 
ProtocolVersionId, + + with_da_inclusion_info: bool, ) -> anyhow::Result> { let raw_batches = sqlx::query_as!( StorageL1Batch, @@ -1618,6 +1622,7 @@ impl BlocksDal<'_, '_> { FROM l1_batches LEFT JOIN commitments ON commitments.l1_batch_number = l1_batches.number + LEFT JOIN data_availability ON data_availability.l1_batch_number = l1_batches.number JOIN protocol_versions ON protocol_versions.id = l1_batches.protocol_version WHERE eth_commit_tx_id IS NULL @@ -1631,14 +1636,19 @@ impl BlocksDal<'_, '_> { ) AND events_queue_commitment IS NOT NULL AND bootloader_initial_content_commitment IS NOT NULL + AND ( + data_availability.inclusion_data IS NOT NULL + OR $4 IS FALSE + ) ORDER BY number LIMIT - $4 + $5 "#, bootloader_hash.as_bytes(), default_aa_hash.as_bytes(), protocol_version_id as i32, + with_da_inclusion_info, limit as i64, ) .instrument("get_ready_for_commit_l1_batches") @@ -1646,6 +1656,7 @@ impl BlocksDal<'_, '_> { .with_arg("bootloader_hash", &bootloader_hash) .with_arg("default_aa_hash", &default_aa_hash) .with_arg("protocol_version_id", &protocol_version_id) + .with_arg("with_da_inclusion_info", &with_da_inclusion_info) .fetch_all(self.storage) .await?; diff --git a/core/lib/dal/src/data_availability_dal.rs b/core/lib/dal/src/data_availability_dal.rs new file mode 100644 index 000000000000..24048ec4fa19 --- /dev/null +++ b/core/lib/dal/src/data_availability_dal.rs @@ -0,0 +1,217 @@ +use zksync_db_connection::{ + connection::Connection, + error::DalResult, + instrument::{InstrumentExt, Instrumented}, +}; +use zksync_types::{pubdata_da::DataAvailabilityBlob, L1BatchNumber}; + +use crate::{ + models::storage_data_availability::{L1BatchDA, StorageDABlob}, + Core, +}; + +#[derive(Debug)] +pub struct DataAvailabilityDal<'a, 'c> { + pub(crate) storage: &'a mut Connection<'c, Core>, +} + +impl DataAvailabilityDal<'_, '_> { + /// Inserts the blob_id for the given L1 batch. 
If the blob_id is already present, + /// verifies that it matches the one provided in the function arguments + /// (preventing the same L1 batch from being stored twice) + pub async fn insert_l1_batch_da( + &mut self, + number: L1BatchNumber, + blob_id: &str, + sent_at: chrono::NaiveDateTime, + ) -> DalResult<()> { + let update_result = sqlx::query!( + r#" + INSERT INTO + data_availability (l1_batch_number, blob_id, sent_at, created_at, updated_at) + VALUES + ($1, $2, $3, NOW(), NOW()) + ON CONFLICT DO NOTHING + "#, + i64::from(number.0), + blob_id, + sent_at, + ) + .instrument("insert_l1_batch_da") + .with_arg("number", &number) + .with_arg("blob_id", &blob_id) + .report_latency() + .execute(self.storage) + .await?; + + if update_result.rows_affected() == 0 { + tracing::debug!( + "L1 batch #{number}: DA blob_id wasn't updated as it's already present" + ); + + let instrumentation = + Instrumented::new("get_matching_batch_da_blob_id").with_arg("number", &number); + + // Batch was already processed. Verify that existing DA blob_id matches + let query = sqlx::query!( + r#" + SELECT + blob_id + FROM + data_availability + WHERE + l1_batch_number = $1 + "#, + i64::from(number.0), + ); + + let matched: String = instrumentation + .clone() + .with(query) + .report_latency() + .fetch_one(self.storage) + .await? + .blob_id; + + if matched != *blob_id.to_string() { + let err = instrumentation.constraint_error(anyhow::anyhow!( + "Error storing DA blob id. DA blob_id {blob_id} for L1 batch #{number} does not match the expected value" + )); + return Err(err); + } + } + Ok(()) + } + + /// Saves the inclusion data for the given L1 batch. 
If the inclusion data is already present, + /// verifies that it matches the one provided in the function arguments + /// (meaning that the inclusion data corresponds to the same DA blob) + pub async fn save_l1_batch_inclusion_data( + &mut self, + number: L1BatchNumber, + da_inclusion_data: &[u8], + ) -> DalResult<()> { + let update_result = sqlx::query!( + r#" + UPDATE data_availability + SET + inclusion_data = $1, + updated_at = NOW() + WHERE + l1_batch_number = $2 + AND inclusion_data IS NULL + "#, + da_inclusion_data, + i64::from(number.0), + ) + .instrument("save_l1_batch_da_data") + .with_arg("number", &number) + .report_latency() + .execute(self.storage) + .await?; + + if update_result.rows_affected() == 0 { + tracing::debug!("L1 batch #{number}: DA data wasn't updated as it's already present"); + + let instrumentation = + Instrumented::new("get_matching_batch_da_data").with_arg("number", &number); + + // Batch was already processed. Verify that existing DA data matches + let query = sqlx::query!( + r#" + SELECT + inclusion_data + FROM + data_availability + WHERE + l1_batch_number = $1 + "#, + i64::from(number.0), + ); + + let matched: Option> = instrumentation + .clone() + .with(query) + .report_latency() + .fetch_one(self.storage) + .await? + .inclusion_data; + + if matched.unwrap_or_default() != da_inclusion_data.to_vec() { + let err = instrumentation.constraint_error(anyhow::anyhow!( + "Error storing DA inclusion data. DA data for L1 batch #{number} does not match the one provided before" + )); + return Err(err); + } + } + Ok(()) + } + + /// Assumes that the L1 batches are sorted by number, and returns the first one that is ready for DA dispatch. 
+ pub async fn get_first_da_blob_awaiting_inclusion( + &mut self, + ) -> DalResult> { + Ok(sqlx::query_as!( + StorageDABlob, + r#" + SELECT + l1_batch_number, + blob_id, + inclusion_data, + sent_at + FROM + data_availability + WHERE + inclusion_data IS NULL + ORDER BY + l1_batch_number + LIMIT + 1 + "#, + ) + .instrument("get_first_da_blob_awaiting_inclusion") + .fetch_optional(self.storage) + .await? + .map(DataAvailabilityBlob::from)) + } + + /// Fetches the pubdata and `l1_batch_number` for the L1 batches that are ready for DA dispatch. + pub async fn get_ready_for_da_dispatch_l1_batches( + &mut self, + limit: usize, + ) -> DalResult> { + let rows = sqlx::query!( + r#" + SELECT + number, + pubdata_input + FROM + l1_batches + LEFT JOIN data_availability ON data_availability.l1_batch_number = l1_batches.number + WHERE + eth_commit_tx_id IS NULL + AND number != 0 + AND data_availability.blob_id IS NULL + AND pubdata_input IS NOT NULL + ORDER BY + number + LIMIT + $1 + "#, + limit as i64, + ) + .instrument("get_ready_for_da_dispatch_l1_batches") + .with_arg("limit", &limit) + .fetch_all(self.storage) + .await?; + + Ok(rows + .into_iter() + .map(|row| L1BatchDA { + // `unwrap` is safe here because we have a `WHERE` clause that filters out `NULL` values + pubdata: row.pubdata_input.unwrap(), + l1_batch_number: L1BatchNumber(row.number as u32), + }) + .collect()) + } +} diff --git a/core/lib/dal/src/lib.rs b/core/lib/dal/src/lib.rs index 7dd54cbaef94..5f95e440d10d 100644 --- a/core/lib/dal/src/lib.rs +++ b/core/lib/dal/src/lib.rs @@ -13,9 +13,10 @@ pub use zksync_db_connection::{ use crate::{ blocks_dal::BlocksDal, blocks_web3_dal::BlocksWeb3Dal, consensus_dal::ConsensusDal, - contract_verification_dal::ContractVerificationDal, eth_sender_dal::EthSenderDal, - events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, factory_deps_dal::FactoryDepsDal, - proof_generation_dal::ProofGenerationDal, protocol_versions_dal::ProtocolVersionsDal, + 
contract_verification_dal::ContractVerificationDal, data_availability_dal::DataAvailabilityDal, + eth_sender_dal::EthSenderDal, events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, + factory_deps_dal::FactoryDepsDal, proof_generation_dal::ProofGenerationDal, + protocol_versions_dal::ProtocolVersionsDal, protocol_versions_web3_dal::ProtocolVersionsWeb3Dal, pruning_dal::PruningDal, snapshot_recovery_dal::SnapshotRecoveryDal, snapshots_creator_dal::SnapshotsCreatorDal, snapshots_dal::SnapshotsDal, storage_logs_dal::StorageLogsDal, @@ -31,6 +32,7 @@ pub mod blocks_web3_dal; pub mod consensus; pub mod consensus_dal; pub mod contract_verification_dal; +mod data_availability_dal; pub mod eth_sender_dal; pub mod events_dal; pub mod events_web3_dal; @@ -124,6 +126,8 @@ where fn pruning_dal(&mut self) -> PruningDal<'_, 'a>; + fn data_availability_dal(&mut self) -> DataAvailabilityDal<'_, 'a>; + fn vm_runner_dal(&mut self) -> VmRunnerDal<'_, 'a>; } @@ -240,6 +244,10 @@ impl<'a> CoreDal<'a> for Connection<'a, Core> { PruningDal { storage: self } } + fn data_availability_dal(&mut self) -> DataAvailabilityDal<'_, 'a> { + DataAvailabilityDal { storage: self } + } + fn vm_runner_dal(&mut self) -> VmRunnerDal<'_, 'a> { VmRunnerDal { storage: self } } diff --git a/core/lib/dal/src/models/mod.rs b/core/lib/dal/src/models/mod.rs index bc0e2c657da5..34c914af59dd 100644 --- a/core/lib/dal/src/models/mod.rs +++ b/core/lib/dal/src/models/mod.rs @@ -3,6 +3,7 @@ use anyhow::Context as _; use zksync_db_connection::error::SqlxContext; use zksync_types::{ProtocolVersionId, H160, H256}; +pub(crate) mod storage_data_availability; pub mod storage_eth_tx; pub mod storage_event; pub mod storage_log; diff --git a/core/lib/dal/src/models/storage_data_availability.rs b/core/lib/dal/src/models/storage_data_availability.rs new file mode 100644 index 000000000000..2a1b39845e69 --- /dev/null +++ b/core/lib/dal/src/models/storage_data_availability.rs @@ -0,0 +1,29 @@ +use chrono::NaiveDateTime; +use 
zksync_types::{pubdata_da::DataAvailabilityBlob, L1BatchNumber}; + +/// Represents a blob in the data availability layer. +#[derive(Debug, Clone)] +pub(crate) struct StorageDABlob { + pub l1_batch_number: i64, + pub blob_id: String, + pub inclusion_data: Option>, + pub sent_at: NaiveDateTime, +} + +impl From for DataAvailabilityBlob { + fn from(blob: StorageDABlob) -> DataAvailabilityBlob { + DataAvailabilityBlob { + l1_batch_number: L1BatchNumber(blob.l1_batch_number as u32), + blob_id: blob.blob_id, + inclusion_data: blob.inclusion_data, + sent_at: blob.sent_at.and_utc(), + } + } +} + +/// A small struct used to store a batch and its data availability, which are retrieved from the database. +#[derive(Debug)] +pub struct L1BatchDA { + pub pubdata: Vec, + pub l1_batch_number: L1BatchNumber, +} diff --git a/core/lib/default_da_clients/Cargo.toml b/core/lib/default_da_clients/Cargo.toml new file mode 100644 index 000000000000..c19af34681a8 --- /dev/null +++ b/core/lib/default_da_clients/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "zksync_default_da_clients" +version.workspace = true +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true + +[dependencies] +serde = { workspace = true, features = ["derive"] } +tracing.workspace = true +async-trait.workspace = true +anyhow.workspace = true +flate2.workspace = true + +zksync_config.workspace = true +zksync_types.workspace = true +zksync_object_store.workspace = true +zksync_da_client.workspace = true +zksync_node_framework.workspace = true +zksync_env_config.workspace = true diff --git a/core/lib/default_da_clients/README.md b/core/lib/default_da_clients/README.md new file mode 100644 index 000000000000..17ced715b268 --- /dev/null +++ b/core/lib/default_da_clients/README.md @@ -0,0 +1,11 @@ +# Default DA Clients + +This crate contains the default implementations of the Data Availability 
clients. Default clients are maintained within +this repo because they are tightly coupled with the codebase, and would cause a circular dependency if they were to be +moved to the [hyperchain-da](https://github.com/matter-labs/hyperchain-da) repository. + +Currently, the following DataAvailability clients are implemented: + +- `NoDA client` that does not send or store any pubdata; it is needed to run the zkSync network in the "no-DA" mode + utilizing the DA framework. +- `Object Store client` that stores the pubdata in the Object Store (GCS). diff --git a/core/lib/default_da_clients/src/lib.rs b/core/lib/default_da_clients/src/lib.rs new file mode 100644 index 000000000000..3aa2a18cdcec --- /dev/null +++ b/core/lib/default_da_clients/src/lib.rs @@ -0,0 +1,2 @@ +pub mod no_da; +pub mod object_store; diff --git a/core/lib/default_da_clients/src/no_da/client.rs b/core/lib/default_da_clients/src/no_da/client.rs new file mode 100644 index 000000000000..2710c9ce9d9b --- /dev/null +++ b/core/lib/default_da_clients/src/no_da/client.rs @@ -0,0 +1,28 @@ +use async_trait::async_trait; +use zksync_da_client::{ + types::{DAError, DispatchResponse, InclusionData}, + DataAvailabilityClient, +}; + +/// A no-op implementation of the `DataAvailabilityClient` trait that doesn't store the pubdata.
+#[derive(Clone, Debug, Default)] +pub struct NoDAClient; + +#[async_trait] +impl DataAvailabilityClient for NoDAClient { + async fn dispatch_blob(&self, _: u32, _: Vec) -> Result { + Ok(DispatchResponse::default()) + } + + async fn get_inclusion_data(&self, _: &str) -> Result, DAError> { + return Ok(Some(InclusionData::default())); + } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn blob_size_limit(&self) -> Option { + None + } +} diff --git a/core/lib/default_da_clients/src/no_da/mod.rs b/core/lib/default_da_clients/src/no_da/mod.rs new file mode 100644 index 000000000000..814cf30c2cbd --- /dev/null +++ b/core/lib/default_da_clients/src/no_da/mod.rs @@ -0,0 +1,2 @@ +pub mod client; +pub mod wiring_layer; diff --git a/core/lib/default_da_clients/src/no_da/wiring_layer.rs b/core/lib/default_da_clients/src/no_da/wiring_layer.rs new file mode 100644 index 000000000000..c1332da9a97e --- /dev/null +++ b/core/lib/default_da_clients/src/no_da/wiring_layer.rs @@ -0,0 +1,28 @@ +use std::fmt::Debug; + +use zksync_da_client::DataAvailabilityClient; +use zksync_node_framework::{ + implementations::resources::da_client::DAClientResource, + service::ServiceContext, + wiring_layer::{WiringError, WiringLayer}, +}; + +use crate::no_da::client::NoDAClient; + +#[derive(Debug, Default)] +pub struct NoDAClientWiringLayer; + +#[async_trait::async_trait] +impl WiringLayer for NoDAClientWiringLayer { + fn layer_name(&self) -> &'static str { + "no_da_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let client: Box = Box::new(NoDAClient); + + context.insert_resource(DAClientResource(client))?; + + Ok(()) + } +} diff --git a/core/lib/default_da_clients/src/object_store/client.rs b/core/lib/default_da_clients/src/object_store/client.rs new file mode 100644 index 000000000000..fc17a842a099 --- /dev/null +++ b/core/lib/default_da_clients/src/object_store/client.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use 
async_trait::async_trait; +use zksync_config::ObjectStoreConfig; +use zksync_da_client::{ + types::{DAError, DispatchResponse, InclusionData}, + DataAvailabilityClient, +}; +use zksync_object_store::{ObjectStore, ObjectStoreFactory}; +use zksync_types::L1BatchNumber; + +use crate::object_store::types::StorablePubdata; + +/// An implementation of the `DataAvailabilityClient` trait that stores the pubdata in the GCS. +#[derive(Clone, Debug)] +pub struct ObjectStoreDAClient { + object_store: Arc, +} + +impl ObjectStoreDAClient { + pub async fn new(object_store_conf: ObjectStoreConfig) -> anyhow::Result { + Ok(ObjectStoreDAClient { + object_store: ObjectStoreFactory::new(object_store_conf) + .create_store() + .await?, + }) + } +} + +#[async_trait] +impl DataAvailabilityClient for ObjectStoreDAClient { + async fn dispatch_blob( + &self, + batch_number: u32, + data: Vec, + ) -> Result { + if let Err(err) = self + .object_store + .put(L1BatchNumber(batch_number), &StorablePubdata { data }) + .await + { + return Err(DAError { + is_transient: err.is_transient(), + error: anyhow::Error::from(err), + }); + } + + Ok(DispatchResponse { + blob_id: batch_number.to_string(), + }) + } + + async fn get_inclusion_data(&self, key: &str) -> Result, DAError> { + let key_u32 = key.parse::().map_err(|err| DAError { + error: anyhow::Error::from(err).context(format!("Failed to parse blob key: {}", key)), + is_transient: false, + })?; + + if let Err(err) = self + .object_store + .get::(L1BatchNumber(key_u32)) + .await + { + if let zksync_object_store::ObjectStoreError::KeyNotFound(_) = err { + return Ok(None); + } + + return Err(DAError { + is_transient: err.is_transient(), + error: anyhow::Error::from(err), + }); + } + + // Using default here because we don't get any inclusion data from object store, thus + // there's nothing to check on L1. 
+ return Ok(Some(InclusionData::default())); + } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn blob_size_limit(&self) -> Option { + None + } +} diff --git a/core/lib/default_da_clients/src/object_store/config.rs b/core/lib/default_da_clients/src/object_store/config.rs new file mode 100644 index 000000000000..285c39827c79 --- /dev/null +++ b/core/lib/default_da_clients/src/object_store/config.rs @@ -0,0 +1,12 @@ +use zksync_config::ObjectStoreConfig; +use zksync_env_config::envy_load; + +#[derive(Debug)] +pub struct DAObjectStoreConfig(pub ObjectStoreConfig); + +impl DAObjectStoreConfig { + pub fn from_env() -> anyhow::Result { + let config = envy_load("object_store", "DA_CLIENT_OBJECT_STORE_")?; + Ok(Self(config)) + } +} diff --git a/core/lib/default_da_clients/src/object_store/mod.rs b/core/lib/default_da_clients/src/object_store/mod.rs new file mode 100644 index 000000000000..1600941b0572 --- /dev/null +++ b/core/lib/default_da_clients/src/object_store/mod.rs @@ -0,0 +1,4 @@ +pub mod client; +pub mod config; +mod types; +pub mod wiring_layer; diff --git a/core/lib/default_da_clients/src/object_store/types.rs b/core/lib/default_da_clients/src/object_store/types.rs new file mode 100644 index 000000000000..b8ec9303e71e --- /dev/null +++ b/core/lib/default_da_clients/src/object_store/types.rs @@ -0,0 +1,38 @@ +use std::io::{Read, Write}; + +use flate2::{read::GzDecoder, write::GzEncoder, Compression}; +use zksync_object_store::{Bucket, StoredObject, _reexports::BoxedError}; +use zksync_types::L1BatchNumber; + +/// Used as a wrapper for the pubdata to be stored in the GCS. 
+#[derive(Debug)] +pub struct StorablePubdata { + pub data: Vec, +} + +impl StoredObject for StorablePubdata { + const BUCKET: Bucket = Bucket::DataAvailability; + type Key<'a> = L1BatchNumber; + + fn encode_key(key: Self::Key<'_>) -> String { + format!("l1_batch_{key}_pubdata.gzip") + } + + fn serialize(&self) -> Result, BoxedError> { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&self.data[..])?; + encoder.finish().map_err(From::from) + } + + fn deserialize(bytes: Vec) -> Result { + let mut decoder = GzDecoder::new(&bytes[..]); + let mut decompressed_bytes = Vec::new(); + decoder + .read_to_end(&mut decompressed_bytes) + .map_err(BoxedError::from)?; + + Ok(Self { + data: decompressed_bytes, + }) + } +} diff --git a/core/lib/default_da_clients/src/object_store/wiring_layer.rs b/core/lib/default_da_clients/src/object_store/wiring_layer.rs new file mode 100644 index 000000000000..7af7e4d04fa6 --- /dev/null +++ b/core/lib/default_da_clients/src/object_store/wiring_layer.rs @@ -0,0 +1,36 @@ +use zksync_config::ObjectStoreConfig; +use zksync_da_client::DataAvailabilityClient; +use zksync_node_framework::{ + implementations::resources::da_client::DAClientResource, + service::ServiceContext, + wiring_layer::{WiringError, WiringLayer}, +}; + +use crate::object_store::client::ObjectStoreDAClient; + +#[derive(Debug)] +pub struct ObjectStorageClientWiringLayer { + config: ObjectStoreConfig, +} + +impl ObjectStorageClientWiringLayer { + pub fn new(config: ObjectStoreConfig) -> Self { + Self { config } + } +} + +#[async_trait::async_trait] +impl WiringLayer for ObjectStorageClientWiringLayer { + fn layer_name(&self) -> &'static str { + "object_store_da_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let client: Box = + Box::new(ObjectStoreDAClient::new(self.config).await?); + + context.insert_resource(DAClientResource(client))?; + + Ok(()) + } +} diff --git 
a/core/lib/env_config/src/da_dispatcher.rs b/core/lib/env_config/src/da_dispatcher.rs new file mode 100644 index 000000000000..194e4185b286 --- /dev/null +++ b/core/lib/env_config/src/da_dispatcher.rs @@ -0,0 +1,44 @@ +use zksync_config::DADispatcherConfig; + +use crate::{envy_load, FromEnv}; + +impl FromEnv for DADispatcherConfig { + fn from_env() -> anyhow::Result { + envy_load("da_dispatcher", "DA_DISPATCHER_") + } +} + +#[cfg(test)] +mod tests { + use zksync_config::configs::da_dispatcher::DADispatcherConfig; + + use super::*; + use crate::test_utils::EnvMutex; + + static MUTEX: EnvMutex = EnvMutex::new(); + + fn expected_da_layer_config( + interval: u32, + rows_limit: u32, + max_retries: u16, + ) -> DADispatcherConfig { + DADispatcherConfig { + polling_interval_ms: Some(interval), + max_rows_to_dispatch: Some(rows_limit), + max_retries: Some(max_retries), + } + } + + #[test] + fn from_env_da_dispatcher() { + let mut lock = MUTEX.lock(); + let config = r#" + DA_DISPATCHER_POLLING_INTERVAL_MS=5000 + DA_DISPATCHER_MAX_ROWS_TO_DISPATCH=60 + DA_DISPATCHER_MAX_RETRIES=7 + "#; + lock.set_env(config); + let actual = DADispatcherConfig::from_env().unwrap(); + assert_eq!(actual, expected_da_layer_config(5000, 60, 7)); + } +} diff --git a/core/lib/env_config/src/lib.rs b/core/lib/env_config/src/lib.rs index 9218467fdaba..67078fcd4513 100644 --- a/core/lib/env_config/src/lib.rs +++ b/core/lib/env_config/src/lib.rs @@ -21,6 +21,7 @@ mod proof_data_handler; mod snapshots_creator; mod utils; +mod da_dispatcher; mod genesis; #[cfg(test)] mod test_utils; diff --git a/core/lib/l1_contract_interface/src/i_executor/structures/commit_batch_info.rs b/core/lib/l1_contract_interface/src/i_executor/structures/commit_batch_info.rs index cf17d8c7909e..b5d77ff60c16 100644 --- a/core/lib/l1_contract_interface/src/i_executor/structures/commit_batch_info.rs +++ b/core/lib/l1_contract_interface/src/i_executor/structures/commit_batch_info.rs @@ -17,6 +17,7 @@ use crate::{ /// These are used 
by the L1 Contracts to indicate what DA layer is used for pubdata const PUBDATA_SOURCE_CALLDATA: u8 = 0; const PUBDATA_SOURCE_BLOBS: u8 = 1; +const PUBDATA_SOURCE_CUSTOM: u8 = 2; /// Encoding for `CommitBatchInfo` from `IExecutor.sol` for a contract running in rollup mode. #[derive(Debug)] @@ -208,6 +209,13 @@ impl Tokenizable for CommitBatchInfo<'_> { vec![PUBDATA_SOURCE_BLOBS] } + (L1BatchCommitmentMode::Rollup, PubdataDA::Custom) => { + panic!("Custom pubdata DA is incompatible with Rollup mode") + } + (L1BatchCommitmentMode::Validium, PubdataDA::Custom) => { + vec![PUBDATA_SOURCE_CUSTOM] + } + (L1BatchCommitmentMode::Rollup, PubdataDA::Calldata) => { // We compute and add the blob commitment to the pubdata payload so that we can verify the proof // even if we are not using blobs. diff --git a/core/lib/object_store/src/factory.rs b/core/lib/object_store/src/factory.rs index 0fa1329ad72c..af00a8193d7f 100644 --- a/core/lib/object_store/src/factory.rs +++ b/core/lib/object_store/src/factory.rs @@ -52,6 +52,11 @@ impl ObjectStoreFactory { .cloned() } + /// Creates an [`ObjectStore`] based on the provided `config`. + /// + /// # Errors + /// + /// Returns an error if store initialization fails (e.g., because of incorrect configuration). 
async fn create_from_config( config: &ObjectStoreConfig, ) -> Result, ObjectStoreError> { diff --git a/core/lib/object_store/src/raw.rs b/core/lib/object_store/src/raw.rs index 66cda57a0ab1..da1cd99728d9 100644 --- a/core/lib/object_store/src/raw.rs +++ b/core/lib/object_store/src/raw.rs @@ -18,6 +18,7 @@ pub enum Bucket { ProofsFri, ProofsTee, StorageSnapshot, + DataAvailability, TeeVerifierInput, } @@ -36,6 +37,7 @@ impl Bucket { Self::ProofsFri => "proofs_fri", Self::ProofsTee => "proofs_tee", Self::StorageSnapshot => "storage_logs_snapshots", + Self::DataAvailability => "data_availability", Self::TeeVerifierInput => "tee_verifier_inputs", } } diff --git a/core/lib/protobuf_config/src/da_dispatcher.rs b/core/lib/protobuf_config/src/da_dispatcher.rs new file mode 100644 index 000000000000..1cafa37a1e19 --- /dev/null +++ b/core/lib/protobuf_config/src/da_dispatcher.rs @@ -0,0 +1,24 @@ +use zksync_config::configs::{self}; +use zksync_protobuf::ProtoRepr; + +use crate::proto::da_dispatcher as proto; + +impl ProtoRepr for proto::DataAvailabilityDispatcher { + type Type = configs::da_dispatcher::DADispatcherConfig; + + fn read(&self) -> anyhow::Result { + Ok(configs::da_dispatcher::DADispatcherConfig { + polling_interval_ms: self.polling_interval_ms, + max_rows_to_dispatch: self.max_rows_to_dispatch, + max_retries: self.max_retries.map(|x| x as u16), + }) + } + + fn build(this: &Self::Type) -> Self { + Self { + polling_interval_ms: this.polling_interval_ms, + max_rows_to_dispatch: this.max_rows_to_dispatch, + max_retries: this.max_retries.map(Into::into), + } + } +} diff --git a/core/lib/protobuf_config/src/eth.rs b/core/lib/protobuf_config/src/eth.rs index 4ed5a8841436..90807f7dafa3 100644 --- a/core/lib/protobuf_config/src/eth.rs +++ b/core/lib/protobuf_config/src/eth.rs @@ -30,6 +30,7 @@ impl proto::PubdataSendingMode { match x { From::Calldata => Self::Calldata, From::Blobs => Self::Blobs, + From::Custom => Self::Custom, } } @@ -38,6 +39,7 @@ impl 
proto::PubdataSendingMode { match self { Self::Calldata => To::Calldata, Self::Blobs => To::Blobs, + Self::Custom => To::Custom, } } } diff --git a/core/lib/protobuf_config/src/general.rs b/core/lib/protobuf_config/src/general.rs index 9ea3a3265541..9215ad5ae7d6 100644 --- a/core/lib/protobuf_config/src/general.rs +++ b/core/lib/protobuf_config/src/general.rs @@ -37,6 +37,8 @@ impl ProtoRepr for proto::GeneralConfig { snapshot_creator: read_optional_repr(&self.snapshot_creator) .context("snapshot_creator")?, observability: read_optional_repr(&self.observability).context("observability")?, + da_dispatcher_config: read_optional_repr(&self.da_dispatcher) + .context("da_dispatcher")?, protective_reads_writer_config: read_optional_repr(&self.protective_reads_writer) .context("protective_reads_writer")?, core_object_store: read_optional_repr(&self.core_object_store) @@ -77,6 +79,7 @@ impl ProtoRepr for proto::GeneralConfig { eth: this.eth.as_ref().map(ProtoRepr::build), snapshot_creator: this.snapshot_creator.as_ref().map(ProtoRepr::build), observability: this.observability.as_ref().map(ProtoRepr::build), + da_dispatcher: this.da_dispatcher_config.as_ref().map(ProtoRepr::build), protective_reads_writer: this .protective_reads_writer_config .as_ref() diff --git a/core/lib/protobuf_config/src/lib.rs b/core/lib/protobuf_config/src/lib.rs index f7eb19f0d60c..8b9ed28e23e2 100644 --- a/core/lib/protobuf_config/src/lib.rs +++ b/core/lib/protobuf_config/src/lib.rs @@ -11,6 +11,7 @@ mod commitment_generator; mod consensus; mod contract_verifier; mod contracts; +mod da_dispatcher; mod database; mod en; mod eth; diff --git a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto new file mode 100644 index 000000000000..d1d913498a4e --- /dev/null +++ b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package zksync.config.da_dispatcher; + +import 
"zksync/config/object_store.proto"; + +message DataAvailabilityDispatcher { + optional uint32 polling_interval_ms = 1; + optional uint32 max_rows_to_dispatch = 2; + optional uint32 max_retries = 3; +} diff --git a/core/lib/protobuf_config/src/proto/config/eth_sender.proto b/core/lib/protobuf_config/src/proto/config/eth_sender.proto index 1eb15f0679a4..839c7f65b973 100644 --- a/core/lib/protobuf_config/src/proto/config/eth_sender.proto +++ b/core/lib/protobuf_config/src/proto/config/eth_sender.proto @@ -23,6 +23,7 @@ enum ProofLoadingMode { enum PubdataSendingMode { CALLDATA = 0; BLOBS = 1; + CUSTOM = 2; } message Sender { diff --git a/core/lib/protobuf_config/src/proto/config/general.proto b/core/lib/protobuf_config/src/proto/config/general.proto index 7d2423f6b71b..3931e708af87 100644 --- a/core/lib/protobuf_config/src/proto/config/general.proto +++ b/core/lib/protobuf_config/src/proto/config/general.proto @@ -13,6 +13,7 @@ import "zksync/config/house_keeper.proto"; import "zksync/config/observability.proto"; import "zksync/config/snapshots_creator.proto"; import "zksync/config/utils.proto"; +import "zksync/config/da_dispatcher.proto"; import "zksync/config/vm_runner.proto"; import "zksync/config/commitment_generator.proto"; import "zksync/config/snapshot_recovery.proto"; @@ -45,4 +46,5 @@ message GeneralConfig { optional config.snapshot_recovery.SnapshotRecovery snapshot_recovery = 35; optional config.pruning.Pruning pruning = 36; optional config.commitment_generator.CommitmentGenerator commitment_generator = 37; + optional config.da_dispatcher.DataAvailabilityDispatcher da_dispatcher = 38; } diff --git a/core/lib/types/src/pubdata_da.rs b/core/lib/types/src/pubdata_da.rs index 8f7d3a96f55e..6705fdc29530 100644 --- a/core/lib/types/src/pubdata_da.rs +++ b/core/lib/types/src/pubdata_da.rs @@ -1,5 +1,7 @@ +use chrono::{DateTime, Utc}; use num_enum::TryFromPrimitive; use serde::{Deserialize, Serialize}; +use zksync_basic_types::L1BatchNumber; use 
zksync_config::configs::eth_sender::PubdataSendingMode; /// Enum holding the current values used for DA Layers. @@ -7,8 +9,12 @@ use zksync_config::configs::eth_sender::PubdataSendingMode; #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] #[derive(TryFromPrimitive)] pub enum PubdataDA { + /// Pubdata is sent to the L1 as a tx calldata. Calldata = 0, + /// Pubdata is sent to L1 as EIP-4844 blobs. Blobs, + /// Pubdata is sent to the external storage (GCS/DA layers) or not sent at all. + Custom, } impl From for PubdataDA { @@ -16,6 +22,16 @@ impl From for PubdataDA { match value { PubdataSendingMode::Calldata => PubdataDA::Calldata, PubdataSendingMode::Blobs => PubdataDA::Blobs, + PubdataSendingMode::Custom => PubdataDA::Custom, } } } + +/// Represents a blob in the data availability layer. +#[derive(Debug, Clone)] +pub struct DataAvailabilityBlob { + pub l1_batch_number: L1BatchNumber, + pub blob_id: String, + pub inclusion_data: Option>, + pub sent_at: DateTime, +} diff --git a/core/lib/zksync_core_leftovers/src/lib.rs b/core/lib/zksync_core_leftovers/src/lib.rs index 8e85bad9cc33..b760a0b7e426 100644 --- a/core/lib/zksync_core_leftovers/src/lib.rs +++ b/core/lib/zksync_core_leftovers/src/lib.rs @@ -86,6 +86,8 @@ pub enum Component { Consensus, /// Component generating commitment for L1 batches. CommitmentGenerator, + /// Component sending a pubdata to the DA layers. + DADispatcher, /// VM runner-based component that saves protective reads to Postgres. 
VmRunnerProtectiveReads, } @@ -124,6 +126,7 @@ impl FromStr for Components { "proof_data_handler" => Ok(Components(vec![Component::ProofDataHandler])), "consensus" => Ok(Components(vec![Component::Consensus])), "commitment_generator" => Ok(Components(vec![Component::CommitmentGenerator])), + "da_dispatcher" => Ok(Components(vec![Component::DADispatcher])), "vm_runner_protective_reads" => { Ok(Components(vec![Component::VmRunnerProtectiveReads])) } diff --git a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs index 60a610c359f8..c45b8cb8687b 100644 --- a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs +++ b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs @@ -16,8 +16,8 @@ use zksync_config::{ GeneralConfig, ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, ProtectiveReadsWriterConfig, PruningConfig, SnapshotRecoveryConfig, }, - ApiConfig, ContractVerifierConfig, DBConfig, EthConfig, EthWatchConfig, GasAdjusterConfig, - ObjectStoreConfig, PostgresConfig, SnapshotsCreatorConfig, + ApiConfig, ContractVerifierConfig, DADispatcherConfig, DBConfig, EthConfig, EthWatchConfig, + GasAdjusterConfig, ObjectStoreConfig, PostgresConfig, SnapshotsCreatorConfig, }; use zksync_protobuf::repr::ProtoRepr; @@ -63,6 +63,7 @@ pub struct TempConfigStore { pub gas_adjuster_config: Option, pub observability: Option, pub snapshot_creator: Option, + pub da_dispatcher_config: Option, pub protective_reads_writer_config: Option, pub core_object_store: Option, pub commitment_generator: Option, @@ -93,6 +94,7 @@ impl TempConfigStore { eth: self.eth_sender_config.clone(), snapshot_creator: self.snapshot_creator.clone(), observability: self.observability.clone(), + da_dispatcher_config: self.da_dispatcher_config.clone(), protective_reads_writer_config: self.protective_reads_writer_config.clone(), core_object_store: self.core_object_store.clone(), commitment_generator: 
self.commitment_generator.clone(), diff --git a/core/node/consistency_checker/src/lib.rs b/core/node/consistency_checker/src/lib.rs index e4634c86e403..ba8085333a4c 100644 --- a/core/node/consistency_checker/src/lib.rs +++ b/core/node/consistency_checker/src/lib.rs @@ -262,6 +262,7 @@ pub fn detect_da( /// These are used by the L1 Contracts to indicate what DA layer is used for pubdata const PUBDATA_SOURCE_CALLDATA: u8 = 0; const PUBDATA_SOURCE_BLOBS: u8 = 1; + const PUBDATA_SOURCE_CUSTOM: u8 = 2; fn parse_error(message: impl Into>) -> ethabi::Error { ethabi::Error::Other(message.into()) @@ -292,6 +293,7 @@ pub fn detect_da( match last_reference_token.first() { Some(&byte) if byte == PUBDATA_SOURCE_CALLDATA => Ok(PubdataDA::Calldata), Some(&byte) if byte == PUBDATA_SOURCE_BLOBS => Ok(PubdataDA::Blobs), + Some(&byte) if byte == PUBDATA_SOURCE_CUSTOM => Ok(PubdataDA::Custom), Some(&byte) => Err(parse_error(format!( "unexpected first byte of the last reference token; expected one of [{PUBDATA_SOURCE_CALLDATA}, {PUBDATA_SOURCE_BLOBS}], \ got {byte}" diff --git a/core/node/da_dispatcher/Cargo.toml b/core/node/da_dispatcher/Cargo.toml new file mode 100644 index 000000000000..159c8f40ef47 --- /dev/null +++ b/core/node/da_dispatcher/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "zksync_da_dispatcher" +version.workspace = true +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true + + +[dependencies] +vise.workspace = true +zksync_dal.workspace = true +zksync_utils.workspace = true +zksync_config.workspace = true +zksync_types.workspace = true +zksync_da_client.workspace = true + +tokio = { workspace = true, features = ["time"] } +anyhow.workspace = true +tracing.workspace = true +chrono.workspace = true +rand.workspace = true +futures.workspace = true diff --git a/core/node/da_dispatcher/README.md b/core/node/da_dispatcher/README.md new 
file mode 100644 index 000000000000..a7ea6351a5ed --- /dev/null +++ b/core/node/da_dispatcher/README.md @@ -0,0 +1,18 @@ +# DA dispatcher + +This crate contains an implementation of the DataAvailability dispatcher component, which sends blobs of data to the +corresponding DA layer. + +## Overview + +The implementation of the DA clients is abstracted away from the dispatcher. The dispatcher is responsible for storing +the DA blobs info in the Postgres database and using it to get the inclusion proofs for the blobs. The retry logic is +also part of the DA dispatcher. + +This component assumes that batches are being sent to the L1 sequentially and that there is no need to fetch the +inclusion data for their DA in parallel. Same with dispatching DA blobs: there is no need to do that in parallel unless +we are facing performance issues when the sequencer is trying to catch up after some outage. + +This is a singleton component; only one instance of the DA dispatcher should be running at a time. In case multiple +instances are started, they will be dispatching the same pubdata blobs to the DA layer. It is not going to cause any +critical issues, but it is wasteful. 
diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs new file mode 100644 index 000000000000..80c030dff338 --- /dev/null +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -0,0 +1,211 @@ +use std::{future::Future, time::Duration}; + +use anyhow::Context; +use chrono::{NaiveDateTime, Utc}; +use rand::Rng; +use tokio::sync::watch::Receiver; +use zksync_config::DADispatcherConfig; +use zksync_da_client::{types::DAError, DataAvailabilityClient}; +use zksync_dal::{ConnectionPool, Core, CoreDal}; +use zksync_types::L1BatchNumber; + +use crate::metrics::METRICS; + +#[derive(Debug)] +pub struct DataAvailabilityDispatcher { + client: Box, + pool: ConnectionPool, + config: DADispatcherConfig, +} + +impl DataAvailabilityDispatcher { + pub fn new( + pool: ConnectionPool, + config: DADispatcherConfig, + client: Box, + ) -> Self { + Self { + pool, + config, + client, + } + } + + pub async fn run(self, mut stop_receiver: Receiver) -> anyhow::Result<()> { + loop { + if *stop_receiver.borrow() { + break; + } + + let subtasks = futures::future::join( + async { + if let Err(err) = self.dispatch().await { + tracing::error!("dispatch error {err:?}"); + } + }, + async { + if let Err(err) = self.poll_for_inclusion().await { + tracing::error!("poll_for_inclusion error {err:?}"); + } + }, + ); + + tokio::select! { + _ = subtasks => {}, + _ = stop_receiver.changed() => { + break; + } + } + + if tokio::time::timeout(self.config.polling_interval(), stop_receiver.changed()) + .await + .is_ok() + { + break; + } + } + + tracing::info!("Stop signal received, da_dispatcher is shutting down"); + Ok(()) + } + + /// Dispatches the blobs to the data availability layer, and saves the blob_id in the database. 
+ async fn dispatch(&self) -> anyhow::Result<()> { + let mut conn = self.pool.connection_tagged("da_dispatcher").await?; + let batches = conn + .data_availability_dal() + .get_ready_for_da_dispatch_l1_batches(self.config.max_rows_to_dispatch() as usize) + .await?; + drop(conn); + + for batch in batches { + let dispatch_latency = METRICS.blob_dispatch_latency.start(); + let dispatch_response = retry(self.config.max_retries(), batch.l1_batch_number, || { + self.client + .dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) + }) + .await + .with_context(|| { + format!( + "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", + batch.l1_batch_number, + batch.pubdata.len() + ) + })?; + let dispatch_latency_duration = dispatch_latency.observe(); + + let sent_at = + NaiveDateTime::from_timestamp_millis(Utc::now().timestamp_millis()).unwrap(); + + let mut conn = self.pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .insert_l1_batch_da( + batch.l1_batch_number, + dispatch_response.blob_id.as_str(), + sent_at, + ) + .await?; + drop(conn); + + METRICS + .last_dispatched_l1_batch + .set(batch.l1_batch_number.0 as usize); + METRICS.blob_size.observe(batch.pubdata.len()); + tracing::info!( + "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", + batch.l1_batch_number, + batch.pubdata.len(), + ); + } + + Ok(()) + } + + /// Polls the data availability layer for inclusion data, and saves it in the database. 
+ async fn poll_for_inclusion(&self) -> anyhow::Result<()> { + let mut conn = self.pool.connection_tagged("da_dispatcher").await?; + let blob_info = conn + .data_availability_dal() + .get_first_da_blob_awaiting_inclusion() + .await?; + drop(conn); + + let Some(blob_info) = blob_info else { + return Ok(()); + }; + + let inclusion_data = self + .client + .get_inclusion_data(blob_info.blob_id.as_str()) + .await + .with_context(|| { + format!( + "failed to get inclusion data for blob_id: {}, batch_number: {}", + blob_info.blob_id, blob_info.l1_batch_number + ) + })?; + + let Some(inclusion_data) = inclusion_data else { + return Ok(()); + }; + + let mut conn = self.pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .save_l1_batch_inclusion_data( + L1BatchNumber(blob_info.l1_batch_number.0), + inclusion_data.data.as_slice(), + ) + .await?; + drop(conn); + + let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); + if let Ok(latency) = inclusion_latency.to_std() { + METRICS.inclusion_latency.observe(latency); + } + METRICS + .last_included_l1_batch + .set(blob_info.l1_batch_number.0 as usize); + + tracing::info!( + "Received an inclusion data for a batch_number: {}, inclusion_latency_seconds: {}", + blob_info.l1_batch_number, + inclusion_latency.num_seconds() + ); + + Ok(()) + } +} + +async fn retry( + max_retries: u16, + batch_number: L1BatchNumber, + mut f: F, +) -> Result +where + Fut: Future>, + F: FnMut() -> Fut, +{ + let mut retries = 1; + let mut backoff_secs = 1; + loop { + match f().await { + Ok(result) => { + METRICS.dispatch_call_retries.observe(retries as usize); + return Ok(result); + } + Err(err) => { + if !err.is_transient() || retries > max_retries { + return Err(err); + } + + retries += 1; + let sleep_duration = Duration::from_secs(backoff_secs) + .mul_f32(rand::thread_rng().gen_range(0.8..1.2)); + tracing::warn!(%err, "Failed DA dispatch request {retries}/{max_retries} for batch {batch_number}, retrying 
in {} milliseconds.", sleep_duration.as_millis()); + tokio::time::sleep(sleep_duration).await; + + backoff_secs = (backoff_secs * 2).min(128); // cap the back-off at 128 seconds + } + } + } +} diff --git a/core/node/da_dispatcher/src/lib.rs b/core/node/da_dispatcher/src/lib.rs new file mode 100644 index 000000000000..cb41ea1f7c25 --- /dev/null +++ b/core/node/da_dispatcher/src/lib.rs @@ -0,0 +1,4 @@ +pub use self::da_dispatcher::DataAvailabilityDispatcher; + +mod da_dispatcher; +mod metrics; diff --git a/core/node/da_dispatcher/src/metrics.rs b/core/node/da_dispatcher/src/metrics.rs new file mode 100644 index 000000000000..67ac5ed68222 --- /dev/null +++ b/core/node/da_dispatcher/src/metrics.rs @@ -0,0 +1,33 @@ +use std::time::Duration; + +use vise::{Buckets, Gauge, Histogram, Metrics, Unit}; + +/// Buckets for `blob_dispatch_latency` (from 0.1 to 120 seconds). +const DISPATCH_LATENCIES: Buckets = + Buckets::values(&[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0]); + +#[derive(Debug, Metrics)] +#[metrics(prefix = "server_da_dispatcher")] +pub(super) struct DataAvailabilityDispatcherMetrics { + /// Latency of the dispatch of the blob. + #[metrics(buckets = DISPATCH_LATENCIES, unit = Unit::Seconds)] + pub blob_dispatch_latency: Histogram, + /// The duration between the moment when the blob is dispatched and the moment when it is included. + #[metrics(buckets = Buckets::LATENCIES)] + pub inclusion_latency: Histogram, + /// Size of the dispatched blob. + /// Buckets are bytes ranging from 1 KB to 16 MB, which has to satisfy all blob size values. + #[metrics(buckets = Buckets::exponential(1_024.0..=16.0 * 1_024.0 * 1_024.0, 2.0), unit = Unit::Bytes)] + pub blob_size: Histogram, + + /// Number of transactions resent by the DA dispatcher. + #[metrics(buckets = Buckets::linear(0.0..=10.0, 1.0))] + pub dispatch_call_retries: Histogram, + /// Last L1 batch that was dispatched to the DA layer. 
+ pub last_dispatched_l1_batch: Gauge, + /// Last L1 batch that has its inclusion finalized by DA layer. + pub last_included_l1_batch: Gauge, +} + +#[vise::register] +pub(super) static METRICS: vise::Global = vise::Global::new(); diff --git a/core/node/eth_sender/src/aggregator.rs b/core/node/eth_sender/src/aggregator.rs index 966c9d1f1907..de6a6982088b 100644 --- a/core/node/eth_sender/src/aggregator.rs +++ b/core/node/eth_sender/src/aggregator.rs @@ -216,6 +216,7 @@ impl Aggregator { base_system_contracts_hashes.bootloader, base_system_contracts_hashes.default_aa, protocol_version_id, + self.commitment_mode != L1BatchCommitmentMode::Rollup, ) .await .unwrap() diff --git a/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs b/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs index a3a1ed78e5b7..2032cb9c89fd 100644 --- a/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs +++ b/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs @@ -202,6 +202,11 @@ impl GasAdjuster { PubdataSendingMode::Calldata => { self.estimate_effective_gas_price() * self.pubdata_byte_gas() } + PubdataSendingMode::Custom => { + // Fix this when we have a better understanding of dynamic pricing for custom DA layers. 
+ // GitHub issue: https://github.com/matter-labs/zksync-era/issues/2105 + 0 + } } } diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index f6ce714178f8..d6a2e463a533 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -34,6 +34,8 @@ zksync_commitment_generator.workspace = true zksync_house_keeper.workspace = true zksync_node_fee_model.workspace = true zksync_eth_sender.workspace = true +zksync_da_client.workspace = true +zksync_da_dispatcher.workspace = true zksync_block_reverter.workspace = true zksync_state_keeper.workspace = true zksync_consistency_checker.workspace = true diff --git a/core/node/node_framework/src/implementations/layers/da_dispatcher.rs b/core/node/node_framework/src/implementations/layers/da_dispatcher.rs new file mode 100644 index 000000000000..d1ba66b6ddd3 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/da_dispatcher.rs @@ -0,0 +1,70 @@ +use zksync_config::configs::{chain::StateKeeperConfig, da_dispatcher::DADispatcherConfig}; +use zksync_da_dispatcher::DataAvailabilityDispatcher; + +use crate::{ + implementations::resources::{ + da_client::DAClientResource, + pools::{MasterPool, PoolResource}, + }, + service::{ServiceContext, StopReceiver}, + task::{Task, TaskId}, + wiring_layer::{WiringError, WiringLayer}, +}; + +/// A layer that wires the data availability dispatcher task. 
+#[derive(Debug)] +pub struct DataAvailabilityDispatcherLayer { + state_keeper_config: StateKeeperConfig, + da_config: DADispatcherConfig, +} + +impl DataAvailabilityDispatcherLayer { + pub fn new(state_keeper_config: StateKeeperConfig, da_config: DADispatcherConfig) -> Self { + Self { + state_keeper_config, + da_config, + } + } +} + +#[async_trait::async_trait] +impl WiringLayer for DataAvailabilityDispatcherLayer { + fn layer_name(&self) -> &'static str { + "da_dispatcher_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let master_pool_resource = context.get_resource::>()?; + // A pool with size 2 is used here because there are 2 functions within a task that execute in parallel + let master_pool = master_pool_resource.get_custom(2).await?; + let da_client = context.get_resource::()?.0; + + if let Some(limit) = da_client.blob_size_limit() { + if self.state_keeper_config.max_pubdata_per_batch > limit as u64 { + return Err(WiringError::Configuration(format!( + "Max pubdata per batch is greater than the blob size limit: {} > {}", + self.state_keeper_config.max_pubdata_per_batch, limit + ))); + } + } + + context.add_task(DataAvailabilityDispatcher::new( + master_pool, + self.da_config, + da_client, + )); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Task for DataAvailabilityDispatcher { + fn id(&self) -> TaskId { + "da_dispatcher".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + (*self).run(stop_receiver.0).await + } +} diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs index 8637f15459d5..f822ef5cc909 100644 --- a/core/node/node_framework/src/implementations/layers/mod.rs +++ b/core/node/node_framework/src/implementations/layers/mod.rs @@ -4,6 +4,7 @@ pub mod commitment_generator; pub mod consensus; pub mod consistency_checker; pub mod contract_verification_api; +pub mod 
da_dispatcher; pub mod eth_sender; pub mod eth_watch; pub mod healtcheck_server; diff --git a/core/node/node_framework/src/implementations/resources/da_client.rs b/core/node/node_framework/src/implementations/resources/da_client.rs new file mode 100644 index 000000000000..525164cb9b10 --- /dev/null +++ b/core/node/node_framework/src/implementations/resources/da_client.rs @@ -0,0 +1,13 @@ +use zksync_da_client::DataAvailabilityClient; + +use crate::resource::Resource; + +/// Represents a client of a certain DA solution. +#[derive(Clone)] +pub struct DAClientResource(pub Box); + +impl Resource for DAClientResource { + fn name() -> String { + "common/da_client".into() + } +} diff --git a/core/node/node_framework/src/implementations/resources/mod.rs b/core/node/node_framework/src/implementations/resources/mod.rs index edfb280d4db7..ac090d551316 100644 --- a/core/node/node_framework/src/implementations/resources/mod.rs +++ b/core/node/node_framework/src/implementations/resources/mod.rs @@ -1,5 +1,6 @@ pub mod action_queue; pub mod circuit_breakers; +pub mod da_client; pub mod eth_interface; pub mod fee_input; pub mod healthcheck; diff --git a/core/node/shared_metrics/src/lib.rs b/core/node/shared_metrics/src/lib.rs index 22a90349191d..e0a7fa74ef42 100644 --- a/core/node/shared_metrics/src/lib.rs +++ b/core/node/shared_metrics/src/lib.rs @@ -31,6 +31,7 @@ pub enum InitStage { Tree, TeeVerifierInputProducer, Consensus, + DADispatcher, } impl fmt::Display for InitStage { @@ -46,6 +47,7 @@ impl fmt::Display for InitStage { Self::Tree => formatter.write_str("tree"), Self::TeeVerifierInputProducer => formatter.write_str("tee_verifier_input_producer"), Self::Consensus => formatter.write_str("consensus"), + Self::DADispatcher => formatter.write_str("da_dispatcher"), } } } diff --git a/core/tests/revert-test/tests/revert-and-restart-en.test.ts b/core/tests/revert-test/tests/revert-and-restart-en.test.ts index 27c04c8be640..02174c25e271 100644 --- 
a/core/tests/revert-test/tests/revert-and-restart-en.test.ts +++ b/core/tests/revert-test/tests/revert-and-restart-en.test.ts @@ -137,10 +137,11 @@ class MainNode { env.DATABASE_MERKLE_TREE_MODE = 'full'; console.log(`DATABASE_URL = ${env.DATABASE_URL}`); - let components = 'api,tree,eth,state_keeper,commitment_generator'; + let components = 'api,tree,eth,state_keeper,commitment_generator,da_dispatcher'; if (enableConsensus) { components += ',consensus'; } + let proc = spawn('./target/release/zksync_server', ['--components', components], { cwd: env.ZKSYNC_HOME, stdio: [null, logs, logs], diff --git a/core/tests/revert-test/tests/revert-and-restart.test.ts b/core/tests/revert-test/tests/revert-and-restart.test.ts index 1ce788cb2cc8..9c781e02a69e 100644 --- a/core/tests/revert-test/tests/revert-and-restart.test.ts +++ b/core/tests/revert-test/tests/revert-and-restart.test.ts @@ -70,9 +70,9 @@ describe('Block reverting test', function () { const pathToHome = path.join(__dirname, '../../../..'); - let enable_consensus = process.env.ENABLE_CONSENSUS == 'true'; - let components = 'api,tree,eth,state_keeper,commitment_generator'; - if (enable_consensus) { + const enableConsensus = process.env.ENABLE_CONSENSUS == 'true'; + let components = 'api,tree,eth,state_keeper,commitment_generator,da_dispatcher'; + if (enableConsensus) { components += ',consensus'; } diff --git a/core/tests/ts-integration/tests/fees.test.ts b/core/tests/ts-integration/tests/fees.test.ts index e4610d3f2c3c..91133705a218 100644 --- a/core/tests/ts-integration/tests/fees.test.ts +++ b/core/tests/ts-integration/tests/fees.test.ts @@ -298,7 +298,7 @@ async function setInternalL1GasPrice( } catch (_) {} // Run server in background. 
- let command = 'zk server --components api,tree,eth,state_keeper'; + let command = 'zk server --components api,tree,eth,state_keeper,da_dispatcher'; command = `DATABASE_MERKLE_TREE_MODE=full ${command}`; if (newPubdataPrice) { diff --git a/core/tests/upgrade-test/tests/upgrade.test.ts b/core/tests/upgrade-test/tests/upgrade.test.ts index 0da90464b421..d08319c6e334 100644 --- a/core/tests/upgrade-test/tests/upgrade.test.ts +++ b/core/tests/upgrade-test/tests/upgrade.test.ts @@ -28,6 +28,8 @@ const STATE_TRANSITON_MANAGER = new ethers.utils.Interface( require(`${L1_CONTRACTS_FOLDER}/state-transition/StateTransitionManager.sol/StateTransitionManager.json`).abi ); +let serverComponents = 'api,tree,eth,state_keeper,commitment_generator,da_dispatcher'; + const depositAmount = ethers.utils.parseEther('0.001'); describe('Upgrade test', function () { @@ -68,8 +70,7 @@ describe('Upgrade test', function () { process.env.CHAIN_STATE_KEEPER_BLOCK_COMMIT_DEADLINE_MS = '2000'; // Run server in background. utils.background({ - command: - 'cd $ZKSYNC_HOME && cargo run --bin zksync_server --release -- --components=api,tree,eth,state_keeper,commitment_generator', + command: `cd $ZKSYNC_HOME && cargo run --bin zksync_server --release -- --components=${serverComponents}`, stdio: [null, logs, logs] }); // Server may need some time to recompile if it's a cold run, so wait for it. @@ -265,8 +266,7 @@ describe('Upgrade test', function () { // Run again. 
utils.background({ - command: - 'cd $ZKSYNC_HOME && zk f cargo run --bin zksync_server --release -- --components=api,tree,eth,state_keeper,commitment_generator &> upgrade.log', + command: `cd $ZKSYNC_HOME && zk f cargo run --bin zksync_server --release -- --components=${serverComponents} &> upgrade.log`, stdio: [null, logs, logs] }); await utils.sleep(10); diff --git a/etc/env/configs/dev_validium.toml b/etc/env/configs/dev_validium.toml index d1b415180bce..5ed4ccb38e41 100644 --- a/etc/env/configs/dev_validium.toml +++ b/etc/env/configs/dev_validium.toml @@ -10,6 +10,9 @@ max_pubdata_per_batch=100000 fee_model_version="V2" l1_batch_commit_data_generator_mode="Validium" +[eth_sender] +sender_pubdata_sending_mode="Custom" + # This override will be removed soon but it is needed for now. [eth_sender.gas_adjuster] max_blob_base_fee=0 diff --git a/etc/env/configs/dev_validium_docker.toml b/etc/env/configs/dev_validium_docker.toml index 4392ca8d2711..7e985cb974ab 100644 --- a/etc/env/configs/dev_validium_docker.toml +++ b/etc/env/configs/dev_validium_docker.toml @@ -19,6 +19,9 @@ fee_model_version = "V2" l1_batch_commit_data_generator_mode = "Validium" miniblock_iteration_interval = 50 +[eth_sender] +sender_pubdata_sending_mode="Custom" + [eth_client] web3_url = "http://reth:8545" diff --git a/prover/config/src/lib.rs b/prover/config/src/lib.rs index 2c05b57e16cf..ac9ebc911b6d 100644 --- a/prover/config/src/lib.rs +++ b/prover/config/src/lib.rs @@ -8,10 +8,10 @@ use zksync_config::{ }, fri_prover_group::FriProverGroupConfig, house_keeper::HouseKeeperConfig, - DatabaseSecrets, FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig, - FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, GeneralConfig, - ObjectStoreConfig, ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, - ProtectiveReadsWriterConfig, + DADispatcherConfig, DatabaseSecrets, FriProofCompressorConfig, FriProverConfig, + FriProverGatewayConfig, FriWitnessGeneratorConfig, 
FriWitnessVectorGeneratorConfig, + GeneralConfig, ObjectStoreConfig, ObservabilityConfig, PrometheusConfig, + ProofDataHandlerConfig, ProtectiveReadsWriterConfig, }, ApiConfig, ContractVerifierConfig, DBConfig, EthConfig, EthWatchConfig, GasAdjusterConfig, PostgresConfig, SnapshotsCreatorConfig, @@ -48,6 +48,7 @@ fn load_env_config() -> anyhow::Result { gas_adjuster_config: GasAdjusterConfig::from_env().ok(), observability: ObservabilityConfig::from_env().ok(), snapshot_creator: SnapshotsCreatorConfig::from_env().ok(), + da_dispatcher_config: DADispatcherConfig::from_env().ok(), protective_reads_writer_config: ProtectiveReadsWriterConfig::from_env().ok(), core_object_store: ObjectStoreConfig::from_env().ok(), commitment_generator: None,