Skip to content

Commit

Permalink
feat(consensus): fallback json rpc syncing for consensus (#3211)
Browse files Browse the repository at this point in the history
If for any reason p2p syncing falls behind, JSON-RPC syncing will activate.
This is a protective measure while we deploy changes to the
consensus algorithm.

Fixes BFT-516
  • Loading branch information
pompon0 authored Nov 13, 2024
1 parent 0a88698 commit 726203b
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 160 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/node/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ thiserror.workspace = true
tracing.workspace = true
tokio.workspace = true
semver.workspace = true
vise.workspace = true

[dev-dependencies]
zksync_node_genesis.workspace = true
Expand Down
94 changes: 47 additions & 47 deletions core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ use zksync_web3_decl::{

use super::{config, storage::Store, ConsensusConfig, ConsensusSecrets};
use crate::{
metrics::METRICS,
registry,
storage::{self, ConnectionPool},
};

/// If less than TEMPORARY_FETCHER_THRESHOLD certificates are missing,
/// the temporary fetcher will stop fetching blocks.
pub(crate) const TEMPORARY_FETCHER_THRESHOLD: u64 = 10;
/// Whenever more than FALLBACK_FETCHER_THRESHOLD certificates are missing,
/// the fallback fetcher is active.
pub(crate) const FALLBACK_FETCHER_THRESHOLD: u64 = 10;

/// External node.
pub(super) struct EN {
Expand Down Expand Up @@ -115,11 +116,9 @@ impl EN {
let store = store.clone();
async {
let store = store;
self.temporary_block_fetcher(ctx, &store).await?;
tracing::info!(
"temporary block fetcher finished, switching to p2p fetching only"
);
Ok(())
self.fallback_block_fetcher(ctx, &store)
.await
.wrap("fallback_block_fetcher()")
}
});

Expand Down Expand Up @@ -191,7 +190,7 @@ impl EN {
.new_payload_queue(ctx, actions, self.sync_state.clone())
.await
.wrap("new_fetcher_cursor()")?;
self.fetch_blocks(ctx, &mut payload_queue, None).await
self.fetch_blocks(ctx, &mut payload_queue).await
})
.await;
match res {
Expand Down Expand Up @@ -362,9 +361,14 @@ impl EN {
}

/// Fetches (with retries) the given block from the main node.
async fn fetch_block(&self, ctx: &ctx::Ctx, n: L2BlockNumber) -> ctx::Result<FetchedBlock> {
async fn fetch_block(
&self,
ctx: &ctx::Ctx,
n: validator::BlockNumber,
) -> ctx::Result<FetchedBlock> {
const RETRY_INTERVAL: time::Duration = time::Duration::seconds(5);

let n = L2BlockNumber(n.0.try_into().context("overflow")?);
METRICS.fetch_block.inc();
loop {
match ctx.wait(self.client.sync_l2_block(n, true)).await? {
Ok(Some(block)) => return Ok(block.try_into()?),
Expand All @@ -376,76 +380,72 @@ impl EN {
}
}

/// Fetches blocks from the main node directly, until the certificates
/// are backfilled. This allows for smooth transition from json RPC to p2p block syncing.
pub(crate) async fn temporary_block_fetcher(
/// Fetches blocks from the main node directly whenever the EN is lagging behind too much.
pub(crate) async fn fallback_block_fetcher(
&self,
ctx: &ctx::Ctx,
store: &Store,
) -> ctx::Result<()> {
const MAX_CONCURRENT_REQUESTS: usize = 30;
scope::run!(ctx, |ctx, s| async {
let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
s.spawn(async {
let Some(mut next) = store.next_block(ctx).await? else {
return Ok(());
};
while store.persisted().borrow().next().0 + TEMPORARY_FETCHER_THRESHOLD < next.0 {
let n = L2BlockNumber(next.0.try_into().context("overflow")?);
self.sync_state.wait_for_main_node_block(ctx, n).await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
// TODO: metrics.
s.spawn::<()>(async {
let send = send;
let is_lagging =
|main| main >= store.persisted().borrow().next() + FALLBACK_FETCHER_THRESHOLD;
let mut next = store.next_block(ctx).await.wrap("next_block()")?;
loop {
// Wait until p2p syncing is lagging.
self.sync_state
.wait_for_main_node_block(ctx, is_lagging)
.await?;
// Determine the next block to fetch and wait for it to be available.
next = next.max(store.next_block(ctx).await.wrap("next_block()")?);
self.sync_state
.wait_for_main_node_block(ctx, |main| main >= next)
.await?;
// Fetch the block asynchronously.
send.send(ctx, s.spawn(self.fetch_block(ctx, next))).await?;
next = next.next();
}
drop(send);
Ok(())
});
while let Ok(block) = recv.recv_or_disconnected(ctx).await? {
loop {
let block = recv.recv(ctx).await?;
store
.queue_next_fetched_block(ctx, block.join(ctx).await?)
.await
.wrap("queue_next_fetched_block()")?;
}
Ok(())
})
.await
}

/// Fetches blocks from the main node in range `[cursor.next()..end)`.
/// Fetches blocks starting with `queue.next()`.
async fn fetch_blocks(
&self,
ctx: &ctx::Ctx,
queue: &mut storage::PayloadQueue,
end: Option<validator::BlockNumber>,
) -> ctx::Result<()> {
const MAX_CONCURRENT_REQUESTS: usize = 30;
let first = queue.next();
let mut next = first;
let mut next = queue.next();
scope::run!(ctx, |ctx, s| async {
let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
s.spawn(async {
s.spawn::<()>(async {
let send = send;
while end.map_or(true, |end| next < end) {
let n = L2BlockNumber(next.0.try_into().context("overflow")?);
self.sync_state.wait_for_main_node_block(ctx, n).await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
loop {
self.sync_state
.wait_for_main_node_block(ctx, |main| main >= next)
.await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, next))).await?;
next = next.next();
}
Ok(())
});
while end.map_or(true, |end| queue.next() < end) {
loop {
let block = recv.recv(ctx).await?.join(ctx).await?;
queue.send(block).await.context("queue.send()")?;
}
Ok(())
})
.await?;
// If fetched anything, wait for the last block to be stored persistently.
if first < queue.next() {
self.pool
.wait_for_payload(ctx, queue.next().prev().unwrap())
.await
.wrap("wait_for_payload()")?;
}
Ok(())
.await
}
}
1 change: 1 addition & 0 deletions core/node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod abi;
mod config;
mod en;
pub mod era;
mod metrics;
mod mn;
mod registry;
mod storage;
Expand Down
13 changes: 13 additions & 0 deletions core/node/consensus/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//! Consensus related metrics.
/// Metrics exported by the consensus component under the `zksync_node_consensus` prefix.
#[derive(Debug, vise::Metrics)]
#[metrics(prefix = "zksync_node_consensus")]
pub(crate) struct Metrics {
/// Number of blocks that have been fetched via JSON-RPC.
/// It is used only as a fallback when the p2p syncing is disabled or falling behind,
/// so it shouldn't be increasing under normal circumstances if p2p syncing is enabled.
pub fetch_block: vise::Counter,
}

/// Global instance of the consensus [`Metrics`], registered through `vise::register`.
#[vise::register]
pub(super) static METRICS: vise::Global<Metrics> = vise::Global::new();
8 changes: 3 additions & 5 deletions core/node/consensus/src/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,12 @@ impl Store {
}

/// Number of the next block to queue.
pub(crate) async fn next_block(
&self,
ctx: &ctx::Ctx,
) -> ctx::OrCanceled<Option<validator::BlockNumber>> {
pub(crate) async fn next_block(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::BlockNumber> {
Ok(sync::lock(ctx, &self.block_payloads)
.await?
.as_ref()
.map(|p| p.next()))
.context("payload_queue not set")?
.next())
}

/// Queues the next block.
Expand Down
39 changes: 1 addition & 38 deletions core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ use zksync_types::{
};
use zksync_web3_decl::client::{Client, DynClient, L2};

use crate::{
en,
storage::{ConnectionPool, Store},
};
use crate::{en, storage::ConnectionPool};

/// Fake StateKeeper for tests.
#[derive(Debug)]
Expand Down Expand Up @@ -413,40 +410,6 @@ impl StateKeeper {
.await
}

pub async fn run_temporary_fetcher(
self,
ctx: &ctx::Ctx,
client: Box<DynClient<L2>>,
) -> ctx::Result<()> {
scope::run!(ctx, |ctx, s| async {
let payload_queue = self
.pool
.connection(ctx)
.await
.wrap("connection()")?
.new_payload_queue(ctx, self.actions_sender, self.sync_state.clone())
.await
.wrap("new_payload_queue()")?;
let (store, runner) = Store::new(
ctx,
self.pool.clone(),
Some(payload_queue),
Some(client.clone()),
)
.await
.wrap("Store::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });
en::EN {
pool: self.pool.clone(),
client,
sync_state: self.sync_state.clone(),
}
.temporary_block_fetcher(ctx, &store)
.await
})
.await
}

/// Runs consensus node for the external node.
pub async fn run_consensus(
self,
Expand Down
65 changes: 4 additions & 61 deletions core/node/consensus/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use zksync_types::ProtocolVersionId;
use zksync_web3_decl::namespaces::EnNamespaceClient as _;

use crate::{
en::TEMPORARY_FETCHER_THRESHOLD,
en::FALLBACK_FETCHER_THRESHOLD,
mn::run_main_node,
storage::{ConnectionPool, Store},
testonly,
Expand Down Expand Up @@ -665,7 +665,7 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolV
// Test fallback fetcher fetching blocks if a lot of certs are missing.
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId) {
async fn test_fallback_fetcher(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
Expand Down Expand Up @@ -705,7 +705,7 @@ async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId)
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_fetcher(ctx, client.clone()));
validator
.push_random_blocks(rng, account, TEMPORARY_FETCHER_THRESHOLD as usize + 1)
.push_random_blocks(rng, account, FALLBACK_FETCHER_THRESHOLD as usize + 1)
.await;
node_pool
.wait_for_payload(ctx, validator.last_block())
Expand All @@ -715,58 +715,7 @@ async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId)
.await
.unwrap();

tracing::info!(
"Run p2p fetcher. Blocks should be fetched by the temporary fetcher anyway."
);
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_consensus(ctx, client.clone(), node_cfg.clone()));
validator.push_random_blocks(rng, account, 5).await;
node_pool
.wait_for_payload(ctx, validator.last_block())
.await?;
Ok(())
})
.await
.unwrap();
Ok(())
})
.await
.unwrap();
}

// Test that temporary fetcher terminates once enough blocks have certs.
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_temporary_fetcher_termination(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
tracing::info!("Spawn validator.");
let validator_pool = ConnectionPool::test(from_snapshot, version).await;
let (mut validator, runner) =
testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(run_main_node(
ctx,
validator_cfg.config.clone(),
validator_cfg.secrets.clone(),
validator_pool.clone(),
));
// API server needs at least 1 L1 batch to start.
validator.seal_batch().await;
let client = validator.connect(ctx).await?;

let node_pool = ConnectionPool::test(from_snapshot, version).await;

// Run the EN so the consensus is initialized on EN and wait for it to sync.
tracing::info!("Run p2p fetcher. Blocks should be fetched by the fallback fetcher anyway.");
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
Expand All @@ -779,12 +728,6 @@ async fn test_temporary_fetcher_termination(from_snapshot: bool, version: Protoc
})
.await
.unwrap();

// Run the temporary fetcher. It should terminate immediately, since EN is synced.
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
node.run_temporary_fetcher(ctx, client).await?;

Ok(())
})
.await
Expand Down
1 change: 1 addition & 0 deletions core/node/node_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ zksync_health_check.workspace = true
zksync_utils.workspace = true
zksync_eth_client.workspace = true
zksync_concurrency.workspace = true
zksync_consensus_roles.workspace = true
vise.workspace = true
zksync_vm_executor.workspace = true

Expand Down
Loading

0 comments on commit 726203b

Please sign in to comment.