diff --git a/Cargo.lock b/Cargo.lock index c47e5b77e391..ea3cbdbc70da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9836,6 +9836,7 @@ dependencies = [ "anyhow", "async-recursion", "async-trait", + "test-log", "thiserror", "tokio", "tracing", diff --git a/core/lib/contracts/src/lib.rs b/core/lib/contracts/src/lib.rs index f57649c9d695..5dabd3208894 100644 --- a/core/lib/contracts/src/lib.rs +++ b/core/lib/contracts/src/lib.rs @@ -49,12 +49,13 @@ const DIAMOND_INIT_CONTRACT_FILE: (&str, &str) = ( const GOVERNANCE_CONTRACT_FILE: (&str, &str) = ("governance", "IGovernance.sol/IGovernance.json"); const CHAIN_ADMIN_CONTRACT_FILE: (&str, &str) = ("governance", "IChainAdmin.sol/IChainAdmin.json"); const GETTERS_FACET_CONTRACT_FILE: (&str, &str) = ( - "state-transition/chain-deps/facets", - "Getters.sol/GettersFacet.json", + "state-transition/chain-interfaces", + "IGetters.sol/IGetters.json", ); const MULTICALL3_CONTRACT_FILE: (&str, &str) = ("dev-contracts", "Multicall3.sol/Multicall3.json"); const VERIFIER_CONTRACT_FILE: (&str, &str) = ("state-transition", "Verifier.sol/Verifier.json"); + const _IERC20_CONTRACT_FILE: &str = "contracts/l1-contracts/artifacts/contracts/common/interfaces/IERC20.sol/IERC20.json"; const _FAIL_ON_RECEIVE_CONTRACT_FILE: &str = diff --git a/core/lib/dal/.sqlx/query-26c80e9bafcf7989e7d40c6e424240b34ce29aad3ac6571116e084d45574c448.json b/core/lib/dal/.sqlx/query-26c80e9bafcf7989e7d40c6e424240b34ce29aad3ac6571116e084d45574c448.json new file mode 100644 index 000000000000..ee5de53d6e65 --- /dev/null +++ b/core/lib/dal/.sqlx/query-26c80e9bafcf7989e7d40c6e424240b34ce29aad3ac6571116e084d45574c448.json @@ -0,0 +1,33 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n next_block_to_process\n FROM\n processed_events\n WHERE\n TYPE = $1\n AND chain_id = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "next_block_to_process", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + { + "Custom": { + "name": "event_type", + "kind": { + "Enum": [ + "ProtocolUpgrades", + "PriorityTransactions" + ] + } + } + }, + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "26c80e9bafcf7989e7d40c6e424240b34ce29aad3ac6571116e084d45574c448" +} diff --git a/core/lib/dal/.sqlx/query-afdeecb78e3af802c2b8ffb0f5ab9863082ccd1ce45b2d20e1119f1e78171d66.json b/core/lib/dal/.sqlx/query-afdeecb78e3af802c2b8ffb0f5ab9863082ccd1ce45b2d20e1119f1e78171d66.json new file mode 100644 index 000000000000..bb0d73ee6c85 --- /dev/null +++ b/core/lib/dal/.sqlx/query-afdeecb78e3af802c2b8ffb0f5ab9863082ccd1ce45b2d20e1119f1e78171d66.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n processed_events (\n TYPE,\n chain_id,\n next_block_to_process\n )\n VALUES\n ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + { + "Custom": { + "name": "event_type", + "kind": { + "Enum": [ + "ProtocolUpgrades", + "PriorityTransactions" + ] + } + } + }, + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "afdeecb78e3af802c2b8ffb0f5ab9863082ccd1ce45b2d20e1119f1e78171d66" +} diff --git a/core/lib/dal/.sqlx/query-c61682ed92c1a43855a991598d593639a63047f46dc8e605ff37b78f43f5cef5.json b/core/lib/dal/.sqlx/query-c61682ed92c1a43855a991598d593639a63047f46dc8e605ff37b78f43f5cef5.json new file mode 100644 index 000000000000..b797ccb46045 --- /dev/null +++ b/core/lib/dal/.sqlx/query-c61682ed92c1a43855a991598d593639a63047f46dc8e605ff37b78f43f5cef5.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE processed_events\n SET\n next_block_to_process = $3\n WHERE\n TYPE = $1\n AND chain_id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + { + "Custom": { + "name": "event_type", + "kind": { + "Enum": [ + "ProtocolUpgrades", + "PriorityTransactions" + ] + } + } + }, + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "c61682ed92c1a43855a991598d593639a63047f46dc8e605ff37b78f43f5cef5" +} diff --git a/core/lib/dal/migrations/20240912085550_add_eth_watcher_progress_table.down.sql b/core/lib/dal/migrations/20240912085550_add_eth_watcher_progress_table.down.sql new file mode 100644 index 000000000000..79331481f589 --- /dev/null +++ b/core/lib/dal/migrations/20240912085550_add_eth_watcher_progress_table.down.sql @@ -0,0 +1,4 @@ +DROP TABLE IF EXISTS processed_events; + +DROP TYPE IF EXISTS event_type; + diff --git a/core/lib/dal/migrations/20240912085550_add_eth_watcher_progress_table.up.sql b/core/lib/dal/migrations/20240912085550_add_eth_watcher_progress_table.up.sql new file mode 100644 index 000000000000..be8a2b89a02c --- /dev/null +++ b/core/lib/dal/migrations/20240912085550_add_eth_watcher_progress_table.up.sql @@ -0,0 +1,9 @@ +CREATE TYPE event_type AS ENUM ('ProtocolUpgrades', 'PriorityTransactions'); + +CREATE TABLE processed_events +( + type event_type NOT NULL, + chain_id BIGINT NOT NULL, + next_block_to_process BIGINT NOT NULL, + PRIMARY KEY (chain_id, type) +) diff --git a/core/lib/dal/src/eth_watcher_dal.rs b/core/lib/dal/src/eth_watcher_dal.rs new file mode 100644 index 000000000000..3220868decf6 --- /dev/null +++ b/core/lib/dal/src/eth_watcher_dal.rs @@ -0,0 +1,154 @@ +use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt}; +use zksync_types::SLChainId; + +use crate::Core; + +pub struct EthWatcherDal<'a, 'c> { + pub(crate) storage: &'a mut Connection<'c, Core>, +} + +#[derive(Debug, Copy, Clone, sqlx::Type)] +#[sqlx(type_name = "event_type")] +pub enum EventType { + ProtocolUpgrades, + PriorityTransactions, +} + +impl EthWatcherDal<'_, '_> { + // Returns last set value of next_block_to_process for given event_type and chain_id. + // If the value was missing, initializes it with provided next_block_to_process value + pub async fn get_or_set_next_block_to_process( + &mut self, + event_type: EventType, + chain_id: SLChainId, + next_block_to_process: u64, + ) -> DalResult { + let result = sqlx::query!( + r#" + SELECT + next_block_to_process + FROM + processed_events + WHERE + TYPE = $1 + AND chain_id = $2 + "#, + event_type as EventType, + chain_id.0 as i64 + ) + .instrument("get_or_set_next_block_to_process") + .with_arg("event_type", &event_type) + .with_arg("chain_id", &chain_id) + .fetch_optional(self.storage) + .await?; + + if let Some(row) = result { + Ok(row.next_block_to_process as u64) + } else { + sqlx::query!( + r#" + INSERT INTO + processed_events ( + TYPE, + chain_id, + next_block_to_process + ) + VALUES + ($1, $2, $3) + "#, + event_type as EventType, + chain_id.0 as i64, + next_block_to_process as i64 + ) + .instrument("get_or_set_next_block_to_process - insert") + .with_arg("event_type", &event_type) + .with_arg("chain_id", &chain_id) + .execute(self.storage) + .await?; + + Ok(next_block_to_process) + } + } + + pub async fn update_next_block_to_process( + &mut self, + event_type: EventType, + chain_id: SLChainId, + next_block_to_process: u64, + ) -> DalResult<()> { + sqlx::query!( + r#" + UPDATE processed_events + SET + next_block_to_process = $3 + WHERE + TYPE = $1 + AND chain_id = $2 + "#, + event_type as EventType, + chain_id.0 as i64, + next_block_to_process as i64 + ) + .instrument("update_next_block_to_process") + .with_arg("event_type", &event_type) + .with_arg("chain_id", &chain_id) + .execute(self.storage) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ConnectionPool, Core, CoreDal}; + + #[tokio::test] + async fn test_get_or_set_next_block_to_process_with_different_event_types() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + let mut dal = conn.processed_events_dal(); + + // Test with ProtocolUpgrades + let next_block = dal + .get_or_set_next_block_to_process(EventType::ProtocolUpgrades, SLChainId(1), 100) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 100); + + // Test with PriorityTransactions + let next_block = dal + .get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(1), 200) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 200); + + // Test with PriorityTransactions + let next_block = dal + .get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(2), 300) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 300); + + // Verify that the initial block is not updated for ProtocolUpgrades + let next_block = dal + .get_or_set_next_block_to_process(EventType::ProtocolUpgrades, SLChainId(1), 150) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 100); + + // Verify that the initial block is not updated for PriorityTransactions + let next_block = dal + .get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(1), 250) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 200); + + // Verify that the initial block is not updated for PriorityTransactions + let next_block = dal + .get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(2), 350) + .await + .expect("Failed to get or set next block to process"); + assert_eq!(next_block, 300); + } +} diff --git a/core/lib/dal/src/lib.rs b/core/lib/dal/src/lib.rs index 0e1badb9af76..f0d2f0c16711 100644 --- a/core/lib/dal/src/lib.rs +++ b/core/lib/dal/src/lib.rs @@ -15,8 +15,9 @@ use crate::{ base_token_dal::BaseTokenDal, blocks_dal::BlocksDal, blocks_web3_dal::BlocksWeb3Dal, consensus_dal::ConsensusDal, contract_verification_dal::ContractVerificationDal, data_availability_dal::DataAvailabilityDal, eth_sender_dal::EthSenderDal, - events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, factory_deps_dal::FactoryDepsDal, - proof_generation_dal::ProofGenerationDal, protocol_versions_dal::ProtocolVersionsDal, + eth_watcher_dal::EthWatcherDal, events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, + factory_deps_dal::FactoryDepsDal, proof_generation_dal::ProofGenerationDal, + protocol_versions_dal::ProtocolVersionsDal, protocol_versions_web3_dal::ProtocolVersionsWeb3Dal, pruning_dal::PruningDal, snapshot_recovery_dal::SnapshotRecoveryDal, snapshots_creator_dal::SnapshotsCreatorDal, snapshots_dal::SnapshotsDal, storage_logs_dal::StorageLogsDal, @@ -35,6 +36,7 @@ pub mod consensus_dal; pub mod contract_verification_dal; mod data_availability_dal; pub mod eth_sender_dal; +pub mod eth_watcher_dal; pub mod events_dal; pub mod events_web3_dal; pub mod factory_deps_dal; @@ -132,6 +134,8 @@ where fn vm_runner_dal(&mut self) -> VmRunnerDal<'_, 'a>; fn base_token_dal(&mut self) -> BaseTokenDal<'_, 'a>; + + fn processed_events_dal(&mut self) -> EthWatcherDal<'_, 'a>; } #[derive(Clone, Debug)] @@ -258,4 +262,8 @@ impl<'a> CoreDal<'a> for Connection<'a, Core> { fn base_token_dal(&mut self) -> BaseTokenDal<'_, 'a> { BaseTokenDal { storage: self } } + + fn processed_events_dal(&mut self) -> EthWatcherDal<'_, 'a> { + EthWatcherDal { storage: self } + } } diff --git a/core/node/eth_watch/Cargo.toml b/core/node/eth_watch/Cargo.toml index a3d6325f4a24..985649c35daf 100644 --- a/core/node/eth_watch/Cargo.toml +++ b/core/node/eth_watch/Cargo.toml @@ -28,3 +28,4 @@ async-recursion.workspace = true [dev-dependencies] zksync_concurrency.workspace = true +test-log.workspace = true diff --git a/core/node/eth_watch/src/client.rs b/core/node/eth_watch/src/client.rs index 237c8e5bc2e6..2dbaf869390c 100644 --- a/core/node/eth_watch/src/client.rs +++ b/core/node/eth_watch/src/client.rs @@ -1,7 +1,9 @@ use std::fmt; use anyhow::Context; -use zksync_contracts::{state_transition_manager_contract, verifier_contract}; +use zksync_contracts::{ + getters_facet_contract, state_transition_manager_contract, verifier_contract, +}; use zksync_eth_client::{ clients::{DynClient, L1}, CallFunctionArgs, ClientError, ContractCallError, EnrichedClientError, EnrichedClientResult, @@ -10,7 +12,7 @@ use zksync_eth_client::{ use zksync_types::{ ethabi::Contract, web3::{BlockId, BlockNumber, FilterBuilder, Log}, - Address, H256, + Address, SLChainId, H256, U256, }; /// L1 client functionality used by [`EthWatch`](crate::EthWatch) and constituent event processors. @@ -21,10 +23,14 @@ pub trait EthClient: 'static + fmt::Debug + Send + Sync { &self, from: BlockNumber, to: BlockNumber, + topic1: H256, + topic2: Option, retries_left: usize, ) -> EnrichedClientResult>; /// Returns finalized L1 block number. async fn finalized_block_number(&self) -> EnrichedClientResult; + + async fn get_total_priority_txs(&self) -> Result; /// Returns scheduler verification key hash by verifier address. async fn scheduler_vk_hash(&self, verifier_address: Address) -> Result; @@ -33,8 +39,8 @@ pub trait EthClient: 'static + fmt::Debug + Send + Sync { &self, packed_version: H256, ) -> EnrichedClientResult>>; - /// Sets list of topics to return events for. - fn set_topics(&mut self, topics: Vec); + + async fn chain_id(&self) -> EnrichedClientResult; } pub const RETRY_LIMIT: usize = 5; @@ -43,10 +49,9 @@ const TOO_MANY_RESULTS_ALCHEMY: &str = "response size exceeded"; const TOO_MANY_RESULTS_RETH: &str = "query exceeds max block range"; /// Implementation of [`EthClient`] based on HTTP JSON-RPC (encapsulated via [`EthInterface`]). -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EthHttpQueryClient { client: Box>, - topics: Vec, diamond_proxy_addr: Address, governance_address: Address, new_upgrade_cut_data_signature: H256, @@ -54,6 +59,7 @@ pub struct EthHttpQueryClient { state_transition_manager_address: Option
, chain_admin_address: Option
, verifier_contract_abi: Contract, + getters_facet_contract_abi: Contract, confirmations_for_eth_event: Option, } @@ -73,7 +79,6 @@ impl EthHttpQueryClient { ); Self { client: client.for_component("watch"), - topics: Vec::new(), diamond_proxy_addr, state_transition_manager_address, chain_admin_address, @@ -84,6 +89,7 @@ impl EthHttpQueryClient { .unwrap() .signature(), verifier_contract_abi: verifier_contract(), + getters_facet_contract_abi: getters_facet_contract(), confirmations_for_eth_event, } } @@ -249,13 +255,15 @@ impl EthClient for EthHttpQueryClient { &self, from: BlockNumber, to: BlockNumber, + topic1: H256, + topic2: Option, retries_left: usize, ) -> EnrichedClientResult> { self.get_events_inner( from, to, - Some(self.topics.clone()), - None, + Some(vec![topic1]), + topic2.map(|topic2| vec![topic2]), Some(self.get_default_address_list()), retries_left, ) @@ -283,7 +291,15 @@ impl EthClient for EthHttpQueryClient { } } - fn set_topics(&mut self, topics: Vec) { - self.topics = topics; + async fn get_total_priority_txs(&self) -> Result { + CallFunctionArgs::new("getTotalPriorityTxs", ()) + .for_contract(self.diamond_proxy_addr, &self.getters_facet_contract_abi) + .call(&self.client) + .await + .map(|x: U256| x.try_into().unwrap()) + } + + async fn chain_id(&self) -> EnrichedClientResult { + Ok(self.client.fetch_chain_id().await?) } } diff --git a/core/node/eth_watch/src/event_processors/decentralized_upgrades.rs b/core/node/eth_watch/src/event_processors/decentralized_upgrades.rs index dff10662e984..aa43e7239f88 100644 --- a/core/node/eth_watch/src/event_processors/decentralized_upgrades.rs +++ b/core/node/eth_watch/src/event_processors/decentralized_upgrades.rs @@ -1,5 +1,5 @@ use anyhow::Context as _; -use zksync_dal::{Connection, Core, CoreDal, DalError}; +use zksync_dal::{eth_watcher_dal::EventType, Connection, Core, CoreDal, DalError}; use zksync_types::{ ethabi::Contract, protocol_version::ProtocolSemanticVersion, web3::Log, ProtocolUpgrade, H256, U256, @@ -7,7 +7,7 @@ use zksync_types::{ use crate::{ client::EthClient, - event_processors::{EventProcessor, EventProcessorError}, + event_processors::{EventProcessor, EventProcessorError, EventsSource}, metrics::{PollStage, METRICS}, }; @@ -40,18 +40,18 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor { async fn process_events( &mut self, storage: &mut Connection<'_, Core>, - client: &dyn EthClient, + sl_client: &dyn EthClient, events: Vec, - ) -> Result<(), EventProcessorError> { + ) -> Result { let mut upgrades = Vec::new(); - for event in events { + for event in &events { let version = event.topics.get(1).copied().context("missing topic 1")?; let timestamp: u64 = U256::from_big_endian(&event.data.0) .try_into() .ok() .context("upgrade timestamp is too big")?; - let diamond_cut = client + let diamond_cut = sl_client .diamond_cut_by_version(version) .await? .context("missing upgrade data on STM")?; @@ -62,7 +62,7 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor { }; // Scheduler VK is not present in proposal event. It is hard coded in verifier contract. let scheduler_vk_hash = if let Some(address) = upgrade.verifier_address { - Some(client.scheduler_vk_hash(address).await?) + Some(sl_client.scheduler_vk_hash(address).await?) } else { None }; @@ -75,7 +75,7 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor { .collect(); let Some((last_upgrade, _)) = new_upgrades.last() else { - return Ok(()); + return Ok(events.len()); }; let versions: Vec<_> = new_upgrades .iter() @@ -125,10 +125,18 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor { stage_latency.observe(); self.last_seen_protocol_version = last_version; - Ok(()) + Ok(events.len()) } fn relevant_topic(&self) -> H256 { self.update_upgrade_timestamp_signature } + + fn event_source(&self) -> EventsSource { + EventsSource::SL + } + + fn event_type(&self) -> EventType { + EventType::ProtocolUpgrades + } } diff --git a/core/node/eth_watch/src/event_processors/governance_upgrades.rs b/core/node/eth_watch/src/event_processors/governance_upgrades.rs deleted file mode 100644 index 72f5c411892f..000000000000 --- a/core/node/eth_watch/src/event_processors/governance_upgrades.rs +++ /dev/null @@ -1,142 +0,0 @@ -use anyhow::Context as _; -use zksync_dal::{Connection, Core, CoreDal, DalError}; -use zksync_types::{ - ethabi::Contract, protocol_upgrade::GovernanceOperation, - protocol_version::ProtocolSemanticVersion, web3::Log, Address, ProtocolUpgrade, H256, -}; - -use crate::{ - client::EthClient, - event_processors::{EventProcessor, EventProcessorError}, - metrics::{PollStage, METRICS}, -}; - -/// Listens to operation events coming from the governance contract and saves new protocol upgrade proposals to the database. -#[derive(Debug)] -pub struct GovernanceUpgradesEventProcessor { - // ZKsync diamond proxy - target_contract_address: Address, - /// Last protocol version seen. Used to skip events for already known upgrade proposals. - last_seen_protocol_version: ProtocolSemanticVersion, - upgrade_proposal_signature: H256, -} - -impl GovernanceUpgradesEventProcessor { - pub fn new( - target_contract_address: Address, - last_seen_protocol_version: ProtocolSemanticVersion, - governance_contract: &Contract, - ) -> Self { - Self { - target_contract_address, - last_seen_protocol_version, - upgrade_proposal_signature: governance_contract - .event("TransparentOperationScheduled") - .context("TransparentOperationScheduled event is missing in ABI") - .unwrap() - .signature(), - } - } -} - -#[async_trait::async_trait] -impl EventProcessor for GovernanceUpgradesEventProcessor { - async fn process_events( - &mut self, - storage: &mut Connection<'_, Core>, - client: &dyn EthClient, - events: Vec, - ) -> Result<(), EventProcessorError> { - let mut upgrades = Vec::new(); - for event in events { - assert_eq!(event.topics[0], self.upgrade_proposal_signature); // guaranteed by the watcher - - let governance_operation = GovernanceOperation::try_from(event) - .map_err(|err| EventProcessorError::log_parse(err, "governance operation"))?; - // Some calls can target other contracts than Diamond proxy, skip them. - for call in governance_operation - .calls - .into_iter() - .filter(|call| call.target == self.target_contract_address) - { - // We might not get an upgrade operation here, but something else instead - // (e.g. `acceptGovernor` call), so if parsing doesn't work, just skip the call. - let Ok(upgrade) = ProtocolUpgrade::try_from(call) else { - tracing::warn!( - "Failed to parse governance operation call as protocol upgrade, skipping" - ); - continue; - }; - // Scheduler VK is not present in proposal event. It is hard coded in verifier contract. - let scheduler_vk_hash = if let Some(address) = upgrade.verifier_address { - Some(client.scheduler_vk_hash(address).await?) - } else { - None - }; - upgrades.push((upgrade, scheduler_vk_hash)); - } - } - - let new_upgrades: Vec<_> = upgrades - .into_iter() - .skip_while(|(v, _)| v.version <= self.last_seen_protocol_version) - .collect(); - - let Some((last_upgrade, _)) = new_upgrades.last() else { - return Ok(()); - }; - let versions: Vec<_> = new_upgrades - .iter() - .map(|(u, _)| u.version.to_string()) - .collect(); - tracing::debug!("Received upgrades with versions: {versions:?}"); - - let last_version = last_upgrade.version; - let stage_latency = METRICS.poll_eth_node[&PollStage::PersistUpgrades].start(); - for (upgrade, scheduler_vk_hash) in new_upgrades { - let latest_semantic_version = storage - .protocol_versions_dal() - .latest_semantic_version() - .await - .map_err(DalError::generalize)? - .context("expected some version to be present in DB")?; - - if upgrade.version > latest_semantic_version { - let latest_version = storage - .protocol_versions_dal() - .get_protocol_version_with_latest_patch(latest_semantic_version.minor) - .await - .map_err(DalError::generalize)? - .with_context(|| { - format!( - "expected minor version {} to be present in DB", - latest_semantic_version.minor as u16 - ) - })?; - - let new_version = latest_version.apply_upgrade(upgrade, scheduler_vk_hash); - if new_version.version.minor == latest_semantic_version.minor { - // Only verification parameters may change if only patch is bumped. - assert_eq!( - new_version.base_system_contracts_hashes, - latest_version.base_system_contracts_hashes - ); - assert!(new_version.tx.is_none()); - } - storage - .protocol_versions_dal() - .save_protocol_version_with_tx(&new_version) - .await - .map_err(DalError::generalize)?; - } - } - stage_latency.observe(); - - self.last_seen_protocol_version = last_version; - Ok(()) - } - - fn relevant_topic(&self) -> H256 { - self.upgrade_proposal_signature - } -} diff --git a/core/node/eth_watch/src/event_processors/mod.rs b/core/node/eth_watch/src/event_processors/mod.rs index 43ae259305a3..f145181b0cf9 100644 --- a/core/node/eth_watch/src/event_processors/mod.rs +++ b/core/node/eth_watch/src/event_processors/mod.rs @@ -1,18 +1,17 @@ use std::fmt; -use zksync_dal::{Connection, Core}; +use zksync_dal::{eth_watcher_dal::EventType, Connection, Core}; use zksync_eth_client::{ContractCallError, EnrichedClientError}; use zksync_types::{web3::Log, H256}; pub(crate) use self::{ decentralized_upgrades::DecentralizedUpgradesEventProcessor, - governance_upgrades::GovernanceUpgradesEventProcessor, priority_ops::PriorityOpsEventProcessor, + priority_ops::PriorityOpsEventProcessor, }; use crate::client::EthClient; mod decentralized_upgrades; -mod governance_upgrades; -mod priority_ops; +pub mod priority_ops; /// Errors issued by an [`EventProcessor`]. #[derive(Debug, thiserror::Error)] @@ -32,6 +31,12 @@ pub(super) enum EventProcessorError { Internal(#[from] anyhow::Error), } +#[derive(Debug)] +pub(super) enum EventsSource { + L1, + SL, +} + impl EventProcessorError { pub fn log_parse(source: impl Into, log_kind: &'static str) -> Self { Self::LogParse { @@ -46,13 +51,18 @@ impl EventProcessorError { #[async_trait::async_trait] pub(super) trait EventProcessor: 'static + fmt::Debug + Send + Sync { /// Processes given events. All events are guaranteed to match [`Self::relevant_topic()`]. + /// Returns number of processed events, this result is used to update last processed block. async fn process_events( &mut self, storage: &mut Connection<'_, Core>, - client: &dyn EthClient, + sl_client: &dyn EthClient, events: Vec, - ) -> Result<(), EventProcessorError>; + ) -> Result; /// Relevant topic which defines what events to be processed fn relevant_topic(&self) -> H256; + + fn event_source(&self) -> EventsSource; + + fn event_type(&self) -> EventType; } diff --git a/core/node/eth_watch/src/event_processors/priority_ops.rs b/core/node/eth_watch/src/event_processors/priority_ops.rs index 2783637fdb9b..051c076850e9 100644 --- a/core/node/eth_watch/src/event_processors/priority_ops.rs +++ b/core/node/eth_watch/src/event_processors/priority_ops.rs @@ -2,13 +2,13 @@ use std::convert::TryFrom; use anyhow::Context; use zksync_contracts::hyperchain_contract; -use zksync_dal::{Connection, Core, CoreDal, DalError}; +use zksync_dal::{eth_watcher_dal::EventType, Connection, Core, CoreDal, DalError}; use zksync_shared_metrics::{TxStage, APP_METRICS}; use zksync_types::{l1::L1Tx, web3::Log, PriorityOpId, H256}; use crate::{ client::EthClient, - event_processors::{EventProcessor, EventProcessorError}, + event_processors::{EventProcessor, EventProcessorError, EventsSource}, metrics::{PollStage, METRICS}, }; @@ -36,10 +36,11 @@ impl EventProcessor for PriorityOpsEventProcessor { async fn process_events( &mut self, storage: &mut Connection<'_, Core>, - _client: &dyn EthClient, + sl_client: &dyn EthClient, events: Vec, - ) -> Result<(), EventProcessorError> { + ) -> Result { let mut priority_ops = Vec::new(); + let events_count = events.len(); for event in events { assert_eq!(event.topics[0], self.new_priority_request_signature); // guaranteed by the watcher let tx = L1Tx::try_from(event) @@ -48,7 +49,7 @@ impl EventProcessor for PriorityOpsEventProcessor { } if priority_ops.is_empty() { - return Ok(()); + return Ok(events_count); } let first = &priority_ops[0]; @@ -70,33 +71,49 @@ impl EventProcessor for PriorityOpsEventProcessor { .into_iter() .skip_while(|tx| tx.serial_id() < self.next_expected_priority_id) .collect(); - let (Some(first_new), Some(last_new)) = (new_ops.first(), new_ops.last()) else { - return Ok(()); + let skipped_ops = events_count - new_ops.len(); + let Some(first_new) = new_ops.first() else { + return Ok(events_count); }; assert_eq!( first_new.serial_id(), self.next_expected_priority_id, "priority transaction serial id mismatch" ); - let next_expected_priority_id = last_new.serial_id().next(); let stage_latency = METRICS.poll_eth_node[&PollStage::PersistL1Txs].start(); APP_METRICS.processed_txs[&TxStage::added_to_mempool()].inc(); APP_METRICS.processed_l1_txs[&TxStage::added_to_mempool()].inc(); - for new_op in new_ops { - let eth_block = new_op.eth_block(); + let processed_priority_transactions = sl_client.get_total_priority_txs().await?; + let ops_to_insert: Vec<&L1Tx> = new_ops + .iter() + .take_while(|op| processed_priority_transactions > op.serial_id().0) + .collect(); + + for new_op in &ops_to_insert { storage .transactions_dal() - .insert_transaction_l1(&new_op, eth_block) + .insert_transaction_l1(new_op, new_op.eth_block()) .await .map_err(DalError::generalize)?; } stage_latency.observe(); - self.next_expected_priority_id = next_expected_priority_id; - Ok(()) + if let Some(last_op) = ops_to_insert.last() { + self.next_expected_priority_id = last_op.serial_id().next(); + } + + Ok(skipped_ops + ops_to_insert.len()) } fn relevant_topic(&self) -> H256 { self.new_priority_request_signature } + + fn event_source(&self) -> EventsSource { + EventsSource::L1 + } + + fn event_type(&self) -> EventType { + EventType::PriorityTransactions + } } diff --git a/core/node/eth_watch/src/lib.rs b/core/node/eth_watch/src/lib.rs index e964d63bb19d..537468bb6e4e 100644 --- a/core/node/eth_watch/src/lib.rs +++ b/core/node/eth_watch/src/lib.rs @@ -6,23 +6,20 @@ use std::time::Duration; use anyhow::Context as _; use tokio::sync::watch; -use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; +use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError}; use zksync_system_constants::PRIORITY_EXPIRATION; use zksync_types::{ ethabi::Contract, protocol_version::ProtocolSemanticVersion, - web3::BlockNumber as Web3BlockNumber, Address, PriorityOpId, + web3::BlockNumber as Web3BlockNumber, PriorityOpId, }; pub use self::client::EthHttpQueryClient; use self::{ client::{EthClient, RETRY_LIMIT}, - event_processors::{ - EventProcessor, EventProcessorError, GovernanceUpgradesEventProcessor, - PriorityOpsEventProcessor, - }, - metrics::{PollStage, METRICS}, + event_processors::{EventProcessor, EventProcessorError, PriorityOpsEventProcessor}, + metrics::METRICS, }; -use crate::event_processors::DecentralizedUpgradesEventProcessor; +use crate::event_processors::{DecentralizedUpgradesEventProcessor, EventsSource}; mod client; mod event_processors; @@ -34,70 +31,53 @@ mod tests; struct EthWatchState { last_seen_protocol_version: ProtocolSemanticVersion, next_expected_priority_id: PriorityOpId, - last_processed_ethereum_block: u64, } /// Ethereum watcher component. #[derive(Debug)] pub struct EthWatch { - client: Box, + l1_client: Box, + sl_client: Box, poll_interval: Duration, event_processors: Vec>, - last_processed_ethereum_block: u64, pool: ConnectionPool, } impl EthWatch { pub async fn new( - diamond_proxy_addr: Address, - governance_contract: &Contract, chain_admin_contract: &Contract, - mut client: Box, + l1_client: Box, + sl_client: Box, pool: ConnectionPool, poll_interval: Duration, ) -> anyhow::Result { let mut storage = pool.connection_tagged("eth_watch").await?; - let state = Self::initialize_state(&*client, &mut storage).await?; + let state = Self::initialize_state(&mut storage).await?; tracing::info!("initialized state: {state:?}"); drop(storage); let priority_ops_processor = PriorityOpsEventProcessor::new(state.next_expected_priority_id)?; - let governance_upgrades_processor = GovernanceUpgradesEventProcessor::new( - diamond_proxy_addr, - state.last_seen_protocol_version, - governance_contract, - ); let decentralized_upgrades_processor = DecentralizedUpgradesEventProcessor::new( state.last_seen_protocol_version, chain_admin_contract, ); let event_processors: Vec> = vec![ Box::new(priority_ops_processor), - Box::new(governance_upgrades_processor), Box::new(decentralized_upgrades_processor), ]; - let topics = event_processors - .iter() - .map(|processor| processor.relevant_topic()) - .collect(); - client.set_topics(topics); - Ok(Self { - client, + l1_client, + sl_client, poll_interval, event_processors, - last_processed_ethereum_block: state.last_processed_ethereum_block, pool, }) } #[tracing::instrument(name = "EthWatch::initialize_state", skip_all)] - async fn initialize_state( - client: &dyn EthClient, - storage: &mut Connection<'_, Core>, - ) -> anyhow::Result { + async fn initialize_state(storage: &mut Connection<'_, Core>) -> anyhow::Result { let next_expected_priority_id: PriorityOpId = storage .transactions_dal() .last_priority_id() @@ -110,26 +90,9 @@ impl EthWatch { .await? .context("expected at least one (genesis) version to be present in DB")?; - let last_processed_ethereum_block = match storage - .transactions_dal() - .get_last_processed_l1_block() - .await? - { - // There are some priority ops processed - start from the last processed eth block - // but subtract 1 in case the server stopped mid-block. - Some(block) => block.0.saturating_sub(1).into(), - // There are no priority ops processed - to be safe, scan the last 50k blocks. - None => client - .finalized_block_number() - .await - .context("cannot get current Ethereum block")? - .saturating_sub(PRIORITY_EXPIRATION), - }; - Ok(EthWatchState { next_expected_priority_id, last_seen_protocol_version, - last_processed_ethereum_block, }) } @@ -155,10 +118,6 @@ impl EthWatch { // This is an error because otherwise we could potentially miss a priority operation // thus entering priority mode, which is not desired. tracing::error!("Failed to process new blocks: {err}"); - self.last_processed_ethereum_block = - Self::initialize_state(&*self.client, &mut storage) - .await? - .last_processed_ethereum_block; } } } @@ -172,34 +131,60 @@ impl EthWatch { &mut self, storage: &mut Connection<'_, Core>, ) -> Result<(), EventProcessorError> { - let stage_latency = METRICS.poll_eth_node[&PollStage::Request].start(); - let to_block = self.client.finalized_block_number().await?; - if to_block <= self.last_processed_ethereum_block { - return Ok(()); - } - - let events = self - .client - .get_events( - Web3BlockNumber::Number(self.last_processed_ethereum_block.into()), - Web3BlockNumber::Number(to_block.into()), - RETRY_LIMIT, - ) - .await?; - stage_latency.observe(); - for processor in &mut self.event_processors { - let relevant_topic = processor.relevant_topic(); - let processor_events = events - .iter() - .filter(|event| event.topics.first() == Some(&relevant_topic)) - .cloned() - .collect(); - processor - .process_events(storage, &*self.client, processor_events) + let client = match processor.event_source() { + EventsSource::L1 => self.l1_client.as_ref(), + EventsSource::SL => self.sl_client.as_ref(), + }; + let chain_id = client.chain_id().await?; + let finalized_block = client.finalized_block_number().await?; + + let from_block = storage + .processed_events_dal() + .get_or_set_next_block_to_process( + processor.event_type(), + chain_id, + finalized_block.saturating_sub(PRIORITY_EXPIRATION), + ) + .await + .map_err(DalError::generalize)?; + + let processor_events = client + .get_events( + Web3BlockNumber::Number(from_block.into()), + Web3BlockNumber::Number(finalized_block.into()), + processor.relevant_topic(), + None, + RETRY_LIMIT, + ) .await?; + let processed_events_count = processor + .process_events(storage, &*self.sl_client, processor_events.clone()) + .await?; + + let next_block_to_process = if processed_events_count == processor_events.len() { + finalized_block + 1 + } else if processed_events_count == 0 { + //nothing was processed + from_block + } else { + processor_events[processed_events_count - 1] + .block_number + .expect("Event block number is missing") + .try_into() + .unwrap() + }; + + storage + .processed_events_dal() + .update_next_block_to_process( + processor.event_type(), + chain_id, + next_block_to_process, + ) + .await + .map_err(DalError::generalize)?; } - self.last_processed_ethereum_block = to_block; Ok(()) } } diff --git a/core/node/eth_watch/src/metrics.rs b/core/node/eth_watch/src/metrics.rs index a3684cc6e724..a942d4a6e615 100644 --- a/core/node/eth_watch/src/metrics.rs +++ b/core/node/eth_watch/src/metrics.rs @@ -7,7 +7,6 @@ use vise::{Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Histogram #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "stage", rename_all = "snake_case")] pub(super) enum PollStage { - Request, PersistL1Txs, PersistUpgrades, } diff --git a/core/node/eth_watch/src/tests.rs b/core/node/eth_watch/src/tests.rs index 7ae3b5494e98..a43f9cf9c0af 100644 --- a/core/node/eth_watch/src/tests.rs +++ b/core/node/eth_watch/src/tests.rs @@ -1,37 +1,48 @@ use std::{collections::HashMap, convert::TryInto, sync::Arc}; use tokio::sync::RwLock; -use zksync_contracts::{chain_admin_contract, governance_contract, hyperchain_contract}; +use zksync_contracts::{ + chain_admin_contract, hyperchain_contract, state_transition_manager_contract, +}; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_eth_client::{ContractCallError, EnrichedClientResult}; use zksync_types::{ - abi, ethabi, - ethabi::{Hash, Token}, + abi, + abi::ProposedUpgrade, + ethabi, + ethabi::Token, l1::{L1Tx, OpProcessingType, PriorityQueueType}, protocol_upgrade::{ProtocolUpgradeTx, ProtocolUpgradeTxCommonData}, protocol_version::ProtocolSemanticVersion, - web3::{BlockNumber, Log}, + web3::{contract::Tokenizable, BlockNumber, Log}, Address, Execute, L1TxCommonData, PriorityOpId, ProtocolUpgrade, ProtocolVersion, - ProtocolVersionId, Transaction, H256, U256, + ProtocolVersionId, SLChainId, Transaction, H160, H256, U256, U64, }; -use crate::{client::EthClient, EthWatch}; +use crate::{ + client::{EthClient, RETRY_LIMIT}, + EthWatch, +}; #[derive(Debug)] struct FakeEthClientData { transactions: HashMap>, diamond_upgrades: HashMap>, - governance_upgrades: HashMap>, + upgrade_timestamp: HashMap>, last_finalized_block_number: u64, + chain_id: SLChainId, + processed_priority_transactions_count: u64, } impl FakeEthClientData { - fn new() -> Self { + fn new(chain_id: SLChainId) -> Self { Self { transactions: Default::default(), diamond_upgrades: Default::default(), - governance_upgrades: Default::default(), + upgrade_timestamp: Default::default(), last_finalized_block_number: 0, + chain_id, + processed_priority_transactions_count: 0, } } @@ -42,21 +53,30 @@ impl FakeEthClientData { .entry(eth_block.0 as u64) .or_default() .push(tx_into_log(transaction.clone())); + self.processed_priority_transactions_count += 1; } } - fn add_governance_upgrades(&mut self, upgrades: &[(ProtocolUpgrade, u64)]) { + fn add_upgrade_timestamp(&mut self, upgrades: &[(ProtocolUpgrade, u64)]) { for (upgrade, eth_block) in upgrades { - self.governance_upgrades + self.upgrade_timestamp + .entry(*eth_block) + .or_default() + .push(upgrade_timestamp_log(*eth_block)); + self.diamond_upgrades .entry(*eth_block) .or_default() - .push(upgrade_into_governor_log(upgrade.clone(), *eth_block)); + .push(diamond_upgrade_log(upgrade.clone(), *eth_block)); } } fn set_last_finalized_block_number(&mut self, number: u64) { self.last_finalized_block_number = number; } + + fn set_processed_priority_transactions_count(&mut self, number: u64) { + self.processed_priority_transactions_count = number; + } } #[derive(Debug, Clone)] @@ -65,9 +85,9 @@ struct MockEthClient { } impl MockEthClient { - fn new() -> Self { + fn new(chain_id: SLChainId) -> Self { Self { - inner: Arc::new(RwLock::new(FakeEthClientData::new())), + inner: Arc::new(RwLock::new(FakeEthClientData::new(chain_id))), } } @@ -75,8 +95,8 @@ impl MockEthClient { self.inner.write().await.add_transactions(transactions); } - async fn add_governance_upgrades(&mut self, upgrades: &[(ProtocolUpgrade, u64)]) { - self.inner.write().await.add_governance_upgrades(upgrades); + async fn add_upgrade_timestamp(&mut self, upgrades: &[(ProtocolUpgrade, u64)]) { + self.inner.write().await.add_upgrade_timestamp(upgrades); } async fn set_last_finalized_block_number(&mut self, number: u64) { @@ -86,6 +106,13 @@ impl MockEthClient { .set_last_finalized_block_number(number); } + async fn set_processed_priority_transactions_count(&mut self, number: u64) { + self.inner + .write() + .await + .set_processed_priority_transactions_count(number) + } + async fn block_to_number(&self, block: BlockNumber) -> u64 { match block { BlockNumber::Earliest => 0, @@ -104,6 +131,8 @@ impl EthClient for MockEthClient { &self, from: BlockNumber, to: BlockNumber, + topic1: H256, + topic2: Option, _retries_left: usize, ) -> EnrichedClientResult> { let from = self.block_to_number(from).await; @@ -116,15 +145,19 @@ impl EthClient for MockEthClient { if let Some(ops) = self.inner.read().await.diamond_upgrades.get(&number) { logs.extend_from_slice(ops); } - if let Some(ops) = self.inner.read().await.governance_upgrades.get(&number) { + if let Some(ops) = self.inner.read().await.upgrade_timestamp.get(&number) { logs.extend_from_slice(ops); } } - Ok(logs) + Ok(logs + .into_iter() + .filter(|log| { + log.topics.get(0) == Some(&topic1) + && (topic2.is_none() || log.topics.get(1) == topic2.as_ref()) + }) + .collect()) } - fn set_topics(&mut self, _topics: Vec) {} - async fn scheduler_vk_hash( &self, _verifier_address: Address, @@ -138,9 +171,51 @@ impl EthClient for MockEthClient { async fn diamond_cut_by_version( &self, - _packed_version: H256, + packed_version: H256, ) -> EnrichedClientResult>> { - unimplemented!() + let from_block = *self + .inner + .read() + .await + .diamond_upgrades + .keys() + .min() + .unwrap_or(&0); + let to_block = *self + .inner + .read() + .await + .diamond_upgrades + .keys() + .max() + .unwrap_or(&0); + + let logs = self + .get_events( + U64::from(from_block).into(), + U64::from(to_block).into(), + state_transition_manager_contract() + .event("NewUpgradeCutData") + .unwrap() + .signature(), + Some(packed_version), + RETRY_LIMIT, + ) + .await?; + + Ok(logs.into_iter().next().map(|log| log.data.0)) + } + + async fn get_total_priority_txs(&self) -> Result { + Ok(self + .inner + .read() + .await + .processed_priority_transactions_count) + } + + async fn chain_id(&self) -> EnrichedClientResult { + Ok(self.inner.read().await.chain_id) } } @@ -203,27 +278,47 @@ fn build_upgrade_tx(id: ProtocolVersionId, eth_block: u64) -> ProtocolUpgradeTx .unwrap() } -async fn create_test_watcher(connection_pool: ConnectionPool) -> (EthWatch, MockEthClient) { - let client = MockEthClient::new(); +async fn create_test_watcher( + connection_pool: ConnectionPool, + is_gateway: bool, +) -> (EthWatch, MockEthClient, MockEthClient) { + let l1_client = MockEthClient::new(SLChainId(42)); + let sl_client = if is_gateway { + MockEthClient::new(SLChainId(123)) + } else { + l1_client.clone() + }; let watcher = EthWatch::new( - Address::default(), - &governance_contract(), &chain_admin_contract(), - Box::new(client.clone()), + Box::new(l1_client.clone()), + Box::new(sl_client.clone()), connection_pool, std::time::Duration::from_nanos(1), ) .await .unwrap(); - (watcher, client) + (watcher, l1_client, sl_client) } -#[tokio::test] +async fn create_l1_test_watcher( + connection_pool: ConnectionPool, +) -> (EthWatch, MockEthClient) { + let (watcher, l1_client, _) = create_test_watcher(connection_pool, false).await; + (watcher, l1_client) +} + +async fn create_gateway_test_watcher( + connection_pool: ConnectionPool, +) -> (EthWatch, MockEthClient, MockEthClient) { + create_test_watcher(connection_pool, true).await +} + +#[test_log::test(tokio::test)] async fn test_normal_operation_l1_txs() { let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let (mut watcher, mut client) = create_test_watcher(connection_pool.clone()).await; + let (mut watcher, mut client) = create_l1_test_watcher(connection_pool.clone()).await; let mut storage = connection_pool.connection().await.unwrap(); client @@ -258,15 +353,15 @@ async fn test_normal_operation_l1_txs() { assert_eq!(db_tx.common_data.serial_id.0, 2); } -#[tokio::test] -async fn test_gap_in_governance_upgrades() { +#[test_log::test(tokio::test)] +async fn test_gap_in_upgrade_timestamp() { let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let (mut watcher, mut client) = create_test_watcher(connection_pool.clone()).await; + let (mut watcher, mut client) = create_l1_test_watcher(connection_pool.clone()).await; let mut storage = connection_pool.connection().await.unwrap(); client - .add_governance_upgrades(&[( + .add_upgrade_timestamp(&[( ProtocolUpgrade { version: ProtocolSemanticVersion { minor: ProtocolVersionId::next(), @@ -291,18 +386,17 @@ async fn test_gap_in_governance_upgrades() { assert_eq!(db_versions[1].minor, next_version); } -#[tokio::test] -async fn test_normal_operation_governance_upgrades() { +#[test_log::test(tokio::test)] +async fn test_normal_operation_upgrade_timestamp() { zksync_concurrency::testonly::abort_on_panic(); let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let mut client = MockEthClient::new(); + let mut client = MockEthClient::new(SLChainId(42)); let mut watcher = EthWatch::new( - Address::default(), - &governance_contract(), &chain_admin_contract(), Box::new(client.clone()), + Box::new(client.clone()), connection_pool.clone(), std::time::Duration::from_nanos(1), ) @@ -311,7 +405,7 @@ async fn test_normal_operation_governance_upgrades() { let mut storage = connection_pool.connection().await.unwrap(); client - .add_governance_upgrades(&[ + .add_upgrade_timestamp(&[ ( ProtocolUpgrade { tx: None, @@ -375,12 +469,12 @@ async fn test_normal_operation_governance_upgrades() { assert_eq!(tx.common_data.upgrade_id, ProtocolVersionId::next()); } -#[tokio::test] +#[test_log::test(tokio::test)] #[should_panic] async fn test_gap_in_single_batch() { let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let (mut watcher, mut client) = create_test_watcher(connection_pool.clone()).await; + let (mut watcher, mut client) = create_l1_test_watcher(connection_pool.clone()).await; let mut storage = connection_pool.connection().await.unwrap(); client @@ -396,12 +490,12 @@ async fn test_gap_in_single_batch() { watcher.loop_iteration(&mut storage).await.unwrap(); } -#[tokio::test] +#[test_log::test(tokio::test)] #[should_panic] async fn test_gap_between_batches() { let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let (mut watcher, mut client) = create_test_watcher(connection_pool.clone()).await; + let (mut watcher, mut client) = create_l1_test_watcher(connection_pool.clone()).await; let mut storage = connection_pool.connection().await.unwrap(); client @@ -424,12 +518,12 @@ async fn test_gap_between_batches() { watcher.loop_iteration(&mut storage).await.unwrap(); } -#[tokio::test] +#[test_log::test(tokio::test)] async fn test_overlapping_batches() { zksync_concurrency::testonly::abort_on_panic(); let connection_pool = ConnectionPool::::test_pool().await; setup_db(&connection_pool).await; - let (mut watcher, mut client) = create_test_watcher(connection_pool.clone()).await; + let (mut watcher, mut client) = create_l1_test_watcher(connection_pool.clone()).await; let mut storage = connection_pool.connection().await.unwrap(); client @@ -467,6 +561,52 @@ async fn test_overlapping_batches() { assert_eq!(tx.common_data.serial_id.0, 4); } +#[test_log::test(tokio::test)] +async fn test_transactions_get_gradually_processed_by_gateway() { + zksync_concurrency::testonly::abort_on_panic(); + let connection_pool = ConnectionPool::::test_pool().await; + setup_db(&connection_pool).await; + let (mut watcher, mut l1_client, mut gateway_client) = + create_gateway_test_watcher(connection_pool.clone()).await; + + let mut storage = connection_pool.connection().await.unwrap(); + l1_client + .add_transactions(&[ + build_l1_tx(0, 10), + build_l1_tx(1, 14), + build_l1_tx(2, 14), + build_l1_tx(3, 20), + build_l1_tx(4, 22), + ]) + .await; + l1_client.set_last_finalized_block_number(15).await; + gateway_client + .set_processed_priority_transactions_count(2) + .await; + watcher.loop_iteration(&mut storage).await.unwrap(); + + let db_txs = get_all_db_txs(&mut storage).await; + assert_eq!(db_txs.len(), 2); + + l1_client.set_last_finalized_block_number(25).await; + gateway_client + .set_processed_priority_transactions_count(4) + .await; + watcher.loop_iteration(&mut storage).await.unwrap(); + + let db_txs = get_all_db_txs(&mut storage).await; + assert_eq!(db_txs.len(), 4); + let mut db_txs: Vec = db_txs + .into_iter() + .map(|tx| tx.try_into().unwrap()) + .collect(); + db_txs.sort_by_key(|tx| tx.common_data.serial_id); + let tx = db_txs[2].clone(); + assert_eq!(tx.common_data.serial_id.0, 2); + let tx = db_txs[3].clone(); + assert_eq!(tx.common_data.serial_id.0, 3); +} + async fn get_all_db_txs(storage: &mut Connection<'_, Core>) -> Vec { storage.transactions_dal().reset_mempool().await.unwrap(); storage @@ -518,37 +658,69 @@ fn tx_into_log(tx: L1Tx) -> Log { } } -fn upgrade_into_governor_log(upgrade: ProtocolUpgrade, eth_block: u64) -> Log { - let diamond_cut = upgrade_into_diamond_cut(upgrade); +fn init_calldata(protocol_upgrade: ProtocolUpgrade) -> Vec { + let upgrade_token = upgrade_into_diamond_cut(protocol_upgrade); + + let encoded_params = ethabi::encode(&[upgrade_token]); + let execute_upgrade_selector = hyperchain_contract() .function("executeUpgrade") .unwrap() .short_signature(); - let diamond_upgrade_calldata = execute_upgrade_selector - .iter() - .copied() - .chain(ethabi::encode(&[diamond_cut])) - .collect(); - let governance_call = Token::Tuple(vec![ - Token::Address(Default::default()), - Token::Uint(U256::default()), - Token::Bytes(diamond_upgrade_calldata), - ]); - let governance_operation = Token::Tuple(vec![ - Token::Array(vec![governance_call]), - Token::FixedBytes(vec![0u8; 32]), - Token::FixedBytes(vec![0u8; 32]), - ]); - let final_data = ethabi::encode(&[Token::FixedBytes(vec![0u8; 32]), governance_operation]); + + // Concatenate the function selector with the encoded parameters + let mut calldata = Vec::with_capacity(4 + encoded_params.len()); + calldata.extend_from_slice(&execute_upgrade_selector); + calldata.extend_from_slice(&encoded_params); + + calldata +} + +fn diamond_upgrade_log(upgrade: ProtocolUpgrade, eth_block: u64) -> Log { + // struct DiamondCutData { + // FacetCut[] facetCuts; + // address initAddress; + // bytes initCalldata; + // } + let final_data = ethabi::encode(&[Token::Tuple(vec![ + Token::Array(vec![]), + Token::Address(H160::zero()), + Token::Bytes(init_calldata(upgrade.clone())), + ])]); + tracing::info!("{:?}", Token::Bytes(init_calldata(upgrade))); + + Log { + address: Address::repeat_byte(0x1), + topics: vec![ + state_transition_manager_contract() + .event("NewUpgradeCutData") + .unwrap() + .signature(), + H256::from_low_u64_be(eth_block), + ], + data: final_data.into(), + block_hash: Some(H256::repeat_byte(0x11)), + block_number: Some(eth_block.into()), + transaction_hash: Some(H256::random()), + transaction_index: Some(0u64.into()), + log_index: Some(0u64.into()), + transaction_log_index: Some(0u64.into()), + log_type: None, + removed: None, + block_timestamp: None, + } +} +fn upgrade_timestamp_log(eth_block: u64) -> Log { + let final_data = ethabi::encode(&[U256::from(12345).into_token()]); Log { address: Address::repeat_byte(0x1), topics: vec![ - governance_contract() - .event("TransparentOperationScheduled") - .expect("TransparentOperationScheduled event is missing in abi") + chain_admin_contract() + .event("UpdateUpgradeTimestamp") + .expect("UpdateUpgradeTimestamp event is missing in ABI") .signature(), - Default::default(), + H256::from_low_u64_be(eth_block), ], data: final_data.into(), block_hash: Some(H256::repeat_byte(0x11)), @@ -577,7 +749,7 @@ fn upgrade_into_diamond_cut(upgrade: ProtocolUpgrade) -> Token { else { unreachable!() }; - let upgrade_token = abi::ProposedUpgrade { + ProposedUpgrade { l2_protocol_upgrade_tx: tx, factory_deps, bootloader_hash: upgrade.bootloader_code_hash.unwrap_or_default().into(), @@ -589,17 +761,7 @@ fn upgrade_into_diamond_cut(upgrade: ProtocolUpgrade) -> Token { upgrade_timestamp: upgrade.timestamp.into(), new_protocol_version: upgrade.version.pack(), } - .encode(); - Token::Tuple(vec![ - Token::Array(vec![]), - Token::Address(Default::default()), - Token::Bytes( - vec![0u8; 4] - .into_iter() - .chain(ethabi::encode(&[upgrade_token])) - .collect(), - ), - ]) + .encode() } async fn setup_db(connection_pool: &ConnectionPool) { diff --git a/core/node/node_framework/src/implementations/layers/eth_watch.rs b/core/node/node_framework/src/implementations/layers/eth_watch.rs index 53eeb1c52805..e19828d85ccd 100644 --- a/core/node/node_framework/src/implementations/layers/eth_watch.rs +++ b/core/node/node_framework/src/implementations/layers/eth_watch.rs @@ -1,5 +1,5 @@ use zksync_config::{ContractsConfig, EthWatchConfig}; -use zksync_contracts::{chain_admin_contract, governance_contract}; +use zksync_contracts::chain_admin_contract; use zksync_eth_watch::{EthHttpQueryClient, EthWatch}; use crate::{ @@ -71,9 +71,8 @@ impl WiringLayer for EthWatchLayer { ); let eth_watch = EthWatch::new( - self.contracts_config.diamond_proxy_addr, - &governance_contract(), &chain_admin_contract(), + Box::new(eth_client.clone()), Box::new(eth_client), main_pool, self.eth_watch_config.poll_interval(),