feat(consensus): smooth transition to p2p syncing (BFT-515) #3075

Merged: 4 commits, Oct 11, 2024

58 changes: 55 additions & 3 deletions core/node/consensus/src/en.rs
@@ -4,7 +4,7 @@ use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_consensus_executor::{self as executor, attestation};
use zksync_consensus_roles::{attester, validator};
-use zksync_consensus_storage::BlockStore;
+use zksync_consensus_storage::{BlockStore, PersistentBlockStore as _};
use zksync_dal::consensus_dal;
use zksync_node_sync::{fetcher::FetchedBlock, sync_action::ActionQueueSender, SyncState};
use zksync_types::L2BlockNumber;
@@ -21,6 +21,10 @@ use crate::{
storage::{self, ConnectionPool},
};

/// If fewer than TEMPORARY_FETCHER_THRESHOLD certificates are missing,
/// the temporary fetcher will stop fetching blocks.
pub(crate) const TEMPORARY_FETCHER_THRESHOLD: u64 = 10;

/// External node.
pub(super) struct EN {
pub(super) pool: ConnectionPool,
@@ -120,6 +124,20 @@ impl EN {
.wrap("Store::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

// Run the temporary fetcher until the certificates are backfilled.
// The temporary fetcher should be removed once JSON RPC syncing is fully deprecated.
s.spawn_bg({
let store = store.clone();
async {
let store = store;
self.temporary_block_fetcher(ctx, &store).await?;
tracing::info!(
"temporary block fetcher finished, switching to p2p fetching only"
);
Ok(())
}
});

let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BlockStore::new()")?;
Expand Down Expand Up @@ -358,8 +376,42 @@ impl EN {
}
}

/// Fetches blocks from the main node directly until the certificates
/// are backfilled. This allows for a smooth transition from JSON RPC to p2p block syncing.
pub(crate) async fn temporary_block_fetcher(
&self,
ctx: &ctx::Ctx,
store: &Store,
) -> ctx::Result<()> {
const MAX_CONCURRENT_REQUESTS: usize = 30;
scope::run!(ctx, |ctx, s| async {
let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
s.spawn(async {
let Some(mut next) = store.next_block(ctx).await? else {
return Ok(());
};
while store.persisted().borrow().next().0 + TEMPORARY_FETCHER_THRESHOLD < next.0 {
let n = L2BlockNumber(next.0.try_into().context("overflow")?);
self.sync_state.wait_for_main_node_block(ctx, n).await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
next = next.next();
}
drop(send);
Ok(())
});
while let Ok(block) = recv.recv_or_disconnected(ctx).await? {
store
.queue_next_fetched_block(ctx, block.join(ctx).await?)
.await
.wrap("queue_next_fetched_block()")?;
}
Ok(())
})
.await
}

/// Fetches blocks from the main node in range `[cursor.next()..end)`.
-pub(super) async fn fetch_blocks(
+async fn fetch_blocks(
&self,
ctx: &ctx::Ctx,
queue: &mut storage::PayloadQueue,
@@ -373,7 +425,7 @@
s.spawn(async {
let send = send;
while end.map_or(true, |end| next < end) {
-let n = L2BlockNumber(next.0.try_into().unwrap());
+let n = L2BlockNumber(next.0.try_into().context("overflow")?);
self.sync_state.wait_for_main_node_block(ctx, n).await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
next = next.next();
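
Both fetch_blocks and the new temporary_block_fetcher rely on the same producer/consumer pipeline: a producer task spawns up to MAX_CONCURRENT_REQUESTS block fetches and pushes their handles through a bounded channel, while the consumer awaits the handles in order, so blocks are queued sequentially even though they are fetched concurrently. The sketch below is a minimal, self-contained rendition of that pattern with tokio standing in for zksync_concurrency; fetch_block and fetch_range are hypothetical stand-ins, and the temporary fetcher additionally replaces the fixed end bound with the dynamic stop condition persisted().next() + TEMPORARY_FETCHER_THRESHOLD < next.

use tokio::{sync::mpsc, task::JoinHandle};

const MAX_CONCURRENT_REQUESTS: usize = 30;

// Hypothetical stand-in for the EN's JSON RPC fetch of a single block.
async fn fetch_block(n: u64) -> Vec<u8> {
    format!("block {n}").into_bytes()
}

// Fetches blocks in the range [next..end), pipelining the requests.
async fn fetch_range(mut next: u64, end: u64) {
    // The bounded channel caps the number of in-flight requests.
    let (send, mut recv) = mpsc::channel::<JoinHandle<Vec<u8>>>(MAX_CONCURRENT_REQUESTS);
    let producer = tokio::spawn(async move {
        while next < end {
            // `send` suspends once MAX_CONCURRENT_REQUESTS fetches are in flight.
            send.send(tokio::spawn(fetch_block(next))).await.unwrap();
            next += 1;
        }
        // Dropping `send` closes the channel, letting the consumer terminate.
    });
    // Consume strictly in order; the real code queues each block via
    // store.queue_next_fetched_block() here.
    while let Some(handle) = recv.recv().await {
        let block = handle.await.unwrap();
        println!("queued block of {} bytes", block.len());
    }
    producer.await.unwrap();
}

#[tokio::main]
async fn main() {
    fetch_range(0, 100).await;
}
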
24 changes: 24 additions & 0 deletions core/node/consensus/src/storage/store.rs
@@ -111,6 +111,30 @@ impl Store {
async fn conn(&self, ctx: &ctx::Ctx) -> ctx::Result<Connection> {
self.pool.connection(ctx).await.wrap("connection")
}

/// Number of the next block to queue.
pub(crate) async fn next_block(
&self,
ctx: &ctx::Ctx,
) -> ctx::OrCanceled<Option<validator::BlockNumber>> {
Ok(sync::lock(ctx, &self.block_payloads)
.await?
.as_ref()
.map(|p| p.next()))
}

/// Queues the next block.
pub(crate) async fn queue_next_fetched_block(
&self,
ctx: &ctx::Ctx,
block: FetchedBlock,
) -> ctx::Result<()> {
let mut payloads = sync::lock(ctx, &self.block_payloads).await?.into_async();
if let Some(payloads) = &mut *payloads {
payloads.send(block).await.context("payloads.send()")?;
}
Ok(())
}
}

impl PersistedBlockState {
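
The two Store additions follow a guarded-optional-queue pattern: block_payloads holds an optional payload queue behind a lock, next_block peeks at the next expected block number, and queue_next_fetched_block forwards a fetched block into the queue (a no-op when no queue is attached). Below is a minimal sketch of that pattern, with tokio primitives standing in for zksync_concurrency's sync/ctx, FetchedBlock simplified to a u64, and the cursor advance modeled explicitly (the real PayloadQueue advances it internally):

use tokio::sync::{mpsc, Mutex};

struct PayloadQueue {
    next: u64,
    send: mpsc::Sender<u64>,
}

struct Store {
    block_payloads: Mutex<Option<PayloadQueue>>,
}

impl Store {
    // Number of the next block to queue, if a payload queue is attached.
    async fn next_block(&self) -> Option<u64> {
        self.block_payloads.lock().await.as_ref().map(|p| p.next)
    }

    // Queues the next fetched block; no-op if no payload queue is attached.
    async fn queue_next_fetched_block(&self, block: u64) -> anyhow::Result<()> {
        let mut payloads = self.block_payloads.lock().await;
        if let Some(p) = &mut *payloads {
            p.send.send(block).await?;
            p.next += 1;
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (send, mut recv) = mpsc::channel(8);
    let store = Store {
        block_payloads: Mutex::new(Some(PayloadQueue { next: 42, send })),
    };
    assert_eq!(store.next_block().await, Some(42));
    store.queue_next_fetched_block(42).await?;
    assert_eq!(recv.recv().await, Some(42));
    Ok(())
}
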
39 changes: 38 additions & 1 deletion core/node/consensus/src/testonly.rs
@@ -45,7 +45,10 @@ use zksync_types::{
};
use zksync_web3_decl::client::{Client, DynClient, L2};

-use crate::{en, storage::ConnectionPool};
+use crate::{
+en,
+storage::{ConnectionPool, Store},
+};

/// Fake StateKeeper for tests.
#[derive(Debug)]
@@ -416,6 +419,40 @@ impl StateKeeper {
.await
}

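/// Runs the temporary block fetcher for the external node until it
/// terminates, i.e. until fewer than TEMPORARY_FETCHER_THRESHOLD
/// certificates are missing. Used in tests.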
pub async fn run_temporary_fetcher(
self,
ctx: &ctx::Ctx,
client: Box<DynClient<L2>>,
) -> ctx::Result<()> {
scope::run!(ctx, |ctx, s| async {
let payload_queue = self
.pool
.connection(ctx)
.await
.wrap("connection()")?
.new_payload_queue(ctx, self.actions_sender, self.sync_state.clone())
.await
.wrap("new_payload_queue()")?;
let (store, runner) = Store::new(
ctx,
self.pool.clone(),
Some(payload_queue),
Some(client.clone()),
)
.await
.wrap("Store::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });
en::EN {
pool: self.pool.clone(),
client,
sync_state: self.sync_state.clone(),
}
.temporary_block_fetcher(ctx, &store)
.await
})
.await
}

/// Runs consensus node for the external node.
pub async fn run_consensus(
self,
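
run_temporary_fetcher wraps the fetcher in a scope so the Store runner executes in the background and is cancelled as soon as temporary_block_fetcher returns. A minimal sketch of that lifecycle under tokio (runner and fetcher are hypothetical stand-ins; scope::run! performs the cancellation implicitly, emulated here with an explicit abort):

use tokio::time::{sleep, Duration};

// Stand-in for the Store runner's background loop.
async fn runner() {
    loop {
        sleep(Duration::from_millis(50)).await;
    }
}

// Stand-in for temporary_block_fetcher: returns once backfilled.
async fn fetcher() -> anyhow::Result<()> {
    sleep(Duration::from_millis(200)).await;
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let background = tokio::spawn(runner());
    let result = fetcher().await;
    // scope::run! cancels background tasks on scope exit.
    background.abort();
    result
}
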
132 changes: 132 additions & 0 deletions core/node/consensus/src/tests/mod.rs
@@ -13,8 +13,10 @@ use zksync_consensus_storage::{BlockStore, PersistentBlockStore};
use zksync_dal::consensus_dal;
use zksync_test_account::Account;
use zksync_types::ProtocolVersionId;
use zksync_web3_decl::namespaces::EnNamespaceClient as _;

use crate::{
en::TEMPORARY_FETCHER_THRESHOLD,
mn::run_main_node,
storage::{ConnectionPool, Store},
testonly,
Expand Down Expand Up @@ -665,6 +667,136 @@ async fn test_p2p_fetcher_backfill_certs(
.unwrap();
}

// Test that the temporary fetcher fetches blocks when a lot of certs are missing.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[tokio::test]
async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
// We force certs to be missing on the EN by keeping 1 of the validators permanently offline.
// This way no blocks will be finalized at all, so no one will have certs.
let setup = Setup::new(rng, 2);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
tracing::info!("Spawn validator.");
let validator_pool = ConnectionPool::test(from_snapshot, version).await;
let (mut validator, runner) =
testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(run_main_node(
ctx,
validator_cfg.config.clone(),
validator_cfg.secrets.clone(),
validator_pool.clone(),
));
// API server needs at least 1 L1 batch to start.
validator.seal_batch().await;
let client = validator.connect(ctx).await?;

// Wait for the consensus to be initialized.
while ctx.wait(client.consensus_global_config()).await??.is_none() {
ctx.sleep(time::Duration::milliseconds(100)).await?;
}

let node_pool = ConnectionPool::test(from_snapshot, version).await;

tracing::info!("Run centralized fetcher, so that there is a lot of certs missing.");
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_fetcher(ctx, client.clone()));
validator
.push_random_blocks(rng, account, TEMPORARY_FETCHER_THRESHOLD as usize + 1)
.await;
node_pool
.wait_for_payload(ctx, validator.last_block())
.await?;
Ok(())
})
.await
.unwrap();

tracing::info!(
"Run p2p fetcher. Blocks should be fetched by the temporary fetcher anyway."
);
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_consensus(ctx, client.clone(), node_cfg.clone()));
validator.push_random_blocks(rng, account, 5).await;
node_pool
.wait_for_payload(ctx, validator.last_block())
.await?;
Ok(())
})
.await
.unwrap();
Ok(())
})
.await
.unwrap();
}

// Test that the temporary fetcher terminates once enough blocks have certs.
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_temporary_fetcher_termination(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let pregenesis = true;
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
tracing::info!("Spawn validator.");
let validator_pool = ConnectionPool::test(from_snapshot, version).await;
let (mut validator, runner) =
testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(run_main_node(
ctx,
validator_cfg.config.clone(),
validator_cfg.secrets.clone(),
validator_pool.clone(),
));
// API server needs at least 1 L1 batch to start.
validator.seal_batch().await;
let client = validator.connect(ctx).await?;

let node_pool = ConnectionPool::test(from_snapshot, version).await;

// Run the EN so that consensus is initialized on the EN, and wait for it to sync.
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_consensus(ctx, client.clone(), node_cfg.clone()));
validator.push_random_blocks(rng, account, 5).await;
node_pool
.wait_for_payload(ctx, validator.last_block())
.await?;
Ok(())
})
.await
.unwrap();

// Run the temporary fetcher. It should terminate immediately, since the EN is synced.
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
node.run_temporary_fetcher(ctx, client).await?;

Ok(())
})
.await
.unwrap();
}

#[test_casing(4, Product((VERSIONS,PREGENESIS)))]
#[tokio::test]
async fn test_with_pruning(version: ProtocolVersionId, pregenesis: bool) {