From 726203bab540e3d6ada10b6bc12bd3c09220d895 Mon Sep 17 00:00:00 2001
From: Grzegorz Prusak
Date: Wed, 13 Nov 2024 12:40:03 +0100
Subject: [PATCH] feat(consensus): fallback json rpc syncing for consensus
 (#3211)

If for any reason p2p syncing falls behind, JSON-RPC syncing will
activate. This is a protective measure while we deploy changes to the
consensus algorithm.

Fixes BFT-516
---
 Cargo.lock                               |  2 +
 core/node/consensus/Cargo.toml           |  1 +
 core/node/consensus/src/en.rs            | 94 ++++++++++++------------
 core/node/consensus/src/lib.rs           |  1 +
 core/node/consensus/src/metrics.rs       | 13 ++++
 core/node/consensus/src/storage/store.rs |  8 +-
 core/node/consensus/src/testonly.rs      | 39 +---------
 core/node/consensus/src/tests/mod.rs     | 65 +---------------
 core/node/node_sync/Cargo.toml           |  1 +
 core/node/node_sync/src/sync_state.rs    | 21 +++---
 10 files changed, 85 insertions(+), 160 deletions(-)
 create mode 100644 core/node/consensus/src/metrics.rs

diff --git a/Cargo.lock b/Cargo.lock
index eb93300b1729..75591bca7293 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11499,6 +11499,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tracing",
+ "vise",
  "zksync_concurrency",
  "zksync_config",
  "zksync_consensus_bft",
@@ -11699,6 +11700,7 @@ dependencies = [
  "vise",
  "zksync_concurrency",
  "zksync_config",
+ "zksync_consensus_roles",
  "zksync_contracts",
  "zksync_dal",
  "zksync_eth_client",
diff --git a/core/node/consensus/Cargo.toml b/core/node/consensus/Cargo.toml
index 120d355da9a8..1d6b80f475e7 100644
--- a/core/node/consensus/Cargo.toml
+++ b/core/node/consensus/Cargo.toml
@@ -42,6 +42,7 @@ thiserror.workspace = true
 tracing.workspace = true
 tokio.workspace = true
 semver.workspace = true
+vise.workspace = true
 
 [dev-dependencies]
 zksync_node_genesis.workspace = true
diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs
index 6f4d80233be4..e417b68cf2cb 100644
--- a/core/node/consensus/src/en.rs
+++ b/core/node/consensus/src/en.rs
@@ -17,13 +17,14 @@ use zksync_web3_decl::{
 
 use super::{config, storage::Store, ConsensusConfig, ConsensusSecrets};
 use crate::{
+    metrics::METRICS,
     registry,
     storage::{self, ConnectionPool},
 };
 
-/// If less than TEMPORARY_FETCHER_THRESHOLD certificates are missing,
-/// the temporary fetcher will stop fetching blocks.
-pub(crate) const TEMPORARY_FETCHER_THRESHOLD: u64 = 10;
+/// Whenever more than FALLBACK_FETCHER_THRESHOLD certificates are missing,
+/// the fallback fetcher is active.
+pub(crate) const FALLBACK_FETCHER_THRESHOLD: u64 = 10;
 
 /// External node.
 pub(super) struct EN {
@@ -115,11 +116,9 @@ impl EN {
                 let store = store.clone();
                 async {
                     let store = store;
-                    self.temporary_block_fetcher(ctx, &store).await?;
-                    tracing::info!(
-                        "temporary block fetcher finished, switching to p2p fetching only"
-                    );
-                    Ok(())
+                    self.fallback_block_fetcher(ctx, &store)
+                        .await
+                        .wrap("fallback_block_fetcher()")
                 }
             });
 
@@ -191,7 +190,7 @@ impl EN {
                 .new_payload_queue(ctx, actions, self.sync_state.clone())
                 .await
                 .wrap("new_fetcher_cursor()")?;
-            self.fetch_blocks(ctx, &mut payload_queue, None).await
+            self.fetch_blocks(ctx, &mut payload_queue).await
         })
         .await;
         match res {
@@ -362,9 +361,14 @@ impl EN {
     }
 
     /// Fetches (with retries) the given block from the main node.
-    async fn fetch_block(&self, ctx: &ctx::Ctx, n: L2BlockNumber) -> ctx::Result<FetchedBlock> {
+    async fn fetch_block(
+        &self,
+        ctx: &ctx::Ctx,
+        n: validator::BlockNumber,
+    ) -> ctx::Result<FetchedBlock> {
         const RETRY_INTERVAL: time::Duration = time::Duration::seconds(5);
-
+        let n = L2BlockNumber(n.0.try_into().context("overflow")?);
+        METRICS.fetch_block.inc();
         loop {
             match ctx.wait(self.client.sync_l2_block(n, true)).await? {
                 Ok(Some(block)) => return Ok(block.try_into()?),
@@ -376,9 +380,8 @@ impl EN {
         }
     }
 
-    /// Fetches blocks from the main node directly, until the certificates
-    /// are backfilled. This allows for smooth transition from json RPC to p2p block syncing.
-    pub(crate) async fn temporary_block_fetcher(
+    /// Fetches blocks from the main node directly whenever the EN is lagging behind too much.
+    pub(crate) async fn fallback_block_fetcher(
         &self,
         ctx: &ctx::Ctx,
         store: &Store,
@@ -386,66 +389,63 @@ impl EN {
         const MAX_CONCURRENT_REQUESTS: usize = 30;
         scope::run!(ctx, |ctx, s| async {
             let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
-            s.spawn(async {
-                let Some(mut next) = store.next_block(ctx).await? else {
-                    return Ok(());
-                };
-                while store.persisted().borrow().next().0 + TEMPORARY_FETCHER_THRESHOLD < next.0 {
-                    let n = L2BlockNumber(next.0.try_into().context("overflow")?);
-                    self.sync_state.wait_for_main_node_block(ctx, n).await?;
-                    send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
+            // TODO: metrics.
+            s.spawn::<()>(async {
+                let send = send;
+                let is_lagging =
+                    |main| main >= store.persisted().borrow().next() + FALLBACK_FETCHER_THRESHOLD;
+                let mut next = store.next_block(ctx).await.wrap("next_block()")?;
+                loop {
+                    // Wait until p2p syncing is lagging.
+                    self.sync_state
+                        .wait_for_main_node_block(ctx, is_lagging)
+                        .await?;
+                    // Determine the next block to fetch and wait for it to be available.
+                    next = next.max(store.next_block(ctx).await.wrap("next_block()")?);
+                    self.sync_state
+                        .wait_for_main_node_block(ctx, |main| main >= next)
+                        .await?;
+                    // Fetch the block asynchronously.
+                    send.send(ctx, s.spawn(self.fetch_block(ctx, next))).await?;
                     next = next.next();
                 }
-                drop(send);
-                Ok(())
             });
-            while let Ok(block) = recv.recv_or_disconnected(ctx).await? {
+            loop {
+                let block = recv.recv(ctx).await?;
                 store
                     .queue_next_fetched_block(ctx, block.join(ctx).await?)
                     .await
                     .wrap("queue_next_fetched_block()")?;
             }
-            Ok(())
         })
         .await
     }
 
-    /// Fetches blocks from the main node in range `[cursor.next()..end)`.
+    /// Fetches blocks starting with `queue.next()`.
     async fn fetch_blocks(
         &self,
         ctx: &ctx::Ctx,
         queue: &mut storage::PayloadQueue,
-        end: Option<validator::BlockNumber>,
     ) -> ctx::Result<()> {
         const MAX_CONCURRENT_REQUESTS: usize = 30;
-        let first = queue.next();
-        let mut next = first;
+        let mut next = queue.next();
         scope::run!(ctx, |ctx, s| async {
             let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
-            s.spawn(async {
+            s.spawn::<()>(async {
                 let send = send;
-                while end.map_or(true, |end| next < end) {
-                    let n = L2BlockNumber(next.0.try_into().context("overflow")?);
-                    self.sync_state.wait_for_main_node_block(ctx, n).await?;
-                    send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
+                loop {
+                    self.sync_state
+                        .wait_for_main_node_block(ctx, |main| main >= next)
+                        .await?;
+                    send.send(ctx, s.spawn(self.fetch_block(ctx, next))).await?;
                     next = next.next();
                 }
-                Ok(())
             });
-            while end.map_or(true, |end| queue.next() < end) {
+            loop {
                 let block = recv.recv(ctx).await?.join(ctx).await?;
                 queue.send(block).await.context("queue.send()")?;
             }
-            Ok(())
         })
-        .await?;
-        // If fetched anything, wait for the last block to be stored persistently.
-        if first < queue.next() {
-            self.pool
-                .wait_for_payload(ctx, queue.next().prev().unwrap())
-                .await
-                .wrap("wait_for_payload()")?;
-        }
-        Ok(())
+        .await
     }
 }
diff --git a/core/node/consensus/src/lib.rs b/core/node/consensus/src/lib.rs
index 8bf078120aa9..d89aa5f5e829 100644
--- a/core/node/consensus/src/lib.rs
+++ b/core/node/consensus/src/lib.rs
@@ -9,6 +9,7 @@ mod abi;
 mod config;
 mod en;
 pub mod era;
+mod metrics;
 mod mn;
 mod registry;
 mod storage;
diff --git a/core/node/consensus/src/metrics.rs b/core/node/consensus/src/metrics.rs
new file mode 100644
index 000000000000..f53bb9320917
--- /dev/null
+++ b/core/node/consensus/src/metrics.rs
@@ -0,0 +1,13 @@
+//! Consensus related metrics.
+
+#[derive(Debug, vise::Metrics)]
+#[metrics(prefix = "zksync_node_consensus")]
+pub(crate) struct Metrics {
+    /// Number of blocks that have been fetched via JSON-RPC.
+    /// It is used only as a fallback when p2p syncing is disabled or falling behind,
+    /// so it shouldn't be increasing under normal circumstances if p2p syncing is enabled.
+    pub fetch_block: vise::Counter,
+}
+
+#[vise::register]
+pub(super) static METRICS: vise::Global<Metrics> = vise::Global::new();
diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs
index 154509e97b14..c42e78658dc2 100644
--- a/core/node/consensus/src/storage/store.rs
+++ b/core/node/consensus/src/storage/store.rs
@@ -114,14 +114,12 @@ impl Store {
     }
 
     /// Number of the next block to queue.
-    pub(crate) async fn next_block(
-        &self,
-        ctx: &ctx::Ctx,
-    ) -> ctx::OrCanceled<Option<validator::BlockNumber>> {
+    pub(crate) async fn next_block(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::BlockNumber> {
         Ok(sync::lock(ctx, &self.block_payloads)
             .await?
             .as_ref()
-            .map(|p| p.next()))
+            .context("payload_queue not set")?
+            .next())
     }
 
     /// Queues the next block.
diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs
index ef4226c915f0..faa895c86c71 100644
--- a/core/node/consensus/src/testonly.rs
+++ b/core/node/consensus/src/testonly.rs
@@ -45,10 +45,7 @@ use zksync_types::{
 };
 use zksync_web3_decl::client::{Client, DynClient, L2};
 
-use crate::{
-    en,
-    storage::{ConnectionPool, Store},
-};
+use crate::{en, storage::ConnectionPool};
 
 /// Fake StateKeeper for tests.
 #[derive(Debug)]
@@ -413,40 +410,6 @@ impl StateKeeper {
         .await
     }
 
-    pub async fn run_temporary_fetcher(
-        self,
-        ctx: &ctx::Ctx,
-        client: Box<DynClient<L2>>,
-    ) -> ctx::Result<()> {
-        scope::run!(ctx, |ctx, s| async {
-            let payload_queue = self
-                .pool
-                .connection(ctx)
-                .await
-                .wrap("connection()")?
-                .new_payload_queue(ctx, self.actions_sender, self.sync_state.clone())
-                .await
-                .wrap("new_payload_queue()")?;
-            let (store, runner) = Store::new(
-                ctx,
-                self.pool.clone(),
-                Some(payload_queue),
-                Some(client.clone()),
-            )
-            .await
-            .wrap("Store::new()")?;
-            s.spawn_bg(async { Ok(runner.run(ctx).await?) });
-            en::EN {
-                pool: self.pool.clone(),
-                client,
-                sync_state: self.sync_state.clone(),
-            }
-            .temporary_block_fetcher(ctx, &store)
-            .await
-        })
-        .await
-    }
-
     /// Runs consensus node for the external node.
     pub async fn run_consensus(
         self,
diff --git a/core/node/consensus/src/tests/mod.rs b/core/node/consensus/src/tests/mod.rs
index 663ccab49904..efb8d14314c8 100644
--- a/core/node/consensus/src/tests/mod.rs
+++ b/core/node/consensus/src/tests/mod.rs
@@ -16,7 +16,7 @@ use zksync_types::ProtocolVersionId;
 use zksync_web3_decl::namespaces::EnNamespaceClient as _;
 
 use crate::{
-    en::TEMPORARY_FETCHER_THRESHOLD,
+    en::FALLBACK_FETCHER_THRESHOLD,
     mn::run_main_node,
     storage::{ConnectionPool, Store},
     testonly,
@@ -665,7 +665,7 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolV
 // Test temporary fetcher fetching blocks if a lot of certs are missing.
 #[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
 #[tokio::test]
-async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId) {
+async fn test_fallback_fetcher(from_snapshot: bool, version: ProtocolVersionId) {
     zksync_concurrency::testonly::abort_on_panic();
     let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
     let rng = &mut ctx.rng();
@@ -705,7 +705,7 @@ async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId)
             s.spawn_bg(runner.run(ctx));
             s.spawn_bg(node.run_fetcher(ctx, client.clone()));
             validator
-                .push_random_blocks(rng, account, TEMPORARY_FETCHER_THRESHOLD as usize + 1)
+                .push_random_blocks(rng, account, FALLBACK_FETCHER_THRESHOLD as usize + 1)
                 .await;
             node_pool
                 .wait_for_payload(ctx, validator.last_block())
@@ -715,58 +715,7 @@ async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId)
         .await
         .unwrap();
 
-        tracing::info!(
-            "Run p2p fetcher. Blocks should be fetched by the temporary fetcher anyway."
-        );
-        scope::run!(ctx, |ctx, s| async {
-            let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
-            s.spawn_bg(runner.run(ctx));
-            s.spawn_bg(node.run_consensus(ctx, client.clone(), node_cfg.clone()));
-            validator.push_random_blocks(rng, account, 5).await;
-            node_pool
-                .wait_for_payload(ctx, validator.last_block())
-                .await?;
-            Ok(())
-        })
-        .await
-        .unwrap();
-        Ok(())
-    })
-    .await
-    .unwrap();
-}
-
-// Test that temporary fetcher terminates once enough blocks have certs.
-#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
-#[tokio::test]
-async fn test_temporary_fetcher_termination(from_snapshot: bool, version: ProtocolVersionId) {
-    zksync_concurrency::testonly::abort_on_panic();
-    let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
-    let rng = &mut ctx.rng();
-    let setup = Setup::new(rng, 1);
-    let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
-    let node_cfg = validator_cfg.new_fullnode(rng);
-    let account = &mut Account::random();
-
-    scope::run!(ctx, |ctx, s| async {
-        tracing::info!("Spawn validator.");
-        let validator_pool = ConnectionPool::test(from_snapshot, version).await;
-        let (mut validator, runner) =
-            testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
-        s.spawn_bg(runner.run(ctx));
-        s.spawn_bg(run_main_node(
-            ctx,
-            validator_cfg.config.clone(),
-            validator_cfg.secrets.clone(),
-            validator_pool.clone(),
-        ));
-        // API server needs at least 1 L1 batch to start.
-        validator.seal_batch().await;
-        let client = validator.connect(ctx).await?;
-
-        let node_pool = ConnectionPool::test(from_snapshot, version).await;
-
-        // Run the EN so the consensus is initialized on EN and wait for it to sync.
+        tracing::info!("Run p2p fetcher. Blocks should be fetched by the fallback fetcher anyway.");
         scope::run!(ctx, |ctx, s| async {
             let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
             s.spawn_bg(runner.run(ctx));
@@ -779,12 +728,6 @@ async fn test_temporary_fetcher_termination(from_snapshot: bool, version: Protoc
         })
         .await
         .unwrap();
-
-        // Run the temporary fetcher. It should terminate immediately, since EN is synced.
-        let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
-        s.spawn_bg(runner.run(ctx));
-        node.run_temporary_fetcher(ctx, client).await?;
-
         Ok(())
     })
     .await
     .unwrap();
diff --git a/core/node/node_sync/Cargo.toml b/core/node/node_sync/Cargo.toml
index 9c5b0c000700..e42cbff85806 100644
--- a/core/node/node_sync/Cargo.toml
+++ b/core/node/node_sync/Cargo.toml
@@ -24,6 +24,7 @@ zksync_health_check.workspace = true
 zksync_utils.workspace = true
 zksync_eth_client.workspace = true
 zksync_concurrency.workspace = true
+zksync_consensus_roles.workspace = true
 vise.workspace = true
 zksync_vm_executor.workspace = true
 
diff --git a/core/node/node_sync/src/sync_state.rs b/core/node/node_sync/src/sync_state.rs
index f8a2fe00ec09..1ffec757c9b1 100644
--- a/core/node/node_sync/src/sync_state.rs
+++ b/core/node/node_sync/src/sync_state.rs
@@ -4,6 +4,7 @@ use async_trait::async_trait;
 use serde::Serialize;
 use tokio::sync::watch;
 use zksync_concurrency::{ctx, sync};
+use zksync_consensus_roles::validator;
 use zksync_dal::{ConnectionPool, Core, CoreDal};
 use zksync_health_check::{CheckHealth, Health, HealthStatus};
 use zksync_shared_metrics::EN_METRICS;
@@ -50,18 +51,20 @@ impl SyncState {
             .unwrap();
     }
 
+    /// Waits until the main node block satisfies the given predicate.
+    /// Returns the main node block number that satisfied it.
     pub async fn wait_for_main_node_block(
         &self,
         ctx: &ctx::Ctx,
-        want: L2BlockNumber,
-    ) -> ctx::OrCanceled<()> {
-        sync::wait_for(
-            ctx,
-            &mut self.0.subscribe(),
-            |inner| matches!(inner.main_node_block, Some(got) if got >= want),
-        )
-        .await?;
-        Ok(())
+        pred: impl Fn(validator::BlockNumber) -> bool,
+    ) -> ctx::OrCanceled<validator::BlockNumber> {
+        sync::wait_for_some(ctx, &mut self.0.subscribe(), |inner| {
+            inner
+                .main_node_block
+                .map(|n| validator::BlockNumber(n.0.into()))
+                .filter(|n| pred(*n))
+        })
+        .await
     }
 
     pub fn set_main_node_block(&self, block: L2BlockNumber) {
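
A note on the mechanism: the heart of this patch is the predicate-based
`SyncState::wait_for_main_node_block` in the last hunk. The fallback fetcher in
`en.rs` waits on `|main| main >= store.persisted().borrow().next() +
FALLBACK_FETCHER_THRESHOLD`, so it wakes up only once p2p syncing lags by more
than the threshold, while the p2p fetcher waits on `|main| main >= next` to
learn that the next block exists on the main node. The underlying pattern is
"wait until a watched value satisfies a predicate". Below is a minimal,
self-contained sketch of that pattern over a plain `tokio::sync::watch`
channel; it is illustrative only, and `wait_until` is a hypothetical stand-in
for `zksync_concurrency::sync::wait_for_some`, which additionally takes a
`ctx` so the wait can be cancelled.

use tokio::sync::watch;

/// Waits until the watched value maps to `Some(_)` under `pred`, returning that value.
/// Returns `None` if all senders are dropped before the predicate is satisfied.
async fn wait_until<T, U>(
    recv: &mut watch::Receiver<T>,
    pred: impl Fn(&T) -> Option<U>,
) -> Option<U> {
    loop {
        // `borrow_and_update` marks the current value as seen, so `changed()` below
        // only wakes us for values newer than the one we just inspected.
        if let Some(u) = pred(&recv.borrow_and_update()) {
            return Some(u);
        }
        if recv.changed().await.is_err() {
            return None;
        }
    }
}

#[tokio::main]
async fn main() {
    let (send, mut recv) = watch::channel(0u64);
    // Waiter: wake up once the "main node block" reaches 10,
    // i.e. `|main| main >= next` with `next == 10`.
    let waiter = tokio::spawn(async move {
        wait_until(&mut recv, |main| (*main >= 10).then_some(*main)).await
    });
    // Producer: advance the watched block number, as `set_main_node_block` does.
    for n in 1..=10u64 {
        send.send(n).unwrap();
    }
    assert_eq!(waiter.await.unwrap(), Some(10));
}

Returning the observed value rather than `()` is what lets
`wait_for_main_node_block` report which main node block actually satisfied the
predicate, matching its new doc comment.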