From e984bfb8a243bc746549ab9347dc0a367fe02790 Mon Sep 17 00:00:00 2001 From: perekopskiy <53865202+perekopskiy@users.noreply.github.com> Date: Mon, 7 Oct 2024 10:02:09 +0300 Subject: [PATCH] feat(en): periodically fetch bridge addresses (#2949) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Periodically fetch bridge addresses ## Why ❔ Addresses will be changed during gateway upgrade ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. --- .gitignore | 1 + core/bin/external_node/src/config/mod.rs | 8 ++ core/bin/external_node/src/node_builder.rs | 4 + core/lib/config/src/configs/en_config.rs | 3 +- core/lib/config/src/testonly.rs | 1 + core/lib/protobuf_config/src/en.rs | 11 ++- .../protobuf_config/src/proto/config/en.proto | 1 + .../web3/backend_jsonrpsee/namespaces/zks.rs | 2 +- core/node/api_server/src/web3/mod.rs | 80 +++++++++---------- .../api_server/src/web3/namespaces/zks.rs | 4 +- core/node/api_server/src/web3/state.rs | 72 +++++++---------- core/node/api_server/src/web3/testonly.rs | 4 + .../web3_api/server/bridge_addresses.rs | 48 +++++++++++ .../web3_api/{server.rs => server/mod.rs} | 71 ++++++++++++---- .../layers/web3_api/server/sealed_l2_block.rs | 50 ++++++++++++ .../commands/external_node/prepare_configs.rs | 1 + 16 files changed, 254 insertions(+), 107 deletions(-) create mode 100644 core/node/node_framework/src/implementations/layers/web3_api/server/bridge_addresses.rs rename core/node/node_framework/src/implementations/layers/web3_api/{server.rs => server/mod.rs} (81%) create mode 100644 core/node/node_framework/src/implementations/layers/web3_api/server/sealed_l2_block.rs diff --git a/.gitignore b/.gitignore index c3de7a2df84d..bbd13e2319af 100644 --- a/.gitignore +++ b/.gitignore @@ -114,6 +114,7 @@ prover/data/keys/setup_* # Zk Toolbox chains/era/configs/* +chains/gateway/* configs/* era-observability/ core/tests/ts-integration/deployments-zk diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index 9b1677c47c4d..56ee3edfd253 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -445,6 +445,8 @@ pub(crate) struct OptionalENConfig { /// Gateway RPC URL, needed for operating during migration. #[allow(dead_code)] pub gateway_url: Option, + /// Interval for bridge addresses refreshing in seconds. + bridge_addresses_refresh_interval_sec: Option, } impl OptionalENConfig { @@ -675,6 +677,7 @@ impl OptionalENConfig { api_namespaces, contracts_diamond_proxy_addr: None, gateway_url: enconfig.gateway_url.clone(), + bridge_addresses_refresh_interval_sec: enconfig.bridge_addresses_refresh_interval_sec, }) } @@ -901,6 +904,11 @@ impl OptionalENConfig { Duration::from_secs(self.pruning_data_retention_sec) } + pub fn bridge_addresses_refresh_interval(&self) -> Option { + self.bridge_addresses_refresh_interval_sec + .map(|n| Duration::from_secs(n.get())) + } + #[cfg(test)] fn mock() -> Self { // Set all values to their defaults diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs index d0055896d42e..14e09b9c2a7a 100644 --- a/core/bin/external_node/src/node_builder.rs +++ b/core/bin/external_node/src/node_builder.rs @@ -430,6 +430,10 @@ impl ExternalNodeBuilder { response_body_size_limit: Some(self.config.optional.max_response_body_size()), with_extended_tracing: self.config.optional.extended_rpc_tracing, pruning_info_refresh_interval: Some(pruning_info_refresh_interval), + bridge_addresses_refresh_interval: self + .config + .optional + .bridge_addresses_refresh_interval(), polling_interval: Some(self.config.optional.polling_interval()), websocket_requests_per_minute_limit: None, // To be set by WS server layer method if required. replication_lag_limit: None, // TODO: Support replication lag limit diff --git a/core/lib/config/src/configs/en_config.rs b/core/lib/config/src/configs/en_config.rs index 7f130e3539a8..4cab47b0779e 100644 --- a/core/lib/config/src/configs/en_config.rs +++ b/core/lib/config/src/configs/en_config.rs @@ -1,4 +1,4 @@ -use std::num::NonZeroUsize; +use std::num::{NonZeroU64, NonZeroUsize}; use serde::Deserialize; use zksync_basic_types::{ @@ -19,4 +19,5 @@ pub struct ENConfig { pub main_node_rate_limit_rps: Option, pub gateway_url: Option, + pub bridge_addresses_refresh_interval_sec: Option, } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 6fbbad9d8ff2..86d9545b0fb4 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -933,6 +933,7 @@ impl Distribution for EncodeDist { main_node_rate_limit_rps: self.sample_opt(|| rng.gen()), gateway_url: self .sample_opt(|| format!("localhost:{}", rng.gen::()).parse().unwrap()), + bridge_addresses_refresh_interval_sec: self.sample_opt(|| rng.gen()), } } } diff --git a/core/lib/protobuf_config/src/en.rs b/core/lib/protobuf_config/src/en.rs index 9c07d1d39297..9d1a39310604 100644 --- a/core/lib/protobuf_config/src/en.rs +++ b/core/lib/protobuf_config/src/en.rs @@ -1,4 +1,7 @@ -use std::{num::NonZeroUsize, str::FromStr}; +use std::{ + num::{NonZeroU64, NonZeroUsize}, + str::FromStr, +}; use anyhow::Context; use zksync_basic_types::{url::SensitiveUrl, L1ChainId, L2ChainId}; @@ -36,6 +39,9 @@ impl ProtoRepr for proto::ExternalNode { .as_ref() .map(|a| a.parse().context("gateway_url")) .transpose()?, + bridge_addresses_refresh_interval_sec: self + .bridge_addresses_refresh_interval_sec + .and_then(NonZeroU64::new), }) } @@ -55,6 +61,9 @@ impl ProtoRepr for proto::ExternalNode { .gateway_url .as_ref() .map(|a| a.expose_str().to_string()), + bridge_addresses_refresh_interval_sec: this + .bridge_addresses_refresh_interval_sec + .map(|a| a.get()), } } } diff --git a/core/lib/protobuf_config/src/proto/config/en.proto b/core/lib/protobuf_config/src/proto/config/en.proto index d8a13d31d4b9..69412704ea0f 100644 --- a/core/lib/protobuf_config/src/proto/config/en.proto +++ b/core/lib/protobuf_config/src/proto/config/en.proto @@ -10,4 +10,5 @@ message ExternalNode { optional uint64 main_node_rate_limit_rps = 6; // optional optional config.genesis.L1BatchCommitDataGeneratorMode l1_batch_commit_data_generator_mode = 7; // optional, default to rollup optional string gateway_url = 8; // optional + optional uint64 bridge_addresses_refresh_interval_sec = 9; // optional } diff --git a/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/zks.rs b/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/zks.rs index f83eb37ad962..31c8f15bb1ea 100644 --- a/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/zks.rs +++ b/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/zks.rs @@ -55,7 +55,7 @@ impl ZksNamespaceServer for ZksNamespace { } async fn get_bridge_contracts(&self) -> RpcResult { - Ok(self.get_bridge_contracts_impl()) + Ok(self.get_bridge_contracts_impl().await) } async fn l1_chain_id(&self) -> RpcResult { diff --git a/core/node/api_server/src/web3/mod.rs b/core/node/api_server/src/web3/mod.rs index bad1b493a5fd..620e9185078e 100644 --- a/core/node/api_server/src/web3/mod.rs +++ b/core/node/api_server/src/web3/mod.rs @@ -47,6 +47,7 @@ use self::{ use crate::{ execution_sandbox::{BlockStartInfo, VmConcurrencyBarrier}, tx_sender::TxSender, + web3::state::BridgeAddressesHandle, }; pub mod backend_jsonrpsee; @@ -143,7 +144,6 @@ struct OptionalApiParams { #[derive(Debug)] pub struct ApiServer { pool: ConnectionPool, - updaters_pool: ConnectionPool, health_updater: Arc, config: InternalApiConfig, transport: ApiTransport, @@ -153,18 +153,21 @@ pub struct ApiServer { namespaces: Vec, method_tracer: Arc, optional: OptionalApiParams, + bridge_addresses_handle: BridgeAddressesHandle, + sealed_l2_block_handle: SealedL2BlockNumber, } #[derive(Debug)] pub struct ApiBuilder { pool: ConnectionPool, - updaters_pool: ConnectionPool, config: InternalApiConfig, polling_interval: Duration, pruning_info_refresh_interval: Duration, // Mandatory params that must be set using builder methods. transport: Option, tx_sender: Option, + bridge_addresses_handle: Option, + sealed_l2_block_handle: Option, // Optional params that may or may not be set using builder methods. We treat `namespaces` // specially because we want to output a warning if they are not set. namespaces: Option>, @@ -178,13 +181,14 @@ impl ApiBuilder { pub fn jsonrpsee_backend(config: InternalApiConfig, pool: ConnectionPool) -> Self { Self { - updaters_pool: pool.clone(), pool, config, polling_interval: Self::DEFAULT_POLLING_INTERVAL, pruning_info_refresh_interval: Self::DEFAULT_PRUNING_INFO_REFRESH_INTERVAL, transport: None, tx_sender: None, + bridge_addresses_handle: None, + sealed_l2_block_handle: None, namespaces: None, method_tracer: Arc::new(MethodTracer::default()), optional: OptionalApiParams::default(), @@ -201,15 +205,6 @@ impl ApiBuilder { self } - /// Configures a dedicated DB pool to be used for updating different information, - /// such as last mined block number or account nonces. This pool is used to execute - /// in a background task. If not called, the main pool will be used. If the API server is under high load, - /// it may make sense to supply a single-connection pool to reduce pool contention with the API methods. - pub fn with_updaters_pool(mut self, pool: ConnectionPool) -> Self { - self.updaters_pool = pool; - self - } - pub fn with_tx_sender(mut self, tx_sender: TxSender) -> Self { self.tx_sender = Some(tx_sender); self @@ -285,6 +280,22 @@ impl ApiBuilder { self } + pub fn with_sealed_l2_block_handle( + mut self, + sealed_l2_block_handle: SealedL2BlockNumber, + ) -> Self { + self.sealed_l2_block_handle = Some(sealed_l2_block_handle); + self + } + + pub fn with_bridge_addresses_handle( + mut self, + bridge_addresses_handle: BridgeAddressesHandle, + ) -> Self { + self.bridge_addresses_handle = Some(bridge_addresses_handle); + self + } + // Intended for tests only. #[doc(hidden)] fn with_pub_sub_events(mut self, sender: mpsc::UnboundedSender) -> Self { @@ -312,7 +323,6 @@ impl ApiBuilder { Ok(ApiServer { pool: self.pool, health_updater: Arc::new(health_updater), - updaters_pool: self.updaters_pool, config: self.config, transport, tx_sender: self.tx_sender.context("Transaction sender not set")?, @@ -326,6 +336,12 @@ impl ApiBuilder { }), method_tracer: self.method_tracer, optional: self.optional, + sealed_l2_block_handle: self + .sealed_l2_block_handle + .context("Sealed l2 block handle not set")?, + bridge_addresses_handle: self + .bridge_addresses_handle + .context("Bridge addresses handle not set")?, }) } } @@ -335,11 +351,8 @@ impl ApiServer { self.health_updater.subscribe() } - async fn build_rpc_state( - self, - last_sealed_l2_block: SealedL2BlockNumber, - ) -> anyhow::Result { - let mut storage = self.updaters_pool.connection_tagged("api").await?; + async fn build_rpc_state(self) -> anyhow::Result { + let mut storage = self.pool.connection_tagged("api").await?; let start_info = BlockStartInfo::new(&mut storage, self.pruning_info_refresh_interval).await?; drop(storage); @@ -363,7 +376,8 @@ impl ApiServer { api_config: self.config, start_info, mempool_cache: self.optional.mempool_cache, - last_sealed_l2_block, + last_sealed_l2_block: self.sealed_l2_block_handle, + bridge_addresses_handle: self.bridge_addresses_handle, tree_api: self.optional.tree_api, }) } @@ -371,11 +385,10 @@ impl ApiServer { async fn build_rpc_module( self, pub_sub: Option, - last_sealed_l2_block: SealedL2BlockNumber, ) -> anyhow::Result> { let namespaces = self.namespaces.clone(); let zksync_network_id = self.config.l2_chain_id; - let rpc_state = self.build_rpc_state(last_sealed_l2_block).await?; + let rpc_state = self.build_rpc_state().await?; // Collect all the methods into a single RPC module. let mut rpc = RpcModule::new(()); @@ -473,21 +486,9 @@ impl ApiServer { self, stop_receiver: watch::Receiver, ) -> anyhow::Result { - // Chosen to be significantly smaller than the interval between L2 blocks, but larger than - // the latency of getting the latest sealed L2 block number from Postgres. If the API server - // processes enough requests, information about the latest sealed L2 block will be updated - // by reporting block difference metrics, so the actual update lag would be much smaller than this value. - const SEALED_L2_BLOCK_UPDATE_INTERVAL: Duration = Duration::from_millis(25); - let transport = self.transport; + let mut tasks = vec![]; - let (last_sealed_l2_block, sealed_l2_block_update_task) = SealedL2BlockNumber::new( - self.updaters_pool.clone(), - SEALED_L2_BLOCK_UPDATE_INTERVAL, - stop_receiver.clone(), - ); - - let mut tasks = vec![tokio::spawn(sealed_l2_block_update_task)]; let pub_sub = if matches!(transport, ApiTransport::WebSocket(_)) && self.namespaces.contains(&Namespace::Pubsub) { @@ -510,12 +511,8 @@ impl ApiServer { // framework it'll no longer be needed. let health_check = self.health_updater.subscribe(); let (local_addr_sender, local_addr) = oneshot::channel(); - let server_task = tokio::spawn(self.run_jsonrpsee_server( - stop_receiver, - pub_sub, - last_sealed_l2_block, - local_addr_sender, - )); + let server_task = + tokio::spawn(self.run_jsonrpsee_server(stop_receiver, pub_sub, local_addr_sender)); tasks.push(server_task); Ok(ApiServerHandles { @@ -584,7 +581,6 @@ impl ApiServer { self, mut stop_receiver: watch::Receiver, pub_sub: Option, - last_sealed_l2_block: SealedL2BlockNumber, local_addr_sender: oneshot::Sender, ) -> anyhow::Result<()> { let transport = self.transport; @@ -640,7 +636,7 @@ impl ApiServer { tracing::info!("Enabled extended call tracing for {transport_str} API server; this might negatively affect performance"); } - let rpc = self.build_rpc_module(pub_sub, last_sealed_l2_block).await?; + let rpc = self.build_rpc_module(pub_sub).await?; let registered_method_names = Arc::new(rpc.method_names().collect::>()); tracing::debug!( "Built RPC module for {transport_str} server with {} methods: {registered_method_names:?}", diff --git a/core/node/api_server/src/web3/namespaces/zks.rs b/core/node/api_server/src/web3/namespaces/zks.rs index 61456095d67c..2192f11eb14e 100644 --- a/core/node/api_server/src/web3/namespaces/zks.rs +++ b/core/node/api_server/src/web3/namespaces/zks.rs @@ -132,8 +132,8 @@ impl ZksNamespace { self.state.api_config.l2_testnet_paymaster_addr } - pub fn get_bridge_contracts_impl(&self) -> BridgeAddresses { - self.state.api_config.bridge_addresses.clone() + pub async fn get_bridge_contracts_impl(&self) -> BridgeAddresses { + self.state.bridge_addresses_handle.read().await } pub fn l1_chain_id_impl(&self) -> U64 { diff --git a/core/node/api_server/src/web3/state.rs b/core/node/api_server/src/web3/state.rs index 8cbb75103cd9..723661ab9087 100644 --- a/core/node/api_server/src/web3/state.rs +++ b/core/node/api_server/src/web3/state.rs @@ -4,13 +4,13 @@ use std::{ atomic::{AtomicU32, Ordering}, Arc, }, - time::{Duration, Instant}, + time::Instant, }; use anyhow::Context as _; use futures::TryFutureExt; use lru::LruCache; -use tokio::sync::{watch, Mutex}; +use tokio::sync::{Mutex, RwLock}; use vise::GaugeGuard; use zksync_config::{ configs::{api::Web3JsonRpcConfig, ContractsConfig}, @@ -20,8 +20,9 @@ use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError}; use zksync_metadata_calculator::api_server::TreeApiClient; use zksync_node_sync::SyncState; use zksync_types::{ - api, commitment::L1BatchCommitmentMode, l2::L2Tx, transaction_request::CallRequest, Address, - L1BatchNumber, L1ChainId, L2BlockNumber, L2ChainId, H256, U256, U64, + api, api::BridgeAddresses, commitment::L1BatchCommitmentMode, l2::L2Tx, + transaction_request::CallRequest, Address, L1BatchNumber, L1ChainId, L2BlockNumber, L2ChainId, + H256, U256, U64, }; use zksync_web3_decl::{error::Web3Error, types::Filter}; @@ -173,51 +174,16 @@ impl InternalApiConfig { /// Thread-safe updatable information about the last sealed L2 block number. /// /// The information may be temporarily outdated and thus should only be used where this is OK -/// (e.g., for metrics reporting). The value is updated by [`Self::diff()`] and [`Self::diff_with_block_args()`] -/// and on an interval specified when creating an instance. -#[derive(Debug, Clone)] -pub(crate) struct SealedL2BlockNumber(Arc); +/// (e.g., for metrics reporting). The value is updated by [`Self::diff()`] and [`Self::diff_with_block_args()`]. +#[derive(Debug, Clone, Default)] +pub struct SealedL2BlockNumber(Arc); impl SealedL2BlockNumber { - /// Creates a handle to the last sealed L2 block number together with a task that will update - /// it on a schedule. - pub fn new( - connection_pool: ConnectionPool, - update_interval: Duration, - stop_receiver: watch::Receiver, - ) -> (Self, impl Future>) { - let this = Self(Arc::default()); - let number_updater = this.clone(); - - let update_task = async move { - loop { - if *stop_receiver.borrow() { - tracing::debug!("Stopping latest sealed L2 block updates"); - return Ok(()); - } - - let mut connection = connection_pool.connection_tagged("api").await.unwrap(); - let Some(last_sealed_l2_block) = - connection.blocks_dal().get_sealed_l2_block_number().await? - else { - tokio::time::sleep(update_interval).await; - continue; - }; - drop(connection); - - number_updater.update(last_sealed_l2_block); - tokio::time::sleep(update_interval).await; - } - }; - - (this, update_task) - } - /// Potentially updates the last sealed L2 block number by comparing it to the provided /// sealed L2 block number (not necessarily the last one). /// /// Returns the last sealed L2 block number after the update. - fn update(&self, maybe_newer_l2_block_number: L2BlockNumber) -> L2BlockNumber { + pub fn update(&self, maybe_newer_l2_block_number: L2BlockNumber) -> L2BlockNumber { let prev_value = self .0 .fetch_max(maybe_newer_l2_block_number.0, Ordering::Relaxed); @@ -231,7 +197,7 @@ impl SealedL2BlockNumber { /// Returns the difference between the latest L2 block number and the resolved L2 block number /// from `block_args`. - pub fn diff_with_block_args(&self, block_args: &BlockArgs) -> u32 { + pub(crate) fn diff_with_block_args(&self, block_args: &BlockArgs) -> u32 { // We compute the difference in any case, since it may update the stored value. let diff = self.diff(block_args.resolved_block_number()); @@ -243,6 +209,23 @@ impl SealedL2BlockNumber { } } +#[derive(Debug, Clone)] +pub struct BridgeAddressesHandle(Arc>); + +impl BridgeAddressesHandle { + pub fn new(bridge_addresses: BridgeAddresses) -> Self { + Self(Arc::new(RwLock::new(bridge_addresses))) + } + + pub async fn update(&self, bridge_addresses: BridgeAddresses) { + *self.0.write().await = bridge_addresses; + } + + pub async fn read(&self) -> BridgeAddresses { + self.0.read().await.clone() + } +} + /// Holder for the data required for the API to be functional. #[derive(Debug, Clone)] pub(crate) struct RpcState { @@ -258,6 +241,7 @@ pub(crate) struct RpcState { pub(super) start_info: BlockStartInfo, pub(super) mempool_cache: Option, pub(super) last_sealed_l2_block: SealedL2BlockNumber, + pub(super) bridge_addresses_handle: BridgeAddressesHandle, } impl RpcState { diff --git a/core/node/api_server/src/web3/testonly.rs b/core/node/api_server/src/web3/testonly.rs index 93309fc09cf1..3b05e235c6d4 100644 --- a/core/node/api_server/src/web3/testonly.rs +++ b/core/node/api_server/src/web3/testonly.rs @@ -181,6 +181,8 @@ async fn spawn_server( let mut namespaces = Namespace::DEFAULT.to_vec(); namespaces.extend([Namespace::Debug, Namespace::Snapshots, Namespace::Unstable]); + let sealed_l2_block_handle = SealedL2BlockNumber::default(); + let bridge_addresses_handle = BridgeAddressesHandle::new(api_config.bridge_addresses.clone()); let server_builder = match transport { ApiTransportLabel::Http => ApiBuilder::jsonrpsee_backend(api_config, pool).http(0), @@ -202,6 +204,8 @@ async fn spawn_server( .with_pub_sub_events(pub_sub_events_sender) .with_method_tracer(method_tracer) .enable_api_namespaces(namespaces) + .with_sealed_l2_block_handle(sealed_l2_block_handle) + .with_bridge_addresses_handle(bridge_addresses_handle) .build() .expect("Unable to build API server") .run(stop_receiver) diff --git a/core/node/node_framework/src/implementations/layers/web3_api/server/bridge_addresses.rs b/core/node/node_framework/src/implementations/layers/web3_api/server/bridge_addresses.rs new file mode 100644 index 000000000000..4ba8098c8399 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/web3_api/server/bridge_addresses.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use zksync_node_api_server::web3::state::BridgeAddressesHandle; +use zksync_web3_decl::{ + client::{DynClient, L2}, + namespaces::ZksNamespaceClient, +}; + +use crate::{StopReceiver, Task, TaskId}; + +#[derive(Debug)] +pub struct BridgeAddressesUpdaterTask { + pub bridge_address_updater: BridgeAddressesHandle, + pub main_node_client: Box>, + pub update_interval: Option, +} + +#[async_trait::async_trait] +impl Task for BridgeAddressesUpdaterTask { + fn id(&self) -> TaskId { + "bridge_addresses_updater_task".into() + } + + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + const DEFAULT_INTERVAL: Duration = Duration::from_secs(30); + + let update_interval = self.update_interval.unwrap_or(DEFAULT_INTERVAL); + while !*stop_receiver.0.borrow_and_update() { + match self.main_node_client.get_bridge_contracts().await { + Ok(bridge_addresses) => { + self.bridge_address_updater.update(bridge_addresses).await; + } + Err(err) => { + tracing::error!("Failed to query `get_bridge_contracts`, error: {err:?}"); + } + } + + if tokio::time::timeout(update_interval, stop_receiver.0.changed()) + .await + .is_ok() + { + break; + } + } + + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/layers/web3_api/server.rs b/core/node/node_framework/src/implementations/layers/web3_api/server/mod.rs similarity index 81% rename from core/node/node_framework/src/implementations/layers/web3_api/server.rs rename to core/node/node_framework/src/implementations/layers/web3_api/server/mod.rs index 0a39ae747c71..390d321647cf 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/server.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/server/mod.rs @@ -3,15 +3,24 @@ use std::{num::NonZeroU32, time::Duration}; use tokio::{sync::oneshot, task::JoinHandle}; use zksync_circuit_breaker::replication_lag::ReplicationLagChecker; use zksync_config::configs::api::MaxResponseSize; -use zksync_node_api_server::web3::{state::InternalApiConfig, ApiBuilder, ApiServer, Namespace}; +use zksync_node_api_server::web3::{ + state::{BridgeAddressesHandle, InternalApiConfig, SealedL2BlockNumber}, + ApiBuilder, ApiServer, Namespace, +}; use crate::{ - implementations::resources::{ - circuit_breakers::CircuitBreakersResource, - healthcheck::AppHealthCheckResource, - pools::{PoolResource, ReplicaPool}, - sync_state::SyncStateResource, - web3_api::{MempoolCacheResource, TreeApiClientResource, TxSenderResource}, + implementations::{ + layers::web3_api::server::{ + bridge_addresses::BridgeAddressesUpdaterTask, sealed_l2_block::SealedL2BlockUpdaterTask, + }, + resources::{ + circuit_breakers::CircuitBreakersResource, + healthcheck::AppHealthCheckResource, + main_node_client::MainNodeClientResource, + pools::{PoolResource, ReplicaPool}, + sync_state::SyncStateResource, + web3_api::{MempoolCacheResource, TreeApiClientResource, TxSenderResource}, + }, }, service::StopReceiver, task::{Task, TaskId}, @@ -19,6 +28,9 @@ use crate::{ FromContext, IntoContext, }; +mod bridge_addresses; +mod sealed_l2_block; + /// Set of optional variables that can be altered to modify the behavior of API builder. #[derive(Debug, Default)] pub struct Web3ServerOptionalConfig { @@ -33,6 +45,8 @@ pub struct Web3ServerOptionalConfig { pub replication_lag_limit: Option, // Used by the external node. pub pruning_info_refresh_interval: Option, + // Used by the external node. + pub bridge_addresses_refresh_interval: Option, pub polling_interval: Option, } @@ -61,6 +75,10 @@ impl Web3ServerOptionalConfig { if let Some(polling_interval) = self.polling_interval { api_builder = api_builder.with_polling_interval(polling_interval); } + if let Some(pruning_info_refresh_interval) = self.pruning_info_refresh_interval { + api_builder = + api_builder.with_pruning_info_refresh_interval(pruning_info_refresh_interval); + } api_builder = api_builder.with_extended_tracing(self.with_extended_tracing); api_builder } @@ -109,6 +127,7 @@ pub struct Input { pub circuit_breakers: CircuitBreakersResource, #[context(default)] pub app_health: AppHealthCheckResource, + pub main_node_client: Option, } #[derive(Debug, IntoContext)] @@ -118,6 +137,10 @@ pub struct Output { pub web3_api_task: Web3ApiTask, #[context(task)] pub garbage_collector_task: ApiTaskGarbageCollector, + #[context(task)] + pub sealed_l2_block_updater_task: SealedL2BlockUpdaterTask, + #[context(task)] + pub bridge_addresses_updater_task: Option, } impl Web3ServerLayer { @@ -163,20 +186,39 @@ impl WiringLayer for Web3ServerLayer { async fn wire(self, input: Self::Input) -> Result { // Get required resources. let replica_resource_pool = input.replica_pool; - let updaters_pool = replica_resource_pool.get_custom(2).await?; + let updaters_pool = replica_resource_pool.get_custom(1).await?; let replica_pool = replica_resource_pool.get().await?; let TxSenderResource(tx_sender) = input.tx_sender; let MempoolCacheResource(mempool_cache) = input.mempool_cache; let sync_state = input.sync_state.map(|state| state.0); let tree_api_client = input.tree_api_client.map(|client| client.0); + let sealed_l2_block_handle = SealedL2BlockNumber::default(); + let bridge_addresses_handle = + BridgeAddressesHandle::new(self.internal_api_config.bridge_addresses.clone()); + + let sealed_l2_block_updater_task = SealedL2BlockUpdaterTask { + number_updater: sealed_l2_block_handle.clone(), + pool: updaters_pool, + }; + // Bridge addresses updater task must be started for ENs and only for ENs. + let bridge_addresses_updater_task = + input + .main_node_client + .map(|main_node_client| BridgeAddressesUpdaterTask { + bridge_address_updater: bridge_addresses_handle.clone(), + main_node_client: main_node_client.0, + update_interval: self.optional_config.bridge_addresses_refresh_interval, + }); + // Build server. let mut api_builder = ApiBuilder::jsonrpsee_backend(self.internal_api_config, replica_pool.clone()) - .with_updaters_pool(updaters_pool) .with_tx_sender(tx_sender) .with_mempool_cache(mempool_cache) - .with_extended_tracing(self.optional_config.with_extended_tracing); + .with_extended_tracing(self.optional_config.with_extended_tracing) + .with_sealed_l2_block_handle(sealed_l2_block_handle) + .with_bridge_addresses_handle(bridge_addresses_handle); if let Some(client) = tree_api_client { api_builder = api_builder.with_tree_api(client); } @@ -191,14 +233,9 @@ impl WiringLayer for Web3ServerLayer { if let Some(sync_state) = sync_state { api_builder = api_builder.with_sync_state(sync_state); } - if let Some(pruning_info_refresh_interval) = - self.optional_config.pruning_info_refresh_interval - { - api_builder = - api_builder.with_pruning_info_refresh_interval(pruning_info_refresh_interval); - } let replication_lag_limit = self.optional_config.replication_lag_limit; api_builder = self.optional_config.apply(api_builder); + let server = api_builder.build()?; // Insert healthcheck. @@ -230,6 +267,8 @@ impl WiringLayer for Web3ServerLayer { Ok(Output { web3_api_task, garbage_collector_task, + sealed_l2_block_updater_task, + bridge_addresses_updater_task, }) } } diff --git a/core/node/node_framework/src/implementations/layers/web3_api/server/sealed_l2_block.rs b/core/node/node_framework/src/implementations/layers/web3_api/server/sealed_l2_block.rs new file mode 100644 index 000000000000..02552e212cd6 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/web3_api/server/sealed_l2_block.rs @@ -0,0 +1,50 @@ +use std::time::Duration; + +use zksync_dal::{Core, CoreDal}; +use zksync_db_connection::connection_pool::ConnectionPool; +use zksync_node_api_server::web3::state::SealedL2BlockNumber; + +use crate::{StopReceiver, Task, TaskId}; + +#[derive(Debug)] +pub struct SealedL2BlockUpdaterTask { + pub number_updater: SealedL2BlockNumber, + pub pool: ConnectionPool, +} + +#[async_trait::async_trait] +impl Task for SealedL2BlockUpdaterTask { + fn id(&self) -> TaskId { + "api_sealed_l2_block_updater_task".into() + } + + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + // Chosen to be significantly smaller than the interval between L2 blocks, but larger than + // the latency of getting the latest sealed L2 block number from Postgres. If the API server + // processes enough requests, information about the latest sealed L2 block will be updated + // by reporting block difference metrics, so the actual update lag would be much smaller than this value. + const UPDATE_INTERVAL: Duration = Duration::from_millis(25); + + while !*stop_receiver.0.borrow_and_update() { + let mut connection = self.pool.connection_tagged("api").await.unwrap(); + let Some(last_sealed_l2_block) = + connection.blocks_dal().get_sealed_l2_block_number().await? + else { + tokio::time::sleep(UPDATE_INTERVAL).await; + continue; + }; + drop(connection); + + self.number_updater.update(last_sealed_l2_block); + + if tokio::time::timeout(UPDATE_INTERVAL, stop_receiver.0.changed()) + .await + .is_ok() + { + break; + } + } + + Ok(()) + } +} diff --git a/zk_toolbox/crates/zk_inception/src/commands/external_node/prepare_configs.rs b/zk_toolbox/crates/zk_inception/src/commands/external_node/prepare_configs.rs index defbbd12d401..5ab859d17f0a 100644 --- a/zk_toolbox/crates/zk_inception/src/commands/external_node/prepare_configs.rs +++ b/zk_toolbox/crates/zk_inception/src/commands/external_node/prepare_configs.rs @@ -76,6 +76,7 @@ fn prepare_configs( )?, main_node_rate_limit_rps: None, gateway_url: None, + bridge_addresses_refresh_interval_sec: None, }; let mut general_en = general.clone();