Skip to content

Commit

Permalink
feat(en): periodically fetch bridge addresses (#2949)
Browse files Browse the repository at this point in the history
## What ❔

Periodically fetch bridge addresses

## Why ❔

Addresses will be changed during gateway upgrade

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
perekopskiy authored Oct 7, 2024
1 parent 2a7e72b commit e984bfb
Show file tree
Hide file tree
Showing 16 changed files with 254 additions and 107 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ prover/data/keys/setup_*

# Zk Toolbox
chains/era/configs/*
chains/gateway/*
configs/*
era-observability/
core/tests/ts-integration/deployments-zk
Expand Down
8 changes: 8 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ pub(crate) struct OptionalENConfig {
/// Gateway RPC URL, needed for operating during migration.
#[allow(dead_code)]
pub gateway_url: Option<SensitiveUrl>,
/// Interval for bridge addresses refreshing in seconds.
bridge_addresses_refresh_interval_sec: Option<NonZeroU64>,
}

impl OptionalENConfig {
Expand Down Expand Up @@ -675,6 +677,7 @@ impl OptionalENConfig {
api_namespaces,
contracts_diamond_proxy_addr: None,
gateway_url: enconfig.gateway_url.clone(),
bridge_addresses_refresh_interval_sec: enconfig.bridge_addresses_refresh_interval_sec,
})
}

Expand Down Expand Up @@ -901,6 +904,11 @@ impl OptionalENConfig {
Duration::from_secs(self.pruning_data_retention_sec)
}

pub fn bridge_addresses_refresh_interval(&self) -> Option<Duration> {
self.bridge_addresses_refresh_interval_sec
.map(|n| Duration::from_secs(n.get()))
}

#[cfg(test)]
fn mock() -> Self {
// Set all values to their defaults
Expand Down
4 changes: 4 additions & 0 deletions core/bin/external_node/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ impl ExternalNodeBuilder {
response_body_size_limit: Some(self.config.optional.max_response_body_size()),
with_extended_tracing: self.config.optional.extended_rpc_tracing,
pruning_info_refresh_interval: Some(pruning_info_refresh_interval),
bridge_addresses_refresh_interval: self
.config
.optional
.bridge_addresses_refresh_interval(),
polling_interval: Some(self.config.optional.polling_interval()),
websocket_requests_per_minute_limit: None, // To be set by WS server layer method if required.
replication_lag_limit: None, // TODO: Support replication lag limit
Expand Down
3 changes: 2 additions & 1 deletion core/lib/config/src/configs/en_config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::num::NonZeroUsize;
use std::num::{NonZeroU64, NonZeroUsize};

use serde::Deserialize;
use zksync_basic_types::{
Expand All @@ -19,4 +19,5 @@ pub struct ENConfig {
pub main_node_rate_limit_rps: Option<NonZeroUsize>,

pub gateway_url: Option<SensitiveUrl>,
pub bridge_addresses_refresh_interval_sec: Option<NonZeroU64>,
}
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,7 @@ impl Distribution<configs::en_config::ENConfig> for EncodeDist {
main_node_rate_limit_rps: self.sample_opt(|| rng.gen()),
gateway_url: self
.sample_opt(|| format!("localhost:{}", rng.gen::<u16>()).parse().unwrap()),
bridge_addresses_refresh_interval_sec: self.sample_opt(|| rng.gen()),
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion core/lib/protobuf_config/src/en.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{num::NonZeroUsize, str::FromStr};
use std::{
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
};

use anyhow::Context;
use zksync_basic_types::{url::SensitiveUrl, L1ChainId, L2ChainId};
Expand Down Expand Up @@ -36,6 +39,9 @@ impl ProtoRepr for proto::ExternalNode {
.as_ref()
.map(|a| a.parse().context("gateway_url"))
.transpose()?,
bridge_addresses_refresh_interval_sec: self
.bridge_addresses_refresh_interval_sec
.and_then(NonZeroU64::new),
})
}

Expand All @@ -55,6 +61,9 @@ impl ProtoRepr for proto::ExternalNode {
.gateway_url
.as_ref()
.map(|a| a.expose_str().to_string()),
bridge_addresses_refresh_interval_sec: this
.bridge_addresses_refresh_interval_sec
.map(|a| a.get()),
}
}
}
1 change: 1 addition & 0 deletions core/lib/protobuf_config/src/proto/config/en.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ message ExternalNode {
optional uint64 main_node_rate_limit_rps = 6; // optional
optional config.genesis.L1BatchCommitDataGeneratorMode l1_batch_commit_data_generator_mode = 7; // optional, default to rollup
optional string gateway_url = 8; // optional
optional uint64 bridge_addresses_refresh_interval_sec = 9; // optional
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl ZksNamespaceServer for ZksNamespace {
}

async fn get_bridge_contracts(&self) -> RpcResult<BridgeAddresses> {
Ok(self.get_bridge_contracts_impl())
Ok(self.get_bridge_contracts_impl().await)
}

async fn l1_chain_id(&self) -> RpcResult<U64> {
Expand Down
80 changes: 38 additions & 42 deletions core/node/api_server/src/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use self::{
use crate::{
execution_sandbox::{BlockStartInfo, VmConcurrencyBarrier},
tx_sender::TxSender,
web3::state::BridgeAddressesHandle,
};

pub mod backend_jsonrpsee;
Expand Down Expand Up @@ -143,7 +144,6 @@ struct OptionalApiParams {
#[derive(Debug)]
pub struct ApiServer {
pool: ConnectionPool<Core>,
updaters_pool: ConnectionPool<Core>,
health_updater: Arc<HealthUpdater>,
config: InternalApiConfig,
transport: ApiTransport,
Expand All @@ -153,18 +153,21 @@ pub struct ApiServer {
namespaces: Vec<Namespace>,
method_tracer: Arc<MethodTracer>,
optional: OptionalApiParams,
bridge_addresses_handle: BridgeAddressesHandle,
sealed_l2_block_handle: SealedL2BlockNumber,
}

#[derive(Debug)]
pub struct ApiBuilder {
pool: ConnectionPool<Core>,
updaters_pool: ConnectionPool<Core>,
config: InternalApiConfig,
polling_interval: Duration,
pruning_info_refresh_interval: Duration,
// Mandatory params that must be set using builder methods.
transport: Option<ApiTransport>,
tx_sender: Option<TxSender>,
bridge_addresses_handle: Option<BridgeAddressesHandle>,
sealed_l2_block_handle: Option<SealedL2BlockNumber>,
// Optional params that may or may not be set using builder methods. We treat `namespaces`
// specially because we want to output a warning if they are not set.
namespaces: Option<Vec<Namespace>>,
Expand All @@ -178,13 +181,14 @@ impl ApiBuilder {

pub fn jsonrpsee_backend(config: InternalApiConfig, pool: ConnectionPool<Core>) -> Self {
Self {
updaters_pool: pool.clone(),
pool,
config,
polling_interval: Self::DEFAULT_POLLING_INTERVAL,
pruning_info_refresh_interval: Self::DEFAULT_PRUNING_INFO_REFRESH_INTERVAL,
transport: None,
tx_sender: None,
bridge_addresses_handle: None,
sealed_l2_block_handle: None,
namespaces: None,
method_tracer: Arc::new(MethodTracer::default()),
optional: OptionalApiParams::default(),
Expand All @@ -201,15 +205,6 @@ impl ApiBuilder {
self
}

/// Configures a dedicated DB pool to be used for updating different information,
/// such as last mined block number or account nonces. This pool is used to execute
/// in a background task. If not called, the main pool will be used. If the API server is under high load,
/// it may make sense to supply a single-connection pool to reduce pool contention with the API methods.
pub fn with_updaters_pool(mut self, pool: ConnectionPool<Core>) -> Self {
self.updaters_pool = pool;
self
}

pub fn with_tx_sender(mut self, tx_sender: TxSender) -> Self {
self.tx_sender = Some(tx_sender);
self
Expand Down Expand Up @@ -285,6 +280,22 @@ impl ApiBuilder {
self
}

pub fn with_sealed_l2_block_handle(
mut self,
sealed_l2_block_handle: SealedL2BlockNumber,
) -> Self {
self.sealed_l2_block_handle = Some(sealed_l2_block_handle);
self
}

pub fn with_bridge_addresses_handle(
mut self,
bridge_addresses_handle: BridgeAddressesHandle,
) -> Self {
self.bridge_addresses_handle = Some(bridge_addresses_handle);
self
}

// Intended for tests only.
#[doc(hidden)]
fn with_pub_sub_events(mut self, sender: mpsc::UnboundedSender<PubSubEvent>) -> Self {
Expand Down Expand Up @@ -312,7 +323,6 @@ impl ApiBuilder {
Ok(ApiServer {
pool: self.pool,
health_updater: Arc::new(health_updater),
updaters_pool: self.updaters_pool,
config: self.config,
transport,
tx_sender: self.tx_sender.context("Transaction sender not set")?,
Expand All @@ -326,6 +336,12 @@ impl ApiBuilder {
}),
method_tracer: self.method_tracer,
optional: self.optional,
sealed_l2_block_handle: self
.sealed_l2_block_handle
.context("Sealed l2 block handle not set")?,
bridge_addresses_handle: self
.bridge_addresses_handle
.context("Bridge addresses handle not set")?,
})
}
}
Expand All @@ -335,11 +351,8 @@ impl ApiServer {
self.health_updater.subscribe()
}

async fn build_rpc_state(
self,
last_sealed_l2_block: SealedL2BlockNumber,
) -> anyhow::Result<RpcState> {
let mut storage = self.updaters_pool.connection_tagged("api").await?;
async fn build_rpc_state(self) -> anyhow::Result<RpcState> {
let mut storage = self.pool.connection_tagged("api").await?;
let start_info =
BlockStartInfo::new(&mut storage, self.pruning_info_refresh_interval).await?;
drop(storage);
Expand All @@ -363,19 +376,19 @@ impl ApiServer {
api_config: self.config,
start_info,
mempool_cache: self.optional.mempool_cache,
last_sealed_l2_block,
last_sealed_l2_block: self.sealed_l2_block_handle,
bridge_addresses_handle: self.bridge_addresses_handle,
tree_api: self.optional.tree_api,
})
}

async fn build_rpc_module(
self,
pub_sub: Option<EthSubscribe>,
last_sealed_l2_block: SealedL2BlockNumber,
) -> anyhow::Result<RpcModule<()>> {
let namespaces = self.namespaces.clone();
let zksync_network_id = self.config.l2_chain_id;
let rpc_state = self.build_rpc_state(last_sealed_l2_block).await?;
let rpc_state = self.build_rpc_state().await?;

// Collect all the methods into a single RPC module.
let mut rpc = RpcModule::new(());
Expand Down Expand Up @@ -473,21 +486,9 @@ impl ApiServer {
self,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<ApiServerHandles> {
// Chosen to be significantly smaller than the interval between L2 blocks, but larger than
// the latency of getting the latest sealed L2 block number from Postgres. If the API server
// processes enough requests, information about the latest sealed L2 block will be updated
// by reporting block difference metrics, so the actual update lag would be much smaller than this value.
const SEALED_L2_BLOCK_UPDATE_INTERVAL: Duration = Duration::from_millis(25);

let transport = self.transport;
let mut tasks = vec![];

let (last_sealed_l2_block, sealed_l2_block_update_task) = SealedL2BlockNumber::new(
self.updaters_pool.clone(),
SEALED_L2_BLOCK_UPDATE_INTERVAL,
stop_receiver.clone(),
);

let mut tasks = vec![tokio::spawn(sealed_l2_block_update_task)];
let pub_sub = if matches!(transport, ApiTransport::WebSocket(_))
&& self.namespaces.contains(&Namespace::Pubsub)
{
Expand All @@ -510,12 +511,8 @@ impl ApiServer {
// framework it'll no longer be needed.
let health_check = self.health_updater.subscribe();
let (local_addr_sender, local_addr) = oneshot::channel();
let server_task = tokio::spawn(self.run_jsonrpsee_server(
stop_receiver,
pub_sub,
last_sealed_l2_block,
local_addr_sender,
));
let server_task =
tokio::spawn(self.run_jsonrpsee_server(stop_receiver, pub_sub, local_addr_sender));

tasks.push(server_task);
Ok(ApiServerHandles {
Expand Down Expand Up @@ -584,7 +581,6 @@ impl ApiServer {
self,
mut stop_receiver: watch::Receiver<bool>,
pub_sub: Option<EthSubscribe>,
last_sealed_l2_block: SealedL2BlockNumber,
local_addr_sender: oneshot::Sender<SocketAddr>,
) -> anyhow::Result<()> {
let transport = self.transport;
Expand Down Expand Up @@ -640,7 +636,7 @@ impl ApiServer {
tracing::info!("Enabled extended call tracing for {transport_str} API server; this might negatively affect performance");
}

let rpc = self.build_rpc_module(pub_sub, last_sealed_l2_block).await?;
let rpc = self.build_rpc_module(pub_sub).await?;
let registered_method_names = Arc::new(rpc.method_names().collect::<HashSet<_>>());
tracing::debug!(
"Built RPC module for {transport_str} server with {} methods: {registered_method_names:?}",
Expand Down
4 changes: 2 additions & 2 deletions core/node/api_server/src/web3/namespaces/zks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ impl ZksNamespace {
self.state.api_config.l2_testnet_paymaster_addr
}

pub fn get_bridge_contracts_impl(&self) -> BridgeAddresses {
self.state.api_config.bridge_addresses.clone()
pub async fn get_bridge_contracts_impl(&self) -> BridgeAddresses {
self.state.bridge_addresses_handle.read().await
}

pub fn l1_chain_id_impl(&self) -> U64 {
Expand Down
Loading

0 comments on commit e984bfb

Please sign in to comment.