From b908eaed7ffafe037ebac08f204dad41d9409bfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Grze=C5=9Bkiewicz?= Date: Thu, 5 Sep 2024 18:39:53 +0200 Subject: [PATCH] feat(eth-watch): redesign to support Gateway (#2775) Signed-off-by: tomg10 --- .github/workflows/ci-core-reusable.yml | 1 - Cargo.lock | 1 + core/bin/external_node/src/node_builder.rs | 3 +- core/bin/zksync_server/src/main.rs | 13 +- core/bin/zksync_server/src/node_builder.rs | 17 +- core/lib/config/src/configs/eth_sender.rs | 8 + core/lib/config/src/configs/secrets.rs | 1 + core/lib/config/src/configs/wallets.rs | 2 + core/lib/config/src/testonly.rs | 2 + core/lib/contracts/src/lib.rs | 9 ++ ...240b34ce29aad3ac6571116e084d45574c448.json | 34 ++++ ...0c6286fcc824e84bb40a6e9f289c34b85fded.json | 128 --------------- ...bab30553732953e589cd237595227044f438d.json | 130 --------------- ...b9863082ccd1ce45b2d20e1119f1e78171d66.json | 27 ++++ ...ba3af74e8e7b5944cb2943b5badb906167046.json | 32 ---- ...93639a63047f46dc8e605ff37b78f43f5cef5.json | 27 ++++ ...50_add_eth_watcher_progress_table.down.sql | 4 + ...5550_add_eth_watcher_progress_table.up.sql | 9 ++ core/lib/dal/src/eth_watcher_dal.rs | 153 ++++++++++++++++++ core/lib/dal/src/lib.rs | 12 +- core/lib/env_config/src/contracts.rs | 34 +++- core/lib/env_config/src/eth_sender.rs | 5 + core/lib/env_config/src/lib.rs | 4 + core/lib/env_config/src/wallets.rs | 11 ++ .../src/proto/config/secrets.proto | 1 + core/lib/protobuf_config/src/secrets.rs | 9 ++ core/lib/protobuf_config/src/wallets.rs | 1 + .../src/temp_config_store/mod.rs | 4 + core/node/eth_watch/Cargo.toml | 1 + core/node/eth_watch/src/client.rs | 38 +++-- .../decentralized_upgrades.rs | 26 +-- .../event_processors/governance_upgrades.rs | 23 ++- .../eth_watch/src/event_processors/mod.rs | 16 +- .../src/event_processors/priority_ops.rs | 34 ++-- core/node/eth_watch/src/lib.rs | 132 ++++++++------- core/node/eth_watch/src/metrics.rs | 1 - core/node/eth_watch/src/tests.rs | 147 ++++++++++++++--- .../layers/eth_sender/aggregator.rs | 34 +++- .../layers/eth_sender/manager.rs | 27 ++-- .../src/implementations/layers/eth_watch.rs | 55 ++++++- .../layers/pk_signing_eth_client.rs | 25 ++- .../layers/query_eth_client.rs | 40 +++-- .../resources/eth_interface.rs | 8 + .../tests/ts-integration/src/context-owner.ts | 8 +- infrastructure/zk/src/config.ts | 1 + infrastructure/zk/src/contract.ts | 13 +- .../commands/external_node/prepare_configs.rs | 1 + 47 files changed, 826 insertions(+), 486 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-26c80e9bafcf7989e7d40c6e424240b34ce29aad3ac6571116e084d45574c448.json delete mode 100644 core/lib/dal/.sqlx/query-39a105cba1be0ec8f2b2b88d2f10c6286fcc824e84bb40a6e9f289c34b85fded.json delete mode 100644 core/lib/dal/.sqlx/query-45e52d05a4483def84c141e3529bab30553732953e589cd237595227044f438d.json create mode 100644 core/lib/dal/.sqlx/query-afdeecb78e3af802c2b8ffb0f5ab9863082ccd1ce45b2d20e1119f1e78171d66.json delete mode 100644 core/lib/dal/.sqlx/query-c4835d40921af47bfb4f60102bbba3af74e8e7b5944cb2943b5badb906167046.json create mode 100644 core/lib/dal/.sqlx/query-c61682ed92c1a43855a991598d593639a63047f46dc8e605ff37b78f43f5cef5.json create mode 100644 core/lib/dal/migrations/20240830085550_add_eth_watcher_progress_table.down.sql create mode 100644 core/lib/dal/migrations/20240830085550_add_eth_watcher_progress_table.up.sql create mode 100644 core/lib/dal/src/eth_watcher_dal.rs diff --git a/.github/workflows/ci-core-reusable.yml b/.github/workflows/ci-core-reusable.yml index 56de305ca4b2..638a312c32fb 100644 --- a/.github/workflows/ci-core-reusable.yml +++ b/.github/workflows/ci-core-reusable.yml @@ -254,7 +254,6 @@ jobs: ci_run zk contract migrate-to-sync-layer ci_run zk contract prepare-sync-layer-validators ci_run zk contract update-config-for-sync-layer - ci_run zk server --clear-l1-txs-history ci_run sleep 120 ci_run zk server >> server2.log 2>&1 & ci_run sleep 5 diff --git a/Cargo.lock b/Cargo.lock index 10f89d30974b..63ae039ac818 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8635,6 +8635,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "test-log", "thiserror", "tokio", "tracing", diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs index 8b54b66dc336..e371894e9848 100644 --- a/core/bin/external_node/src/node_builder.rs +++ b/core/bin/external_node/src/node_builder.rs @@ -182,8 +182,7 @@ impl ExternalNodeBuilder { let query_eth_client_layer = QueryEthClientLayer::new( self.config.required.settlement_layer_id(), self.config.required.eth_client_url.clone(), - // TODO(EVM-676): add this config for external node - Default::default(), + self.config.optional.gateway_url.clone(), ); self.node.add_layer(query_eth_client_layer); Ok(self) diff --git a/core/bin/zksync_server/src/main.rs b/core/bin/zksync_server/src/main.rs index 3fae0d91c1bb..f485a8704771 100644 --- a/core/bin/zksync_server/src/main.rs +++ b/core/bin/zksync_server/src/main.rs @@ -26,7 +26,7 @@ use zksync_core_leftovers::{ temp_config_store::{decode_yaml_repr, TempConfigStore}, Component, Components, }; -use zksync_env_config::FromEnv; +use zksync_env_config::{FromEnv, FromEnvVariant}; use crate::node_builder::MainNodeBuilder; @@ -143,6 +143,8 @@ fn main() -> anyhow::Result<()> { } }; + let gateway_contracts_config = ContractsConfig::from_env_variant("GATEWAY_".to_string()).ok(); + let genesis = match opt.genesis_path { None => GenesisConfig::from_env().context("Genesis config")?, Some(path) => { @@ -174,7 +176,14 @@ fn main() -> anyhow::Result<()> { return Ok(()); } - let node = MainNodeBuilder::new(configs, wallets, genesis, contracts_config, secrets)?; + let node = MainNodeBuilder::new( + configs, + wallets, + genesis, + contracts_config, + gateway_contracts_config, + secrets, + )?; let observability_guard = { // Observability initialization should be performed within tokio context. diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs index 0b891ad1d64f..0f7b0cb8be37 100644 --- a/core/bin/zksync_server/src/node_builder.rs +++ b/core/bin/zksync_server/src/node_builder.rs @@ -87,6 +87,7 @@ pub struct MainNodeBuilder { wallets: Wallets, genesis_config: GenesisConfig, contracts_config: ContractsConfig, + gateway_contracts_config: Option, secrets: Secrets, } @@ -96,6 +97,7 @@ impl MainNodeBuilder { wallets: Wallets, genesis_config: GenesisConfig, contracts_config: ContractsConfig, + gateway_contracts_config: Option, secrets: Secrets, ) -> anyhow::Result { Ok(Self { @@ -104,6 +106,7 @@ impl MainNodeBuilder { wallets, genesis_config, contracts_config, + gateway_contracts_config, secrets, }) } @@ -147,6 +150,7 @@ impl MainNodeBuilder { self.node.add_layer(PKSigningEthClientLayer::new( eth_config, self.contracts_config.clone(), + self.gateway_contracts_config.clone(), self.genesis_config.settlement_layer_id(), wallets, )); @@ -159,11 +163,7 @@ impl MainNodeBuilder { let query_eth_client_layer = QueryEthClientLayer::new( genesis.settlement_layer_id(), eth_config.l1_rpc_url, - self.configs - .eth - .as_ref() - .and_then(|x| Some(x.gas_adjuster?.settlement_mode)) - .unwrap_or(SettlementMode::SettlesToL1), + eth_config.gateway_url, ); self.node.add_layer(query_eth_client_layer); Ok(self) @@ -294,6 +294,12 @@ impl MainNodeBuilder { .add_layer(EthWatchLayer::new( try_load_config!(eth_config.watcher), self.contracts_config.clone(), + self.gateway_contracts_config.clone(), + self.configs + .eth + .as_ref() + .and_then(|x| Some(x.gas_adjuster?.settlement_mode)) + .unwrap_or(SettlementMode::SettlesToL1), )); Ok(self) } @@ -454,6 +460,7 @@ impl MainNodeBuilder { .add_layer(EthTxAggregatorLayer::new( eth_sender_config, self.contracts_config.clone(), + self.gateway_contracts_config.clone(), self.genesis_config.l2_chain_id, self.genesis_config.l1_batch_commit_data_generator_mode, self.configs diff --git a/core/lib/config/src/configs/eth_sender.rs b/core/lib/config/src/configs/eth_sender.rs index 208b6b1bebad..51f7736fbb7c 100644 --- a/core/lib/config/src/configs/eth_sender.rs +++ b/core/lib/config/src/configs/eth_sender.rs @@ -169,6 +169,14 @@ impl SenderConfig { .map(|pk| pk.parse().unwrap()) } + // Don't load gateway private key, if it's not required + #[deprecated] + pub fn private_key_gateway(&self) -> Option { + std::env::var("ETH_SENDER_SENDER_OPERATOR_GATEWAY_PRIVATE_KEY") + .ok() + .map(|pk| pk.parse().unwrap()) + } + const fn default_tx_aggregation_paused() -> bool { false } diff --git a/core/lib/config/src/configs/secrets.rs b/core/lib/config/src/configs/secrets.rs index 71197f5d9306..4c53913b538b 100644 --- a/core/lib/config/src/configs/secrets.rs +++ b/core/lib/config/src/configs/secrets.rs @@ -13,6 +13,7 @@ pub struct DatabaseSecrets { #[derive(Debug, Clone, PartialEq)] pub struct L1Secrets { pub l1_rpc_url: SensitiveUrl, + pub gateway_url: Option, } #[derive(Debug, Clone, PartialEq)] diff --git a/core/lib/config/src/configs/wallets.rs b/core/lib/config/src/configs/wallets.rs index 4cb5358c8f30..90ddd90faedf 100644 --- a/core/lib/config/src/configs/wallets.rs +++ b/core/lib/config/src/configs/wallets.rs @@ -62,6 +62,7 @@ impl Wallet { pub struct EthSender { pub operator: Wallet, pub blob_operator: Option, + pub gateway: Option, } #[derive(Debug, Clone, PartialEq)] @@ -89,6 +90,7 @@ impl Wallets { blob_operator: Some( Wallet::from_private_key_bytes(H256::repeat_byte(0x2), None).unwrap(), ), + gateway: None, }), state_keeper: Some(StateKeeper { fee_account: AddressWallet::from_address(H160::repeat_byte(0x3)), diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 911451a5a348..43b44ded2947 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -849,6 +849,7 @@ impl Distribution for EncodeDist { use configs::secrets::L1Secrets; L1Secrets { l1_rpc_url: format!("localhost:{}", rng.gen::()).parse().unwrap(), + gateway_url: Some(format!("localhost:{}", rng.gen::()).parse().unwrap()), } } } @@ -900,6 +901,7 @@ impl Distribution for EncodeDist { configs::wallets::EthSender { operator: self.sample(rng), blob_operator: self.sample_opt(|| self.sample(rng)), + gateway: None, } } } diff --git a/core/lib/contracts/src/lib.rs b/core/lib/contracts/src/lib.rs index a5d376c9ef85..1a764d63353c 100644 --- a/core/lib/contracts/src/lib.rs +++ b/core/lib/contracts/src/lib.rs @@ -51,6 +51,11 @@ const CHAIN_ADMIN_CONTRACT_FILE: (&str, &str) = ("governance", "IChainAdmin.sol/ const MULTICALL3_CONTRACT_FILE: (&str, &str) = ("dev-contracts", "Multicall3.sol/Multicall3.json"); const VERIFIER_CONTRACT_FILE: (&str, &str) = ("state-transition", "Verifier.sol/Verifier.json"); + +const GETTERS_CONTRACT_FILE: (&str, &str) = ( + "state-transition/chain-interfaces", + "IGetters.sol/IGetters.json", +); const _IERC20_CONTRACT_FILE: &str = "contracts/l1-contracts/artifacts/contracts/common/interfaces/IERC20.sol/IERC20.json"; const _FAIL_ON_RECEIVE_CONTRACT_FILE: &str = @@ -161,6 +166,10 @@ pub fn verifier_contract() -> Contract { load_contract_for_both_compilers(VERIFIER_CONTRACT_FILE) } +pub fn getters_contract() -> Contract { + load_contract_for_both_compilers(GETTERS_CONTRACT_FILE) +} + #[derive(Debug, Clone)] pub struct TestContract { /// Contract bytecode to be used for sending deploy transaction. diff --git a/core/lib/dal/.sqlx/query-26c80e9bafcf7989e7d40c6e424240b34ce29aad3ac6571116e084d45574c448.json b/core/lib/dal/.sqlx/query-26c80e9bafcf7989e7d40c6e424240b34ce29aad3ac6571116e084d45574c448.json new file mode 100644 index 000000000000..c85ddccead5a --- /dev/null +++ b/core/lib/dal/.sqlx/query-26c80e9bafcf7989e7d40c6e424240b34ce29aad3ac6571116e084d45574c448.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n next_block_to_process\n FROM\n processed_events\n WHERE\n TYPE = $1\n AND chain_id = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "next_block_to_process", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + { + "Custom": { + "name": "event_type", + "kind": { + "Enum": [ + "ProtocolUpgrades", + "PriorityTransactions", + "GovernanceUpgrades" + ] + } + } + }, + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "26c80e9bafcf7989e7d40c6e424240b34ce29aad3ac6571116e084d45574c448" +} diff --git a/core/lib/dal/.sqlx/query-39a105cba1be0ec8f2b2b88d2f10c6286fcc824e84bb40a6e9f289c34b85fded.json b/core/lib/dal/.sqlx/query-39a105cba1be0ec8f2b2b88d2f10c6286fcc824e84bb40a6e9f289c34b85fded.json deleted file mode 100644 index 2b094a5f24fd..000000000000 --- a/core/lib/dal/.sqlx/query-39a105cba1be0ec8f2b2b88d2f10c6286fcc824e84bb40a6e9f289c34b85fded.json +++ /dev/null @@ -1,128 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n number,\n timestamp,\n hash,\n l1_tx_count,\n l2_tx_count,\n fee_account_address AS \"fee_account_address!\",\n l2_da_validator_address AS \"l2_da_validator_address!\",\n pubdata_type AS \"pubdata_type!\",\n base_fee_per_gas,\n l1_gas_price,\n l2_fair_gas_price,\n gas_per_pubdata_limit,\n bootloader_code_hash,\n default_aa_code_hash,\n protocol_version,\n virtual_blocks,\n fair_pubdata_price,\n gas_limit,\n logs_bloom\n FROM\n miniblocks\n ORDER BY\n number DESC\n LIMIT\n 1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "number", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "timestamp", - "type_info": "Int8" - }, - { - "ordinal": 2, - "name": "hash", - "type_info": "Bytea" - }, - { - "ordinal": 3, - "name": "l1_tx_count", - "type_info": "Int4" - }, - { - "ordinal": 4, - "name": "l2_tx_count", - "type_info": "Int4" - }, - { - "ordinal": 5, - "name": "fee_account_address!", - "type_info": "Bytea" - }, - { - "ordinal": 6, - "name": "l2_da_validator_address!", - "type_info": "Bytea" - }, - { - "ordinal": 7, - "name": "pubdata_type!", - "type_info": "Text" - }, - { - "ordinal": 8, - "name": "base_fee_per_gas", - "type_info": "Numeric" - }, - { - "ordinal": 9, - "name": "l1_gas_price", - "type_info": "Int8" - }, - { - "ordinal": 10, - "name": "l2_fair_gas_price", - "type_info": "Int8" - }, - { - "ordinal": 11, - "name": "gas_per_pubdata_limit", - "type_info": "Int8" - }, - { - "ordinal": 12, - "name": "bootloader_code_hash", - "type_info": "Bytea" - }, - { - "ordinal": 13, - "name": "default_aa_code_hash", - "type_info": "Bytea" - }, - { - "ordinal": 14, - "name": "protocol_version", - "type_info": "Int4" - }, - { - "ordinal": 15, - "name": "virtual_blocks", - "type_info": "Int8" - }, - { - "ordinal": 16, - "name": "fair_pubdata_price", - "type_info": "Int8" - }, - { - "ordinal": 17, - "name": "gas_limit", - "type_info": "Int8" - }, - { - "ordinal": 18, - "name": "logs_bloom", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - false, - false, - false, - false, - false, - false, - true, - true, - true, - false, - true, - true, - true - ] - }, - "hash": "269e5901aaa362ed011a2e968d2bc8cc8877e5d1d9c2d9b04953fa7d89155b40" -} diff --git a/core/lib/dal/.sqlx/query-45e52d05a4483def84c141e3529bab30553732953e589cd237595227044f438d.json b/core/lib/dal/.sqlx/query-45e52d05a4483def84c141e3529bab30553732953e589cd237595227044f438d.json deleted file mode 100644 index eb6d9c3640df..000000000000 --- a/core/lib/dal/.sqlx/query-45e52d05a4483def84c141e3529bab30553732953e589cd237595227044f438d.json +++ /dev/null @@ -1,130 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n number,\n timestamp,\n hash,\n l1_tx_count,\n l2_tx_count,\n fee_account_address AS \"fee_account_address!\",\n l2_da_validator_address AS \"l2_da_validator_address!\",\n pubdata_type AS \"pubdata_type!\",\n base_fee_per_gas,\n l1_gas_price,\n l2_fair_gas_price,\n gas_per_pubdata_limit,\n bootloader_code_hash,\n default_aa_code_hash,\n protocol_version,\n virtual_blocks,\n fair_pubdata_price,\n gas_limit,\n logs_bloom\n FROM\n miniblocks\n WHERE\n number = $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "number", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "timestamp", - "type_info": "Int8" - }, - { - "ordinal": 2, - "name": "hash", - "type_info": "Bytea" - }, - { - "ordinal": 3, - "name": "l1_tx_count", - "type_info": "Int4" - }, - { - "ordinal": 4, - "name": "l2_tx_count", - "type_info": "Int4" - }, - { - "ordinal": 5, - "name": "fee_account_address!", - "type_info": "Bytea" - }, - { - "ordinal": 6, - "name": "l2_da_validator_address!", - "type_info": "Bytea" - }, - { - "ordinal": 7, - "name": "pubdata_type!", - "type_info": "Text" - }, - { - "ordinal": 8, - "name": "base_fee_per_gas", - "type_info": "Numeric" - }, - { - "ordinal": 9, - "name": "l1_gas_price", - "type_info": "Int8" - }, - { - "ordinal": 10, - "name": "l2_fair_gas_price", - "type_info": "Int8" - }, - { - "ordinal": 11, - "name": "gas_per_pubdata_limit", - "type_info": "Int8" - }, - { - "ordinal": 12, - "name": "bootloader_code_hash", - "type_info": "Bytea" - }, - { - "ordinal": 13, - "name": "default_aa_code_hash", - "type_info": "Bytea" - }, - { - "ordinal": 14, - "name": "protocol_version", - "type_info": "Int4" - }, - { - "ordinal": 15, - "name": "virtual_blocks", - "type_info": "Int8" - }, - { - "ordinal": 16, - "name": "fair_pubdata_price", - "type_info": "Int8" - }, - { - "ordinal": 17, - "name": "gas_limit", - "type_info": "Int8" - }, - { - "ordinal": 18, - "name": "logs_bloom", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - false, - false, - false, - false, - false, - false, - true, - true, - true, - false, - true, - true, - true - ] - }, - "hash": "7af141a4533b332903b7ba5591b1c90ac9deb75cd2a542fe649d7830496a0756" -} diff --git a/core/lib/dal/.sqlx/query-afdeecb78e3af802c2b8ffb0f5ab9863082ccd1ce45b2d20e1119f1e78171d66.json b/core/lib/dal/.sqlx/query-afdeecb78e3af802c2b8ffb0f5ab9863082ccd1ce45b2d20e1119f1e78171d66.json new file mode 100644 index 000000000000..7bb4c0e0998e --- /dev/null +++ b/core/lib/dal/.sqlx/query-afdeecb78e3af802c2b8ffb0f5ab9863082ccd1ce45b2d20e1119f1e78171d66.json @@ -0,0 +1,27 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n processed_events (\n TYPE,\n chain_id,\n next_block_to_process\n )\n VALUES\n ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + { + "Custom": { + "name": "event_type", + "kind": { + "Enum": [ + "ProtocolUpgrades", + "PriorityTransactions", + "GovernanceUpgrades" + ] + } + } + }, + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "afdeecb78e3af802c2b8ffb0f5ab9863082ccd1ce45b2d20e1119f1e78171d66" +} diff --git a/core/lib/dal/.sqlx/query-c4835d40921af47bfb4f60102bbba3af74e8e7b5944cb2943b5badb906167046.json b/core/lib/dal/.sqlx/query-c4835d40921af47bfb4f60102bbba3af74e8e7b5944cb2943b5badb906167046.json deleted file mode 100644 index a70aa2eb4738..000000000000 --- a/core/lib/dal/.sqlx/query-c4835d40921af47bfb4f60102bbba3af74e8e7b5944cb2943b5badb906167046.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n miniblocks (\n number,\n timestamp,\n hash,\n l1_tx_count,\n l2_tx_count,\n fee_account_address,\n base_fee_per_gas,\n l1_gas_price,\n l2_fair_gas_price,\n gas_per_pubdata_limit,\n bootloader_code_hash,\n default_aa_code_hash,\n protocol_version,\n virtual_blocks,\n fair_pubdata_price,\n gas_limit,\n l2_da_validator_address,\n pubdata_type,\n logs_bloom,\n created_at,\n updated_at\n )\n VALUES\n (\n $1,\n $2,\n $3,\n $4,\n $5,\n $6,\n $7,\n $8,\n $9,\n $10,\n $11,\n $12,\n $13,\n $14,\n $15,\n $16,\n $17,\n $18,\n $19,\n NOW(),\n NOW()\n )\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int8", - "Bytea", - "Int4", - "Int4", - "Bytea", - "Numeric", - "Int8", - "Int8", - "Int8", - "Bytea", - "Bytea", - "Int4", - "Int8", - "Int8", - "Int8", - "Bytea", - "Text", - "Bytea" - ] - }, - "nullable": [] - }, - "hash": "7072d2bf702bb92a42ebba3732674b80631fa9dbfae927020e97e4159e399850" -} diff --git a/core/lib/dal/.sqlx/query-c61682ed92c1a43855a991598d593639a63047f46dc8e605ff37b78f43f5cef5.json b/core/lib/dal/.sqlx/query-c61682ed92c1a43855a991598d593639a63047f46dc8e605ff37b78f43f5cef5.json new file mode 100644 index 000000000000..9c3829ebe235 --- /dev/null +++ b/core/lib/dal/.sqlx/query-c61682ed92c1a43855a991598d593639a63047f46dc8e605ff37b78f43f5cef5.json @@ -0,0 +1,27 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE processed_events\n SET\n next_block_to_process = $3\n WHERE\n TYPE = $1\n AND chain_id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + { + "Custom": { + "name": "event_type", + "kind": { + "Enum": [ + "ProtocolUpgrades", + "PriorityTransactions", + "GovernanceUpgrades" + ] + } + } + }, + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "c61682ed92c1a43855a991598d593639a63047f46dc8e605ff37b78f43f5cef5" +} diff --git a/core/lib/dal/migrations/20240830085550_add_eth_watcher_progress_table.down.sql b/core/lib/dal/migrations/20240830085550_add_eth_watcher_progress_table.down.sql new file mode 100644 index 000000000000..79331481f589 --- /dev/null +++ b/core/lib/dal/migrations/20240830085550_add_eth_watcher_progress_table.down.sql @@ -0,0 +1,4 @@ +DROP TABLE IF EXISTS processed_events; + +DROP TYPE IF EXISTS event_type; + diff --git a/core/lib/dal/migrations/20240830085550_add_eth_watcher_progress_table.up.sql b/core/lib/dal/migrations/20240830085550_add_eth_watcher_progress_table.up.sql new file mode 100644 index 000000000000..f8cdd573c069 --- /dev/null +++ b/core/lib/dal/migrations/20240830085550_add_eth_watcher_progress_table.up.sql @@ -0,0 +1,9 @@ +CREATE TYPE event_type AS ENUM ('ProtocolUpgrades', 'PriorityTransactions', 'GovernanceUpgrades'); + +CREATE TABLE processed_events +( + type event_type NOT NULL, + chain_id BIGINT NOT NULL, + next_block_to_process BIGINT NOT NULL, + PRIMARY KEY (chain_id, type) +) diff --git a/core/lib/dal/src/eth_watcher_dal.rs b/core/lib/dal/src/eth_watcher_dal.rs new file mode 100644 index 000000000000..87fc4b778138 --- /dev/null +++ b/core/lib/dal/src/eth_watcher_dal.rs @@ -0,0 +1,153 @@ +use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt}; +use zksync_types::SLChainId; + +use crate::Core; + +pub struct ProcessedEventsDal<'a, 'c> { + pub(crate) storage: &'a mut Connection<'c, Core>, +} + +#[derive(Debug, Copy, Clone, sqlx::Type)] +#[sqlx(type_name = "event_type")] +pub enum EventType { + ProtocolUpgrades, + PriorityTransactions, + GovernanceUpgrades, +} + +impl ProcessedEventsDal<'_, '_> { + pub async fn get_or_set_next_block_to_process( + &mut self, + event_type: EventType, + chain_id: SLChainId, + next_block_to_process: u64, + ) -> DalResult { + let result = sqlx::query!( + r#" + SELECT + next_block_to_process + FROM + processed_events + WHERE + TYPE = $1 + AND chain_id = $2 + "#, + event_type as EventType, + chain_id.0 as i64 + ) + .instrument("get_or_set_next_block_to_process") + .with_arg("event_type", &event_type) + .with_arg("chain_id", &chain_id) + .fetch_optional(self.storage) + .await?; + + if let Some(row) = result { + Ok(row.next_block_to_process as u64) + } else { + sqlx::query!( + r#" + INSERT INTO + processed_events ( + TYPE, + chain_id, + next_block_to_process + ) + VALUES + ($1, $2, $3) + "#, + event_type as EventType, + chain_id.0 as i64, + next_block_to_process as i64 + ) + .instrument("get_or_set_next_block_to_process - insert") + .with_arg("event_type", &event_type) + .with_arg("chain_id", &chain_id) + .execute(self.storage) + .await?; + + Ok(next_block_to_process) + } + } + + pub async fn update_next_block_to_process( + &mut self, + event_type: EventType, + chain_id: SLChainId, + next_block_to_process: u64, + ) -> DalResult<()> { + sqlx::query!( + r#" + UPDATE processed_events + SET + next_block_to_process = $3 + WHERE + TYPE = $1 + AND chain_id = $2 + "#, + event_type as EventType, + chain_id.0 as i64, + next_block_to_process as i64 + ) + .instrument("update_next_block_to_process") + .with_arg("event_type", &event_type) + .with_arg("chain_id", &chain_id) + .execute(self.storage) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ConnectionPool, Core, CoreDal}; + + #[tokio::test] + async fn test_get_or_set_next_block_to_process_with_different_event_types() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + let mut dal = conn.processed_events_dal(); + + // Test with ProtocolUpgrades + let next_block = dal + .get_or_set_next_block_to_process(EventType::ProtocolUpgrades, SLChainId(1), 100) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 100); + + // Test with PriorityTransactions + let next_block = dal + .get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(1), 200) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 200); + + // Test with PriorityTransactions + let next_block = dal + .get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(2), 300) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 300); + + // Verify that the initial block is not updated for ProtocolUpgrades + let next_block = dal + .get_or_set_next_block_to_process(EventType::ProtocolUpgrades, SLChainId(1), 150) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 100); + + // Verify that the initial block is not updated for PriorityTransactions + let next_block = dal + .get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(1), 250) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 200); + + // Verify that the initial block is not updated for PriorityTransactions + let next_block = dal + .get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(2), 350) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 300); + } +} diff --git a/core/lib/dal/src/lib.rs b/core/lib/dal/src/lib.rs index 0e1badb9af76..6c96e7b5d49c 100644 --- a/core/lib/dal/src/lib.rs +++ b/core/lib/dal/src/lib.rs @@ -15,8 +15,9 @@ use crate::{ base_token_dal::BaseTokenDal, blocks_dal::BlocksDal, blocks_web3_dal::BlocksWeb3Dal, consensus_dal::ConsensusDal, contract_verification_dal::ContractVerificationDal, data_availability_dal::DataAvailabilityDal, eth_sender_dal::EthSenderDal, - events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, factory_deps_dal::FactoryDepsDal, - proof_generation_dal::ProofGenerationDal, protocol_versions_dal::ProtocolVersionsDal, + eth_watcher_dal::ProcessedEventsDal, events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, + factory_deps_dal::FactoryDepsDal, proof_generation_dal::ProofGenerationDal, + protocol_versions_dal::ProtocolVersionsDal, protocol_versions_web3_dal::ProtocolVersionsWeb3Dal, pruning_dal::PruningDal, snapshot_recovery_dal::SnapshotRecoveryDal, snapshots_creator_dal::SnapshotsCreatorDal, snapshots_dal::SnapshotsDal, storage_logs_dal::StorageLogsDal, @@ -61,6 +62,8 @@ pub mod transactions_dal; pub mod transactions_web3_dal; pub mod vm_runner_dal; +pub mod eth_watcher_dal; + #[cfg(test)] mod tests; @@ -132,6 +135,7 @@ where fn vm_runner_dal(&mut self) -> VmRunnerDal<'_, 'a>; fn base_token_dal(&mut self) -> BaseTokenDal<'_, 'a>; + fn processed_events_dal(&mut self) -> ProcessedEventsDal<'_, 'a>; } #[derive(Clone, Debug)] @@ -258,4 +262,8 @@ impl<'a> CoreDal<'a> for Connection<'a, Core> { fn base_token_dal(&mut self) -> BaseTokenDal<'_, 'a> { BaseTokenDal { storage: self } } + + fn processed_events_dal(&mut self) -> ProcessedEventsDal<'_, 'a> { + ProcessedEventsDal { storage: self } + } } diff --git a/core/lib/env_config/src/contracts.rs b/core/lib/env_config/src/contracts.rs index 13265cac9d52..d2f5534d3d88 100644 --- a/core/lib/env_config/src/contracts.rs +++ b/core/lib/env_config/src/contracts.rs @@ -1,22 +1,40 @@ use zksync_config::{configs::EcosystemContracts, ContractsConfig}; -use crate::{envy_load, FromEnv}; +use crate::{envy_load, FromEnv, FromEnvVariant}; impl FromEnv for EcosystemContracts { fn from_env() -> anyhow::Result { + Self::from_env_variant("".to_string()) + } +} +impl FromEnvVariant for EcosystemContracts { + fn from_env_variant(variant: String) -> anyhow::Result { Ok(Self { - bridgehub_proxy_addr: std::env::var("CONTRACTS_BRIDGEHUB_PROXY_ADDR")?.parse()?, - state_transition_proxy_addr: std::env::var("CONTRACTS_STATE_TRANSITION_PROXY_ADDR")? - .parse()?, - transparent_proxy_admin_addr: std::env::var("CONTRACTS_TRANSPARENT_PROXY_ADMIN_ADDR")? - .parse()?, + bridgehub_proxy_addr: std::env::var(format!( + "{variant}CONTRACTS_BRIDGEHUB_PROXY_ADDR" + ))? + .parse()?, + state_transition_proxy_addr: std::env::var(format!( + "{variant}CONTRACTS_STATE_TRANSITION_PROXY_ADDR" + ))? + .parse()?, + transparent_proxy_admin_addr: std::env::var(format!( + "{variant}CONTRACTS_TRANSPARENT_PROXY_ADMIN_ADDR" + ))? + .parse()?, }) } } impl FromEnv for ContractsConfig { fn from_env() -> anyhow::Result { - let mut contracts: ContractsConfig = envy_load("contracts", "CONTRACTS_")?; + Self::from_env_variant("".to_string()) + } +} +impl FromEnvVariant for ContractsConfig { + fn from_env_variant(variant: String) -> anyhow::Result { + let mut contracts: ContractsConfig = + envy_load("contracts", &format!("{variant}CONTRACTS_"))?; // Note: we are renaming the bridge, the address remains the same // These two config variables should always have the same value. // TODO(EVM-578): double check and potentially forbid both of them being `None`. @@ -35,7 +53,7 @@ impl FromEnv for ContractsConfig { panic!("L2 erc20 bridge address and L2 shared bridge address are different."); } } - contracts.ecosystem_contracts = EcosystemContracts::from_env().ok(); + contracts.ecosystem_contracts = EcosystemContracts::from_env_variant(variant).ok(); Ok(contracts) } } diff --git a/core/lib/env_config/src/eth_sender.rs b/core/lib/env_config/src/eth_sender.rs index 31dab2d20455..7e9c4cc16ec0 100644 --- a/core/lib/env_config/src/eth_sender.rs +++ b/core/lib/env_config/src/eth_sender.rs @@ -23,6 +23,9 @@ impl FromEnv for L1Secrets { .context("ETH_CLIENT_WEB3_URL")? .parse() .context("ETH_CLIENT_WEB3_URL")?, + gateway_url: std::env::var("ETH_CLIENT_GATEWAY_WEB3_URL") + .ok() + .map(|url| url.parse().expect("ETH_CLIENT_GATEWAY_WEB3_URL")), }) } } @@ -96,6 +99,7 @@ mod tests { }, L1Secrets { l1_rpc_url: "http://127.0.0.1:8545".to_string().parse().unwrap(), + gateway_url: Some("http://127.0.0.1:8547".to_string().parse().unwrap()), }, ) } @@ -138,6 +142,7 @@ mod tests { ETH_WATCH_CONFIRMATIONS_FOR_ETH_EVENT="0" ETH_WATCH_ETH_NODE_POLL_INTERVAL="300" ETH_CLIENT_WEB3_URL="http://127.0.0.1:8545" + ETH_CLIENT_GATEWAY_WEB3_URL="http://127.0.0.1:8547" "#; lock.set_env(config); diff --git a/core/lib/env_config/src/lib.rs b/core/lib/env_config/src/lib.rs index 8cfa7b58a31c..d3302e75daa9 100644 --- a/core/lib/env_config/src/lib.rs +++ b/core/lib/env_config/src/lib.rs @@ -36,6 +36,10 @@ pub trait FromEnv: Sized { fn from_env() -> anyhow::Result; } +pub trait FromEnvVariant: Sized { + fn from_env_variant(variant_prefix: String) -> anyhow::Result; +} + /// Convenience function that loads the structure from the environment variable given the prefix. /// Panics if the config cannot be loaded from the environment variables. pub fn envy_load(name: &str, prefix: &str) -> anyhow::Result { diff --git a/core/lib/env_config/src/wallets.rs b/core/lib/env_config/src/wallets.rs index 3518d56f7b45..fc6715876e3f 100644 --- a/core/lib/env_config/src/wallets.rs +++ b/core/lib/env_config/src/wallets.rs @@ -25,6 +25,10 @@ impl FromEnv for Wallets { "ETH_SENDER_SENDER_OPERATOR_BLOBS_PRIVATE_KEY", "Malformed blob operator pk", )?; + let gateway = pk_from_env( + "ETH_SENDER_SENDER_OPERATOR_GATEWAY_PRIVATE_KEY", + "Malformed gateway operator pk", + )?; let eth_sender = if let Some(operator) = operator { let operator = Wallet::from_private_key_bytes(operator, None)?; @@ -33,9 +37,16 @@ impl FromEnv for Wallets { } else { None }; + let gateway = if let Some(gateway) = gateway { + Some(Wallet::from_private_key_bytes(gateway, None)?) + } else { + None + }; + Some(EthSender { operator, blob_operator, + gateway, }) } else { None diff --git a/core/lib/protobuf_config/src/proto/config/secrets.proto b/core/lib/protobuf_config/src/proto/config/secrets.proto index b711d81d5754..3afc0e0843f7 100644 --- a/core/lib/protobuf_config/src/proto/config/secrets.proto +++ b/core/lib/protobuf_config/src/proto/config/secrets.proto @@ -11,6 +11,7 @@ message DatabaseSecrets { message L1Secrets { optional string l1_rpc_url = 1; // required + optional string gateway_url = 2; // optional } message ConsensusSecrets { diff --git a/core/lib/protobuf_config/src/secrets.rs b/core/lib/protobuf_config/src/secrets.rs index 7d10bef88a55..14b189904310 100644 --- a/core/lib/protobuf_config/src/secrets.rs +++ b/core/lib/protobuf_config/src/secrets.rs @@ -77,12 +77,21 @@ impl ProtoRepr for proto::L1Secrets { fn read(&self) -> anyhow::Result { Ok(Self::Type { l1_rpc_url: SensitiveUrl::from_str(required(&self.l1_rpc_url).context("l1_rpc_url")?)?, + gateway_url: self + .gateway_url + .clone() + .map(|url| SensitiveUrl::from_str(&url)) + .transpose()?, }) } fn build(this: &Self::Type) -> Self { Self { l1_rpc_url: Some(this.l1_rpc_url.expose_str().to_string()), + gateway_url: this + .gateway_url + .as_ref() + .map(|url| url.expose_url().to_string()), } } } diff --git a/core/lib/protobuf_config/src/wallets.rs b/core/lib/protobuf_config/src/wallets.rs index 3769dac443d0..e095de83b511 100644 --- a/core/lib/protobuf_config/src/wallets.rs +++ b/core/lib/protobuf_config/src/wallets.rs @@ -37,6 +37,7 @@ impl ProtoRepr for proto::Wallets { Some(EthSender { operator, blob_operator, + gateway: None, }) } else { None diff --git a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs index 8224b03da071..00b1e1f00e4a 100644 --- a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs +++ b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs @@ -132,9 +132,13 @@ impl TempConfigStore { let blob_operator = sender .private_key_blobs() .and_then(|operator| Wallet::from_private_key_bytes(operator, None).ok()); + let gateway = sender + .private_key_gateway() + .and_then(|operator| Wallet::from_private_key_bytes(operator, None).ok()); Some(EthSender { operator, blob_operator, + gateway, }) }); let state_keeper = self diff --git a/core/node/eth_watch/Cargo.toml b/core/node/eth_watch/Cargo.toml index ecdc53901f88..d2e47035e266 100644 --- a/core/node/eth_watch/Cargo.toml +++ b/core/node/eth_watch/Cargo.toml @@ -28,3 +28,4 @@ tracing.workspace = true [dev-dependencies] zksync_concurrency.workspace = true +test-log.workspace = true diff --git a/core/node/eth_watch/src/client.rs b/core/node/eth_watch/src/client.rs index 8be556b42889..3cc56396f126 100644 --- a/core/node/eth_watch/src/client.rs +++ b/core/node/eth_watch/src/client.rs @@ -1,7 +1,7 @@ use std::fmt; use anyhow::Context; -use zksync_contracts::{state_transition_manager_contract, verifier_contract}; +use zksync_contracts::{getters_contract, state_transition_manager_contract, verifier_contract}; use zksync_eth_client::{ clients::{DynClient, L1}, CallFunctionArgs, ClientError, ContractCallError, EnrichedClientError, EnrichedClientResult, @@ -10,7 +10,7 @@ use zksync_eth_client::{ use zksync_types::{ ethabi::Contract, web3::{BlockId, BlockNumber, FilterBuilder, Log}, - Address, H256, + Address, SLChainId, H256, U256, }; /// L1 client functionality used by [`EthWatch`](crate::EthWatch) and constituent event processors. @@ -21,10 +21,13 @@ pub trait EthClient: 'static + fmt::Debug + Send + Sync { &self, from: BlockNumber, to: BlockNumber, + topic: H256, retries_left: usize, ) -> EnrichedClientResult>; /// Returns finalized L1 block number. async fn finalized_block_number(&self) -> EnrichedClientResult; + + async fn get_total_priority_txs(&self) -> Result; /// Returns scheduler verification key hash by verifier address. async fn scheduler_vk_hash(&self, verifier_address: Address) -> Result; @@ -33,8 +36,8 @@ pub trait EthClient: 'static + fmt::Debug + Send + Sync { &self, packed_version: H256, ) -> EnrichedClientResult>>; - /// Sets list of topics to return events for. - fn set_topics(&mut self, topics: Vec); + + async fn chain_id(&self) -> EnrichedClientResult; } pub const RETRY_LIMIT: usize = 5; @@ -42,10 +45,9 @@ const TOO_MANY_RESULTS_INFURA: &str = "query returned more than"; const TOO_MANY_RESULTS_ALCHEMY: &str = "response size exceeded"; /// Implementation of [`EthClient`] based on HTTP JSON-RPC (encapsulated via [`EthInterface`]). -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EthHttpQueryClient { client: Box>, - topics: Vec, diamond_proxy_addr: Address, governance_address: Address, new_upgrade_cut_data_signature: H256, @@ -53,6 +55,7 @@ pub struct EthHttpQueryClient { state_transition_manager_address: Option
, chain_admin_address: Option
, verifier_contract_abi: Contract, + getters_contract_abi: Contract, confirmations_for_eth_event: Option, } @@ -72,7 +75,6 @@ impl EthHttpQueryClient { ); Self { client: client.for_component("watch"), - topics: Vec::new(), diamond_proxy_addr, state_transition_manager_address, chain_admin_address, @@ -83,6 +85,7 @@ impl EthHttpQueryClient { .unwrap() .signature(), verifier_contract_abi: verifier_contract(), + getters_contract_abi: getters_contract(), confirmations_for_eth_event, } } @@ -153,9 +156,10 @@ impl EthClient for EthHttpQueryClient { &self, from: BlockNumber, to: BlockNumber, + topic: H256, retries_left: usize, ) -> EnrichedClientResult> { - let mut result = self.get_filter_logs(from, to, self.topics.clone()).await; + let mut result = self.get_filter_logs(from, to, [topic].to_vec()).await; // This code is compatible with both Infura and Alchemy API providers. // Note: we don't handle rate-limits here - assumption is that we're never going to hit them. @@ -207,17 +211,17 @@ impl EthClient for EthHttpQueryClient { tracing::warn!("Splitting block range in half: {from:?} - {mid:?} - {to:?}"); let mut first_half = self - .get_events(from, BlockNumber::Number(mid), RETRY_LIMIT) + .get_events(from, BlockNumber::Number(mid), topic, RETRY_LIMIT) .await?; let mut second_half = self - .get_events(BlockNumber::Number(mid + 1u64), to, RETRY_LIMIT) + .get_events(BlockNumber::Number(mid + 1u64), to, topic, RETRY_LIMIT) .await?; first_half.append(&mut second_half); result = Ok(first_half); } else if should_retry(err_code, err_message) && retries_left > 0 { tracing::warn!("Retrying. Retries left: {retries_left}"); - result = self.get_events(from, to, retries_left - 1).await; + result = self.get_events(from, to, topic, retries_left - 1).await; } } @@ -245,7 +249,15 @@ impl EthClient for EthHttpQueryClient { } } - fn set_topics(&mut self, topics: Vec) { - self.topics = topics; + async fn get_total_priority_txs(&self) -> Result { + CallFunctionArgs::new("getTotalPriorityTxs", ()) + .for_contract(self.diamond_proxy_addr, &self.getters_contract_abi) + .call(&self.client) + .await + .map(|x: U256| x.try_into().unwrap()) + } + + async fn chain_id(&self) -> EnrichedClientResult { + Ok(self.client.fetch_chain_id().await?) } } diff --git a/core/node/eth_watch/src/event_processors/decentralized_upgrades.rs b/core/node/eth_watch/src/event_processors/decentralized_upgrades.rs index dff10662e984..aa43e7239f88 100644 --- a/core/node/eth_watch/src/event_processors/decentralized_upgrades.rs +++ b/core/node/eth_watch/src/event_processors/decentralized_upgrades.rs @@ -1,5 +1,5 @@ use anyhow::Context as _; -use zksync_dal::{Connection, Core, CoreDal, DalError}; +use zksync_dal::{eth_watcher_dal::EventType, Connection, Core, CoreDal, DalError}; use zksync_types::{ ethabi::Contract, protocol_version::ProtocolSemanticVersion, web3::Log, ProtocolUpgrade, H256, U256, @@ -7,7 +7,7 @@ use zksync_types::{ use crate::{ client::EthClient, - event_processors::{EventProcessor, EventProcessorError}, + event_processors::{EventProcessor, EventProcessorError, EventsSource}, metrics::{PollStage, METRICS}, }; @@ -40,18 +40,18 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor { async fn process_events( &mut self, storage: &mut Connection<'_, Core>, - client: &dyn EthClient, + sl_client: &dyn EthClient, events: Vec, - ) -> Result<(), EventProcessorError> { + ) -> Result { let mut upgrades = Vec::new(); - for event in events { + for event in &events { let version = event.topics.get(1).copied().context("missing topic 1")?; let timestamp: u64 = U256::from_big_endian(&event.data.0) .try_into() .ok() .context("upgrade timestamp is too big")?; - let diamond_cut = client + let diamond_cut = sl_client .diamond_cut_by_version(version) .await? .context("missing upgrade data on STM")?; @@ -62,7 +62,7 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor { }; // Scheduler VK is not present in proposal event. It is hard coded in verifier contract. let scheduler_vk_hash = if let Some(address) = upgrade.verifier_address { - Some(client.scheduler_vk_hash(address).await?) + Some(sl_client.scheduler_vk_hash(address).await?) } else { None }; @@ -75,7 +75,7 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor { .collect(); let Some((last_upgrade, _)) = new_upgrades.last() else { - return Ok(()); + return Ok(events.len()); }; let versions: Vec<_> = new_upgrades .iter() @@ -125,10 +125,18 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor { stage_latency.observe(); self.last_seen_protocol_version = last_version; - Ok(()) + Ok(events.len()) } fn relevant_topic(&self) -> H256 { self.update_upgrade_timestamp_signature } + + fn event_source(&self) -> EventsSource { + EventsSource::SL + } + + fn event_type(&self) -> EventType { + EventType::ProtocolUpgrades + } } diff --git a/core/node/eth_watch/src/event_processors/governance_upgrades.rs b/core/node/eth_watch/src/event_processors/governance_upgrades.rs index 72f5c411892f..e34c9de04c62 100644 --- a/core/node/eth_watch/src/event_processors/governance_upgrades.rs +++ b/core/node/eth_watch/src/event_processors/governance_upgrades.rs @@ -1,5 +1,5 @@ use anyhow::Context as _; -use zksync_dal::{Connection, Core, CoreDal, DalError}; +use zksync_dal::{eth_watcher_dal::EventType, Connection, Core, CoreDal, DalError}; use zksync_types::{ ethabi::Contract, protocol_upgrade::GovernanceOperation, protocol_version::ProtocolSemanticVersion, web3::Log, Address, ProtocolUpgrade, H256, @@ -7,7 +7,7 @@ use zksync_types::{ use crate::{ client::EthClient, - event_processors::{EventProcessor, EventProcessorError}, + event_processors::{EventProcessor, EventProcessorError, EventsSource}, metrics::{PollStage, METRICS}, }; @@ -44,10 +44,11 @@ impl EventProcessor for GovernanceUpgradesEventProcessor { async fn process_events( &mut self, storage: &mut Connection<'_, Core>, - client: &dyn EthClient, + sl_client: &dyn EthClient, events: Vec, - ) -> Result<(), EventProcessorError> { + ) -> Result { let mut upgrades = Vec::new(); + let events_count = events.len(); for event in events { assert_eq!(event.topics[0], self.upgrade_proposal_signature); // guaranteed by the watcher @@ -69,7 +70,7 @@ impl EventProcessor for GovernanceUpgradesEventProcessor { }; // Scheduler VK is not present in proposal event. It is hard coded in verifier contract. let scheduler_vk_hash = if let Some(address) = upgrade.verifier_address { - Some(client.scheduler_vk_hash(address).await?) + Some(sl_client.scheduler_vk_hash(address).await?) } else { None }; @@ -83,7 +84,7 @@ impl EventProcessor for GovernanceUpgradesEventProcessor { .collect(); let Some((last_upgrade, _)) = new_upgrades.last() else { - return Ok(()); + return Ok(events_count); }; let versions: Vec<_> = new_upgrades .iter() @@ -133,10 +134,18 @@ impl EventProcessor for GovernanceUpgradesEventProcessor { stage_latency.observe(); self.last_seen_protocol_version = last_version; - Ok(()) + Ok(events_count) } fn relevant_topic(&self) -> H256 { self.upgrade_proposal_signature } + + fn event_source(&self) -> EventsSource { + EventsSource::SL + } + + fn event_type(&self) -> EventType { + EventType::GovernanceUpgrades + } } diff --git a/core/node/eth_watch/src/event_processors/mod.rs b/core/node/eth_watch/src/event_processors/mod.rs index 7de918ab6019..5ded825cce4a 100644 --- a/core/node/eth_watch/src/event_processors/mod.rs +++ b/core/node/eth_watch/src/event_processors/mod.rs @@ -1,6 +1,6 @@ use std::fmt; -use zksync_dal::{Connection, Core}; +use zksync_dal::{eth_watcher_dal::EventType, Connection, Core}; use zksync_eth_client::{ContractCallError, EnrichedClientError}; use zksync_types::{web3::Log, H256}; @@ -32,6 +32,12 @@ pub(super) enum EventProcessorError { Internal(#[from] anyhow::Error), } +#[derive(Debug)] +pub(super) enum EventsSource { + L1, + SL, +} + impl EventProcessorError { pub fn log_parse(source: impl Into, log_kind: &'static str) -> Self { Self::LogParse { @@ -49,10 +55,14 @@ pub(super) trait EventProcessor: 'static + fmt::Debug + Send + Sync { async fn process_events( &mut self, storage: &mut Connection<'_, Core>, - client: &dyn EthClient, + sl_client: &dyn EthClient, events: Vec, - ) -> Result<(), EventProcessorError>; + ) -> Result; /// Relevant topic which defines what events to be processed fn relevant_topic(&self) -> H256; + + fn event_source(&self) -> EventsSource; + + fn event_type(&self) -> EventType; } diff --git a/core/node/eth_watch/src/event_processors/priority_ops.rs b/core/node/eth_watch/src/event_processors/priority_ops.rs index 9fda97b75ac6..16a084496cfb 100644 --- a/core/node/eth_watch/src/event_processors/priority_ops.rs +++ b/core/node/eth_watch/src/event_processors/priority_ops.rs @@ -2,14 +2,14 @@ use std::convert::TryFrom; use anyhow::Context; use zksync_contracts::hyperchain_contract; -use zksync_dal::{Connection, Core, CoreDal, DalError}; +use zksync_dal::{eth_watcher_dal::EventType, Connection, Core, CoreDal, DalError}; use zksync_mini_merkle_tree::SyncMerkleTree; use zksync_shared_metrics::{TxStage, APP_METRICS}; use zksync_types::{l1::L1Tx, web3::Log, PriorityOpId, H256}; use crate::{ client::EthClient, - event_processors::{EventProcessor, EventProcessorError}, + event_processors::{EventProcessor, EventProcessorError, EventsSource}, metrics::{PollStage, METRICS}, }; @@ -42,10 +42,11 @@ impl EventProcessor for PriorityOpsEventProcessor { async fn process_events( &mut self, storage: &mut Connection<'_, Core>, - _client: &dyn EthClient, + client: &dyn EthClient, events: Vec, - ) -> Result<(), EventProcessorError> { + ) -> Result { let mut priority_ops = Vec::new(); + let events_count = events.len(); for event in events { assert_eq!(event.topics[0], self.new_priority_request_signature); // guaranteed by the watcher let tx = L1Tx::try_from(event) @@ -54,7 +55,7 @@ impl EventProcessor for PriorityOpsEventProcessor { } if priority_ops.is_empty() { - return Ok(()); + return Ok(events_count); } let first = &priority_ops[0]; @@ -76,20 +77,25 @@ impl EventProcessor for PriorityOpsEventProcessor { .into_iter() .skip_while(|tx| tx.serial_id() < self.next_expected_priority_id) .collect(); - let (Some(first_new), Some(last_new)) = (new_ops.first(), new_ops.last()) else { - return Ok(()); + let Some(first_new) = new_ops.first() else { + return Ok(events_count); }; assert_eq!( first_new.serial_id(), self.next_expected_priority_id, "priority transaction serial id mismatch" ); - let next_expected_priority_id = last_new.serial_id().next(); + let mut next_expected_priority_id = self.next_expected_priority_id; let stage_latency = METRICS.poll_eth_node[&PollStage::PersistL1Txs].start(); APP_METRICS.processed_txs[&TxStage::added_to_mempool()].inc(); APP_METRICS.processed_l1_txs[&TxStage::added_to_mempool()].inc(); + let processed_priority_transactions = client.get_total_priority_txs().await?; + let mut processed_events_count = 0; for new_op in new_ops { + if processed_priority_transactions <= new_op.serial_id().0 { + break; + } let eth_block = new_op.eth_block(); let inserted = storage .transactions_dal() @@ -100,13 +106,23 @@ impl EventProcessor for PriorityOpsEventProcessor { if inserted { self.priority_merkle_tree.push_hash(new_op.hash()); } + processed_events_count += 1; + next_expected_priority_id = new_op.serial_id().next(); } stage_latency.observe(); self.next_expected_priority_id = next_expected_priority_id; - Ok(()) + Ok(processed_events_count) } fn relevant_topic(&self) -> H256 { self.new_priority_request_signature } + + fn event_source(&self) -> EventsSource { + EventsSource::L1 + } + + fn event_type(&self) -> EventType { + EventType::PriorityTransactions + } } diff --git a/core/node/eth_watch/src/lib.rs b/core/node/eth_watch/src/lib.rs index cb05bd2ade22..5b2f5d418734 100644 --- a/core/node/eth_watch/src/lib.rs +++ b/core/node/eth_watch/src/lib.rs @@ -6,7 +6,7 @@ use std::time::Duration; use anyhow::Context as _; use tokio::sync::watch; -use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; +use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError}; use zksync_mini_merkle_tree::SyncMerkleTree; use zksync_system_constants::PRIORITY_EXPIRATION; use zksync_types::{ @@ -21,9 +21,9 @@ use self::{ EventProcessor, EventProcessorError, GovernanceUpgradesEventProcessor, PriorityOpsEventProcessor, }, - metrics::{PollStage, METRICS}, + metrics::METRICS, }; -use crate::event_processors::DecentralizedUpgradesEventProcessor; +use crate::event_processors::{DecentralizedUpgradesEventProcessor, EventsSource}; mod client; mod event_processors; @@ -35,38 +35,39 @@ mod tests; struct EthWatchState { last_seen_protocol_version: ProtocolSemanticVersion, next_expected_priority_id: PriorityOpId, - last_processed_ethereum_block: u64, } /// Ethereum watcher component. #[derive(Debug)] pub struct EthWatch { - client: Box, + l1_client: Box, + sl_client: Box, poll_interval: Duration, event_processors: Vec>, - last_processed_ethereum_block: u64, pool: ConnectionPool, } impl EthWatch { + #[allow(clippy::too_many_arguments)] pub async fn new( - diamond_proxy_addr: Address, + sl_diamond_proxy_addr: Address, governance_contract: &Contract, chain_admin_contract: &Contract, - mut client: Box, + l1_client: Box, + sl_client: Box, pool: ConnectionPool, poll_interval: Duration, priority_merkle_tree: SyncMerkleTree, ) -> anyhow::Result { let mut storage = pool.connection_tagged("eth_watch").await?; - let state = Self::initialize_state(&*client, &mut storage).await?; + let state = Self::initialize_state(&mut storage).await?; tracing::info!("initialized state: {state:?}"); drop(storage); let priority_ops_processor = PriorityOpsEventProcessor::new(state.next_expected_priority_id, priority_merkle_tree)?; let governance_upgrades_processor = GovernanceUpgradesEventProcessor::new( - diamond_proxy_addr, + sl_diamond_proxy_addr, state.last_seen_protocol_version, governance_contract, ); @@ -80,26 +81,17 @@ impl EthWatch { Box::new(decentralized_upgrades_processor), ]; - let topics = event_processors - .iter() - .map(|processor| processor.relevant_topic()) - .collect(); - client.set_topics(topics); - Ok(Self { - client, + l1_client, + sl_client, poll_interval, event_processors, - last_processed_ethereum_block: state.last_processed_ethereum_block, pool, }) } #[tracing::instrument(name = "EthWatch::initialize_state", skip_all)] - async fn initialize_state( - client: &dyn EthClient, - storage: &mut Connection<'_, Core>, - ) -> anyhow::Result { + async fn initialize_state(storage: &mut Connection<'_, Core>) -> anyhow::Result { let next_expected_priority_id: PriorityOpId = storage .transactions_dal() .last_priority_id() @@ -112,26 +104,9 @@ impl EthWatch { .await? .context("expected at least one (genesis) version to be present in DB")?; - let last_processed_ethereum_block = match storage - .transactions_dal() - .get_last_processed_l1_block() - .await? - { - // There are some priority ops processed - start from the last processed eth block - // but subtract 1 in case the server stopped mid-block. - Some(block) => block.0.saturating_sub(1).into(), - // There are no priority ops processed - to be safe, scan the last 50k blocks. - None => client - .finalized_block_number() - .await - .context("cannot get current Ethereum block")? - .saturating_sub(PRIORITY_EXPIRATION), - }; - Ok(EthWatchState { next_expected_priority_id, last_seen_protocol_version, - last_processed_ethereum_block, }) } @@ -157,10 +132,6 @@ impl EthWatch { // This is an error because otherwise we could potentially miss a priority operation // thus entering priority mode, which is not desired. tracing::error!("Failed to process new blocks: {err}"); - self.last_processed_ethereum_block = - Self::initialize_state(&*self.client, &mut storage) - .await? - .last_processed_ethereum_block; } } } @@ -174,34 +145,59 @@ impl EthWatch { &mut self, storage: &mut Connection<'_, Core>, ) -> Result<(), EventProcessorError> { - let stage_latency = METRICS.poll_eth_node[&PollStage::Request].start(); - let to_block = self.client.finalized_block_number().await?; - if to_block <= self.last_processed_ethereum_block { - return Ok(()); - } - - let events = self - .client - .get_events( - Web3BlockNumber::Number(self.last_processed_ethereum_block.into()), - Web3BlockNumber::Number(to_block.into()), - RETRY_LIMIT, - ) - .await?; - stage_latency.observe(); - for processor in &mut self.event_processors { - let relevant_topic = processor.relevant_topic(); - let processor_events = events - .iter() - .filter(|event| event.topics.first() == Some(&relevant_topic)) - .cloned() - .collect(); - processor - .process_events(storage, &*self.client, processor_events) + let client = match processor.event_source() { + EventsSource::L1 => self.l1_client.as_ref(), + EventsSource::SL => self.sl_client.as_ref(), + }; + let chain_id = client.chain_id().await?; + let finalized_block = client.finalized_block_number().await?; + + let from_block = storage + .processed_events_dal() + .get_or_set_next_block_to_process( + processor.event_type(), + chain_id, + finalized_block.saturating_sub(PRIORITY_EXPIRATION), + ) + .await + .map_err(DalError::generalize)?; + + let processor_events = client + .get_events( + Web3BlockNumber::Number(from_block.into()), + Web3BlockNumber::Number(finalized_block.into()), + processor.relevant_topic(), + RETRY_LIMIT, + ) .await?; + let processed_events_count = processor + .process_events(storage, &*self.sl_client, processor_events.clone()) + .await?; + + let next_block_to_process = if processed_events_count == processor_events.len() { + finalized_block + } else if processed_events_count == 0 { + //nothing was processed + from_block + } else { + processor_events[processed_events_count - 1] + .block_number + .expect("Event block number is missing") + .try_into() + .unwrap() + }; + + storage + .processed_events_dal() + .update_next_block_to_process( + processor.event_type(), + chain_id, + next_block_to_process, + ) + .await + .map_err(DalError::generalize)?; } - self.last_processed_ethereum_block = to_block; Ok(()) } } diff --git a/core/node/eth_watch/src/metrics.rs b/core/node/eth_watch/src/metrics.rs index a3684cc6e724..a942d4a6e615 100644 --- a/core/node/eth_watch/src/metrics.rs +++ b/core/node/eth_watch/src/metrics.rs @@ -7,7 +7,6 @@ use vise::{Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Histogram #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "stage", rename_all = "snake_case")] pub(super) enum PollStage { - Request, PersistL1Txs, PersistUpgrades, } diff --git a/core/node/eth_watch/src/tests.rs b/core/node/eth_watch/src/tests.rs index 2cbb1bc4ed40..cdf47fab2ad7 100644 --- a/core/node/eth_watch/src/tests.rs +++ b/core/node/eth_watch/src/tests.rs @@ -7,13 +7,13 @@ use zksync_eth_client::{ContractCallError, EnrichedClientResult}; use zksync_mini_merkle_tree::SyncMerkleTree; use zksync_types::{ abi, ethabi, - ethabi::{Hash, Token}, + ethabi::Token, l1::{L1Tx, OpProcessingType, PriorityQueueType}, protocol_upgrade::{ProtocolUpgradeTx, ProtocolUpgradeTxCommonData}, protocol_version::ProtocolSemanticVersion, web3::{BlockNumber, Log}, Address, Execute, L1TxCommonData, PriorityOpId, ProtocolUpgrade, ProtocolVersion, - ProtocolVersionId, Transaction, H256, U256, + ProtocolVersionId, SLChainId, Transaction, H256, U256, }; use crate::{client::EthClient, EthWatch}; @@ -24,15 +24,19 @@ struct FakeEthClientData { diamond_upgrades: HashMap>, governance_upgrades: HashMap>, last_finalized_block_number: u64, + chain_id: SLChainId, + processed_priority_transactions_count: u64, } impl FakeEthClientData { - fn new() -> Self { + fn new(chain_id: SLChainId) -> Self { Self { transactions: Default::default(), diamond_upgrades: Default::default(), governance_upgrades: Default::default(), last_finalized_block_number: 0, + chain_id, + processed_priority_transactions_count: 0, } } @@ -43,6 +47,7 @@ impl FakeEthClientData { .entry(eth_block.0 as u64) .or_default() .push(tx_into_log(transaction.clone())); + self.processed_priority_transactions_count += 1; } } @@ -58,6 +63,10 @@ impl FakeEthClientData { fn set_last_finalized_block_number(&mut self, number: u64) { self.last_finalized_block_number = number; } + + fn set_processed_priority_transactions_count(&mut self, number: u64) { + self.processed_priority_transactions_count = number; + } } #[derive(Debug, Clone)] @@ -66,9 +75,9 @@ struct MockEthClient { } impl MockEthClient { - fn new() -> Self { + fn new(chain_id: SLChainId) -> Self { Self { - inner: Arc::new(RwLock::new(FakeEthClientData::new())), + inner: Arc::new(RwLock::new(FakeEthClientData::new(chain_id))), } } @@ -87,6 +96,13 @@ impl MockEthClient { .set_last_finalized_block_number(number); } + async fn set_processed_priority_transactions_count(&mut self, number: u64) { + self.inner + .write() + .await + .set_processed_priority_transactions_count(number) + } + async fn block_to_number(&self, block: BlockNumber) -> u64 { match block { BlockNumber::Earliest => 0, @@ -105,6 +121,7 @@ impl EthClient for MockEthClient { &self, from: BlockNumber, to: BlockNumber, + topic: H256, _retries_left: usize, ) -> EnrichedClientResult> { let from = self.block_to_number(from).await; @@ -121,11 +138,12 @@ impl EthClient for MockEthClient { logs.extend_from_slice(ops); } } - Ok(logs) + Ok(logs + .into_iter() + .filter(|log| log.topics.contains(&topic)) + .collect()) } - fn set_topics(&mut self, _topics: Vec) {} - async fn scheduler_vk_hash( &self, _verifier_address: Address, @@ -143,6 +161,18 @@ impl EthClient for MockEthClient { ) -> EnrichedClientResult>> { unimplemented!() } + + async fn get_total_priority_txs(&self) -> Result { + Ok(self + .inner + .read() + .await + .processed_priority_transactions_count) + } + + async fn chain_id(&self) -> EnrichedClientResult { + Ok(self.inner.read().await.chain_id) + } } fn build_l1_tx(serial_id: u64, eth_block: u64) -> L1Tx { @@ -204,13 +234,22 @@ fn build_upgrade_tx(id: ProtocolVersionId, eth_block: u64) -> ProtocolUpgradeTx .unwrap() } -async fn create_test_watcher(connection_pool: ConnectionPool) -> (EthWatch, MockEthClient) { - let client = MockEthClient::new(); +async fn create_test_watcher( + connection_pool: ConnectionPool, + is_gateway: bool, +) -> (EthWatch, MockEthClient, MockEthClient) { + let l1_client = MockEthClient::new(SLChainId(42)); + let sl_client = if is_gateway { + MockEthClient::new(SLChainId(123)) + } else { + l1_client.clone() + }; let watcher = EthWatch::new( Address::default(), &governance_contract(), &chain_admin_contract(), - Box::new(client.clone()), + Box::new(l1_client.clone()), + Box::new(sl_client.clone()), connection_pool, std::time::Duration::from_nanos(1), SyncMerkleTree::from_hashes(std::iter::empty(), None), @@ -218,14 +257,27 @@ async fn create_test_watcher(connection_pool: ConnectionPool) -> (EthWatch .await .unwrap(); - (watcher, client) + (watcher, l1_client, sl_client) +} + +async fn create_l1_test_watcher( + connection_pool: ConnectionPool, +) -> (EthWatch, MockEthClient) { + let (watcher, l1_client, _) = create_test_watcher(connection_pool, false).await; + (watcher, l1_client) +} + +async fn create_gateway_test_watcher( + connection_pool: ConnectionPool, +) -> (EthWatch, MockEthClient, MockEthClient) { + create_test_watcher(connection_pool, true).await } -#[tokio::test] +#[test_log::test(tokio::test)] async fn test_normal_operation_l1_txs() { let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let (mut watcher, mut client) = create_test_watcher(connection_pool.clone()).await; + let (mut watcher, mut client) = create_l1_test_watcher(connection_pool.clone()).await; let mut storage = connection_pool.connection().await.unwrap(); client @@ -260,11 +312,11 @@ async fn test_normal_operation_l1_txs() { assert_eq!(db_tx.common_data.serial_id.0, 2); } -#[tokio::test] +#[test_log::test(tokio::test)] async fn test_gap_in_governance_upgrades() { let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let (mut watcher, mut client) = create_test_watcher(connection_pool.clone()).await; + let (mut watcher, mut client) = create_l1_test_watcher(connection_pool.clone()).await; let mut storage = connection_pool.connection().await.unwrap(); client @@ -293,18 +345,19 @@ async fn test_gap_in_governance_upgrades() { assert_eq!(db_versions[1].minor, next_version); } -#[tokio::test] +#[test_log::test(tokio::test)] async fn test_normal_operation_governance_upgrades() { zksync_concurrency::testonly::abort_on_panic(); let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let mut client = MockEthClient::new(); + let mut client = MockEthClient::new(SLChainId(42)); let mut watcher = EthWatch::new( Address::default(), &governance_contract(), &chain_admin_contract(), Box::new(client.clone()), + Box::new(client.clone()), connection_pool.clone(), std::time::Duration::from_nanos(1), SyncMerkleTree::from_hashes(std::iter::empty(), None), @@ -378,12 +431,12 @@ async fn test_normal_operation_governance_upgrades() { assert_eq!(tx.common_data.upgrade_id, ProtocolVersionId::next()); } -#[tokio::test] +#[test_log::test(tokio::test)] #[should_panic] async fn test_gap_in_single_batch() { let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let (mut watcher, mut client) = create_test_watcher(connection_pool.clone()).await; + let (mut watcher, mut client) = create_l1_test_watcher(connection_pool.clone()).await; let mut storage = connection_pool.connection().await.unwrap(); client @@ -399,12 +452,12 @@ async fn test_gap_in_single_batch() { watcher.loop_iteration(&mut storage).await.unwrap(); } -#[tokio::test] +#[test_log::test(tokio::test)] #[should_panic] async fn test_gap_between_batches() { let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let (mut watcher, mut client) = create_test_watcher(connection_pool.clone()).await; + let (mut watcher, mut client) = create_l1_test_watcher(connection_pool.clone()).await; let mut storage = connection_pool.connection().await.unwrap(); client @@ -427,12 +480,12 @@ async fn test_gap_between_batches() { watcher.loop_iteration(&mut storage).await.unwrap(); } -#[tokio::test] +#[test_log::test(tokio::test)] async fn test_overlapping_batches() { zksync_concurrency::testonly::abort_on_panic(); let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let (mut watcher, mut client) = create_test_watcher(connection_pool.clone()).await; + let (mut watcher, mut client) = create_l1_test_watcher(connection_pool.clone()).await; let mut storage = connection_pool.connection().await.unwrap(); client @@ -470,6 +523,52 @@ async fn test_overlapping_batches() { assert_eq!(tx.common_data.serial_id.0, 4); } +#[test_log::test(tokio::test)] +async fn test_transactions_get_gradually_processed_by_gateway() { + zksync_concurrency::testonly::abort_on_panic(); + let connection_pool = ConnectionPool::::test_pool().await; + setup_db(&connection_pool).await; + let (mut watcher, mut l1_client, mut gateway_client) = + create_gateway_test_watcher(connection_pool.clone()).await; + + let mut storage = connection_pool.connection().await.unwrap(); + l1_client + .add_transactions(&[ + build_l1_tx(0, 10), + build_l1_tx(1, 14), + build_l1_tx(2, 14), + build_l1_tx(3, 20), + build_l1_tx(4, 22), + ]) + .await; + l1_client.set_last_finalized_block_number(15).await; + gateway_client + .set_processed_priority_transactions_count(2) + .await; + watcher.loop_iteration(&mut storage).await.unwrap(); + + let db_txs = get_all_db_txs(&mut storage).await; + assert_eq!(db_txs.len(), 2); + + l1_client.set_last_finalized_block_number(25).await; + gateway_client + .set_processed_priority_transactions_count(4) + .await; + watcher.loop_iteration(&mut storage).await.unwrap(); + + let db_txs = get_all_db_txs(&mut storage).await; + assert_eq!(db_txs.len(), 4); + let mut db_txs: Vec = db_txs + .into_iter() + .map(|tx| tx.try_into().unwrap()) + .collect(); + db_txs.sort_by_key(|tx| tx.common_data.serial_id); + let tx = db_txs[2].clone(); + assert_eq!(tx.common_data.serial_id.0, 2); + let tx = db_txs[3].clone(); + assert_eq!(tx.common_data.serial_id.0, 3); +} + async fn get_all_db_txs(storage: &mut Connection<'_, Core>) -> Vec { storage.transactions_dal().reset_mempool().await.unwrap(); storage diff --git a/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs b/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs index cadbcb3edc1e..b252101e3330 100644 --- a/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs +++ b/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs @@ -8,7 +8,10 @@ use zksync_types::{commitment::L1BatchCommitmentMode, settlement::SettlementMode use crate::{ implementations::resources::{ circuit_breakers::CircuitBreakersResource, - eth_interface::{BoundEthInterfaceForBlobsResource, BoundEthInterfaceResource}, + eth_interface::{ + BoundEthInterfaceForBlobsResource, BoundEthInterfaceForL2Resource, + BoundEthInterfaceResource, + }, object_store::ObjectStoreResource, pools::{MasterPool, PoolResource, ReplicaPool}, priority_merkle_tree::PriorityTreeResource, @@ -41,6 +44,7 @@ use crate::{ pub struct EthTxAggregatorLayer { eth_sender_config: EthConfig, contracts_config: ContractsConfig, + gateway_contracts_config: Option, zksync_network_id: L2ChainId, l1_batch_commit_data_generator_mode: L1BatchCommitmentMode, settlement_mode: SettlementMode, @@ -53,6 +57,7 @@ pub struct Input { pub replica_pool: PoolResource, pub eth_client: Option, pub eth_client_blobs: Option, + pub eth_client_gateway: Option, pub object_store: ObjectStoreResource, #[context(default)] pub circuit_breakers: CircuitBreakersResource, @@ -70,6 +75,7 @@ impl EthTxAggregatorLayer { pub fn new( eth_sender_config: EthConfig, contracts_config: ContractsConfig, + gateway_contracts_config: Option, zksync_network_id: L2ChainId, l1_batch_commit_data_generator_mode: L1BatchCommitmentMode, settlement_mode: SettlementMode, @@ -77,6 +83,7 @@ impl EthTxAggregatorLayer { Self { eth_sender_config, contracts_config, + gateway_contracts_config, zksync_network_id, l1_batch_commit_data_generator_mode, settlement_mode, @@ -94,7 +101,24 @@ impl WiringLayer for EthTxAggregatorLayer { } async fn wire(self, input: Self::Input) -> Result { + tracing::info!( + "Wiring tx_aggregator in {:?} mode which is {}", + self.settlement_mode, + self.settlement_mode.is_gateway() + ); + tracing::info!("Contracts: {:?}", self.contracts_config); + tracing::info!("Gateway contracts: {:?}", self.gateway_contracts_config); // Get resources. + let contracts_config = if self.settlement_mode.is_gateway() { + self.gateway_contracts_config.unwrap() + } else { + self.contracts_config + }; + let eth_client = if self.settlement_mode.is_gateway() { + input.eth_client_gateway.unwrap().0 + } else { + input.eth_client.unwrap().0 + }; let master_pool = input.master_pool.get().await.unwrap(); let replica_pool = input.replica_pool.get().await.unwrap(); @@ -120,10 +144,10 @@ impl WiringLayer for EthTxAggregatorLayer { master_pool.clone(), config.clone(), aggregator, - input.eth_client.unwrap().0, - self.contracts_config.validator_timelock_addr, - self.contracts_config.l1_multicall3_addr, - self.contracts_config.diamond_proxy_addr, + eth_client, + contracts_config.validator_timelock_addr, + contracts_config.l1_multicall3_addr, + contracts_config.diamond_proxy_addr, self.zksync_network_id, eth_client_blobs_addr, self.settlement_mode, diff --git a/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs b/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs index 5462fa575f94..c7b1627e9f86 100644 --- a/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs +++ b/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs @@ -6,7 +6,10 @@ use zksync_eth_sender::EthTxManager; use crate::{ implementations::resources::{ circuit_breakers::CircuitBreakersResource, - eth_interface::{BoundEthInterfaceForBlobsResource, BoundEthInterfaceResource}, + eth_interface::{ + BoundEthInterfaceForBlobsResource, BoundEthInterfaceForL2Resource, + BoundEthInterfaceResource, + }, gas_adjuster::GasAdjusterResource, pools::{MasterPool, PoolResource, ReplicaPool}, }, @@ -45,6 +48,7 @@ pub struct Input { pub replica_pool: PoolResource, pub eth_client: BoundEthInterfaceResource, pub eth_client_blobs: Option, + pub eth_client_gateway: Option, pub gas_adjuster: GasAdjusterResource, #[context(default)] pub circuit_breakers: CircuitBreakersResource, @@ -77,10 +81,9 @@ impl WiringLayer for EthTxManagerLayer { let master_pool = input.master_pool.get().await.unwrap(); let replica_pool = input.replica_pool.get().await.unwrap(); - let settlement_mode = self.eth_sender_config.gas_adjuster.unwrap().settlement_mode; let eth_client = input.eth_client.0.clone(); let eth_client_blobs = input.eth_client_blobs.map(|c| c.0); - let l2_client = input.eth_client.0; + let l2_client = input.eth_client_gateway.map(|c| c.0); let config = self.eth_sender_config.sender.context("sender")?; @@ -90,21 +93,9 @@ impl WiringLayer for EthTxManagerLayer { master_pool, config, gas_adjuster, - if !settlement_mode.is_gateway() { - Some(eth_client) - } else { - None - }, - if !settlement_mode.is_gateway() { - eth_client_blobs - } else { - None - }, - if settlement_mode.is_gateway() { - Some(l2_client) - } else { - None - }, + Some(eth_client), + eth_client_blobs, + l2_client, ); // Insert circuit breaker. diff --git a/core/node/node_framework/src/implementations/layers/eth_watch.rs b/core/node/node_framework/src/implementations/layers/eth_watch.rs index 98a5b3966b96..29516988b4af 100644 --- a/core/node/node_framework/src/implementations/layers/eth_watch.rs +++ b/core/node/node_framework/src/implementations/layers/eth_watch.rs @@ -1,10 +1,11 @@ use zksync_config::{ContractsConfig, EthWatchConfig}; use zksync_contracts::{chain_admin_contract, governance_contract}; use zksync_eth_watch::{EthHttpQueryClient, EthWatch}; +use zksync_types::settlement::SettlementMode; use crate::{ implementations::resources::{ - eth_interface::EthInterfaceResource, + eth_interface::{EthInterfaceResource, GatewayEthInterfaceResource}, pools::{MasterPool, PoolResource}, priority_merkle_tree::PriorityTreeResource, }, @@ -22,6 +23,8 @@ use crate::{ pub struct EthWatchLayer { eth_watch_config: EthWatchConfig, contracts_config: ContractsConfig, + gateway_contracts_config: Option, + settlement_mode: SettlementMode, } #[derive(Debug, FromContext)] @@ -30,6 +33,7 @@ pub struct Input { pub master_pool: PoolResource, pub eth_client: EthInterfaceResource, pub priority_tree: PriorityTreeResource, + pub gateway_client: Option, } #[derive(Debug, IntoContext)] @@ -40,10 +44,17 @@ pub struct Output { } impl EthWatchLayer { - pub fn new(eth_watch_config: EthWatchConfig, contracts_config: ContractsConfig) -> Self { + pub fn new( + eth_watch_config: EthWatchConfig, + contracts_config: ContractsConfig, + gateway_contracts_config: Option, + settlement_mode: SettlementMode, + ) -> Self { Self { eth_watch_config, contracts_config, + gateway_contracts_config, + settlement_mode, } } } @@ -61,8 +72,24 @@ impl WiringLayer for EthWatchLayer { let main_pool = input.master_pool.get().await?; let client = input.eth_client.0; let priority_tree = input.priority_tree.0; + let sl_diamond_proxy_addr = if self.settlement_mode.is_gateway() { + self.gateway_contracts_config + .clone() + .unwrap() + .diamond_proxy_addr + } else { + self.contracts_config.diamond_proxy_addr + }; + tracing::info!( + "Diamond proxy address ethereum: {}", + self.contracts_config.diamond_proxy_addr + ); + tracing::info!( + "Diamond proxy address settlement_layer: {}", + sl_diamond_proxy_addr + ); - let eth_client = EthHttpQueryClient::new( + let l1_client = EthHttpQueryClient::new( client, self.contracts_config.diamond_proxy_addr, self.contracts_config @@ -73,11 +100,29 @@ impl WiringLayer for EthWatchLayer { self.eth_watch_config.confirmations_for_eth_event, ); + let sl_client = if self.settlement_mode.is_gateway() { + let gateway_client = input.gateway_client.unwrap().0; + let contracts_config = self.gateway_contracts_config.unwrap(); + EthHttpQueryClient::new( + gateway_client, + contracts_config.diamond_proxy_addr, + contracts_config + .ecosystem_contracts + .map(|a| a.state_transition_proxy_addr), + contracts_config.chain_admin_addr, + contracts_config.governance_addr, + self.eth_watch_config.confirmations_for_eth_event, + ) + } else { + l1_client.clone() + }; + let eth_watch = EthWatch::new( - self.contracts_config.diamond_proxy_addr, + sl_diamond_proxy_addr, &governance_contract(), &chain_admin_contract(), - Box::new(eth_client), + Box::new(l1_client), + Box::new(sl_client), main_pool, self.eth_watch_config.poll_interval(), priority_tree, diff --git a/core/node/node_framework/src/implementations/layers/pk_signing_eth_client.rs b/core/node/node_framework/src/implementations/layers/pk_signing_eth_client.rs index fdef23a40692..73ddd11134eb 100644 --- a/core/node/node_framework/src/implementations/layers/pk_signing_eth_client.rs +++ b/core/node/node_framework/src/implementations/layers/pk_signing_eth_client.rs @@ -8,7 +8,8 @@ use zksync_types::SLChainId; use crate::{ implementations::resources::eth_interface::{ - BoundEthInterfaceForBlobsResource, BoundEthInterfaceResource, EthInterfaceResource, + BoundEthInterfaceForBlobsResource, BoundEthInterfaceForL2Resource, + BoundEthInterfaceResource, EthInterfaceResource, GatewayEthInterfaceResource, }, wiring_layer::{WiringError, WiringLayer}, FromContext, IntoContext, @@ -19,6 +20,7 @@ use crate::{ pub struct PKSigningEthClientLayer { eth_sender_config: EthConfig, contracts_config: ContractsConfig, + gateway_contracts_config: Option, sl_chain_id: SLChainId, wallets: wallets::EthSender, } @@ -27,6 +29,7 @@ pub struct PKSigningEthClientLayer { #[context(crate = crate)] pub struct Input { pub eth_client: EthInterfaceResource, + pub gateway_client: Option, } #[derive(Debug, IntoContext)] @@ -35,18 +38,21 @@ pub struct Output { pub signing_client: BoundEthInterfaceResource, /// Only provided if the blob operator key is provided to the layer. pub signing_client_for_blobs: Option, + pub signing_client_for_gateway: Option, } impl PKSigningEthClientLayer { pub fn new( eth_sender_config: EthConfig, contracts_config: ContractsConfig, + gateway_contracts_config: Option, sl_chain_id: SLChainId, wallets: wallets::EthSender, ) -> Self { Self { eth_sender_config, contracts_config, + gateway_contracts_config, sl_chain_id, wallets, } @@ -91,10 +97,27 @@ impl WiringLayer for PKSigningEthClientLayer { ); BoundEthInterfaceForBlobsResource(Box::new(signing_client_for_blobs)) }); + let signing_client_for_gateway = if input.gateway_client.is_some() { + self.wallets.gateway.map(|gateway_operator| { + let private_key = gateway_operator.private_key(); + let GatewayEthInterfaceResource(gateway_client) = input.gateway_client.unwrap(); + let signing_client_for_blobs = PKSigningClient::new_raw( + private_key.clone(), + self.gateway_contracts_config.unwrap().diamond_proxy_addr, + gas_adjuster_config.default_priority_fee_per_gas, + self.sl_chain_id, + gateway_client, + ); + BoundEthInterfaceForL2Resource(Box::new(signing_client_for_blobs)) + }) + } else { + None + }; Ok(Output { signing_client, signing_client_for_blobs, + signing_client_for_gateway, }) } } diff --git a/core/node/node_framework/src/implementations/layers/query_eth_client.rs b/core/node/node_framework/src/implementations/layers/query_eth_client.rs index 116823d92d8a..e1a8dd71fed8 100644 --- a/core/node/node_framework/src/implementations/layers/query_eth_client.rs +++ b/core/node/node_framework/src/implementations/layers/query_eth_client.rs @@ -1,9 +1,11 @@ use anyhow::Context; -use zksync_types::{settlement::SettlementMode, url::SensitiveUrl, L2ChainId, SLChainId}; +use zksync_types::{url::SensitiveUrl, L2ChainId, SLChainId}; use zksync_web3_decl::client::Client; use crate::{ - implementations::resources::eth_interface::{EthInterfaceResource, L2InterfaceResource}, + implementations::resources::eth_interface::{ + EthInterfaceResource, GatewayEthInterfaceResource, L2InterfaceResource, + }, wiring_layer::{WiringError, WiringLayer}, IntoContext, }; @@ -13,19 +15,19 @@ use crate::{ pub struct QueryEthClientLayer { chain_id: SLChainId, web3_url: SensitiveUrl, - settlement_mode: SettlementMode, + gateway_web3_url: Option, } impl QueryEthClientLayer { pub fn new( chain_id: SLChainId, web3_url: SensitiveUrl, - settlement_mode: SettlementMode, + gateway_web3_url: Option, ) -> Self { Self { chain_id, web3_url, - settlement_mode, + gateway_web3_url, } } } @@ -35,6 +37,7 @@ impl QueryEthClientLayer { pub struct Output { query_client_l1: EthInterfaceResource, query_client_l2: Option, + query_client_gateway: Option, } #[async_trait::async_trait] @@ -55,12 +58,29 @@ impl WiringLayer for QueryEthClientLayer { .for_network(self.chain_id.into()) .build(), )), - query_client_l2: if self.settlement_mode.is_gateway() { + query_client_l2: if self.gateway_web3_url.is_some() { Some(L2InterfaceResource(Box::new( - Client::http(self.web3_url.clone()) - .context("Client::new()")? - .for_network(L2ChainId::try_from(self.chain_id.0).unwrap().into()) - .build(), + Client::http( + self.gateway_web3_url + .clone() + .expect("gateway url is required"), + ) + .context("Client::new()")? + .for_network(L2ChainId::try_from(self.chain_id.0).unwrap().into()) + .build(), + ))) + } else { + None + }, + query_client_gateway: if self.gateway_web3_url.is_some() { + Some(GatewayEthInterfaceResource(Box::new( + Client::http( + self.gateway_web3_url + .clone() + .expect("gateway url is required"), + ) + .context("Client::new()")? + .build(), ))) } else { None diff --git a/core/node/node_framework/src/implementations/resources/eth_interface.rs b/core/node/node_framework/src/implementations/resources/eth_interface.rs index 24b7df327f63..8cd2d67b5b82 100644 --- a/core/node/node_framework/src/implementations/resources/eth_interface.rs +++ b/core/node/node_framework/src/implementations/resources/eth_interface.rs @@ -13,6 +13,14 @@ impl Resource for EthInterfaceResource { } } +#[derive(Debug, Clone)] +pub struct GatewayEthInterfaceResource(pub Box>); + +impl Resource for GatewayEthInterfaceResource { + fn name() -> String { + "common/gateway_eth_interface".into() + } +} /// A resource that provides L2 interface object to the service. /// It is expected to have the same URL as the `EthInterfaceResource`, but have different capabilities. /// diff --git a/core/tests/ts-integration/src/context-owner.ts b/core/tests/ts-integration/src/context-owner.ts index 87dbe98667fd..f1a5ee0d7768 100644 --- a/core/tests/ts-integration/src/context-owner.ts +++ b/core/tests/ts-integration/src/context-owner.ts @@ -403,7 +403,13 @@ export class TestContextOwner { .then(async (tx) => { const amount = ethers.formatEther(l2ETHAmountToDeposit); this.reporter.debug(`Sent ETH deposit. Nonce ${tx.nonce}, amount: ${amount}, hash: ${tx.hash}`); - await tx.wait(); + + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Transaction wait timeout')), 120 * 1000) + ); + + // Race the transaction wait against the timeout + await Promise.race([tx.wait(), timeoutPromise]); }); nonce = nonce + 1 + (ethIsBaseToken ? 0 : 1); this.reporter.debug( diff --git a/infrastructure/zk/src/config.ts b/infrastructure/zk/src/config.ts index 096e1e457895..a0f81ff60619 100644 --- a/infrastructure/zk/src/config.ts +++ b/infrastructure/zk/src/config.ts @@ -302,4 +302,5 @@ command env.modify('ETH_SENDER_SENDER_OPERATOR_COMMIT_ETH_ADDR', `"${operators[0].address}"`, configFile, false); env.modify('ETH_SENDER_SENDER_OPERATOR_BLOBS_PRIVATE_KEY', `"${operators[1].privateKey}"`, configFile, false); env.modify('ETH_SENDER_SENDER_OPERATOR_BLOBS_ETH_ADDR', `"${operators[1].address}"`, configFile, false); + env.modify('ETH_SENDER_SENDER_OPERATOR_GATEWAY_PRIVATE_KEY', `"${operators[0].privateKey}"`, configFile, false); }); diff --git a/infrastructure/zk/src/contract.ts b/infrastructure/zk/src/contract.ts index 693cbda3fb58..1b4bba6185c6 100644 --- a/infrastructure/zk/src/contract.ts +++ b/infrastructure/zk/src/contract.ts @@ -111,6 +111,7 @@ async function migrateToSyncLayer() { // FIXME: consider creating new sync_layer_* variable. updateContractsEnv(envFile, migrationLog, ['GATEWAY_DIAMOND_PROXY_ADDR']); + fs.writeFileSync('backup_diamond.txt', process.env.CONTRACTS_DIAMOND_PROXY_ADDR!); env.modify('CONTRACTS_DIAMOND_PROXY_ADDR', process.env.GATEWAY_DIAMOND_PROXY_ADDR!, envFile, true); env.modify('ETH_SENDER_SENDER_PUBDATA_SENDING_MODE', 'RelayedL2Calldata', envFile, true); env.modify('ETH_SENDER_GAS_ADJUSTER_SETTLEMENT_MODE', 'Gateway', envFile, true); @@ -140,15 +141,23 @@ async function updateConfigOnSyncLayer() { console.log('a'); + env.modify( + 'CONTRACTS_DIAMOND_PROXY_ADDR', + fs.readFileSync('backup_diamond.txt', { encoding: 'utf-8' }), + envFile, + false + ); + for (const envVar of syncLayerEnvVars) { if (specialParams.includes(envVar)) { continue; } - const contractsVar = envVar.replace(/GATEWAY/g, 'CONTRACTS'); + const contractsVar = envVar.replace(/GATEWAY/g, 'GATEWAY_CONTRACTS'); env.modify(contractsVar, process.env[envVar]!, envFile, false); } env.modify('BRIDGE_LAYER_WEB3_URL', process.env.ETH_CLIENT_WEB3_URL!, envFile, false); - env.modify('ETH_CLIENT_WEB3_URL', process.env.GATEWAY_API_WEB3_JSON_RPC_HTTP_URL!, envFile, false); + env.modify('ETH_CLIENT_GATEWAY_WEB3_URL', process.env.GATEWAY_API_WEB3_JSON_RPC_HTTP_URL!, envFile, false); + // for loadtest env.modify('L1_RPC_ADDRESS', process.env.ETH_CLIENT_WEB3_URL!, envFile, false); env.modify('ETH_CLIENT_CHAIN_ID', process.env.GATEWAY_CHAIN_ID!, envFile, false); diff --git a/zk_toolbox/crates/zk_inception/src/commands/external_node/prepare_configs.rs b/zk_toolbox/crates/zk_inception/src/commands/external_node/prepare_configs.rs index 51101c228878..007c1f9a2c18 100644 --- a/zk_toolbox/crates/zk_inception/src/commands/external_node/prepare_configs.rs +++ b/zk_toolbox/crates/zk_inception/src/commands/external_node/prepare_configs.rs @@ -80,6 +80,7 @@ fn prepare_configs( }), l1: Some(L1Secrets { l1_rpc_url: SensitiveUrl::from_str(&args.l1_rpc_url).context("l1_rpc_url")?, + gateway_url: None, }), }; secrets.save_with_base_path(shell, en_configs_path)?;