diff --git a/core/node/consensus/Cargo.toml b/core/node/consensus/Cargo.toml index 68fffa56dcbc..574ae6fdf9f8 100644 --- a/core/node/consensus/Cargo.toml +++ b/core/node/consensus/Cargo.toml @@ -39,13 +39,14 @@ secrecy.workspace = true tempfile.workspace = true thiserror.workspace = true tracing.workspace = true +tokio.workspace = true [dev-dependencies] zksync_node_genesis.workspace = true zksync_node_test_utils.workspace = true zksync_node_api_server.workspace = true zksync_test_account.workspace = true -zksync_contracts.workspace= true +zksync_contracts.workspace = true tokio.workspace = true test-casing.workspace = true diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index 66bdc822c058..d14893042f5b 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -173,6 +173,7 @@ impl EN { } /// Fetches genesis from the main node. + #[tracing::instrument(skip_all)] async fn fetch_genesis(&self, ctx: &ctx::Ctx) -> ctx::Result { let genesis = ctx .wait(self.client.fetch_consensus_genesis()) diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs index 6bae0a490508..8c8992b4d01d 100644 --- a/core/node/consensus/src/storage/connection.rs +++ b/core/node/consensus/src/storage/connection.rs @@ -106,6 +106,7 @@ impl<'a> Connection<'a> { } /// Wrapper for `consensus_dal().insert_block_certificate()`. + #[tracing::instrument(skip_all, fields(l2_block = %cert.message.proposal.number))] pub async fn insert_block_certificate( &mut self, ctx: &ctx::Ctx, @@ -118,6 +119,7 @@ impl<'a> Connection<'a> { /// Wrapper for `consensus_dal().insert_batch_certificate()`, /// which additionally verifies that the batch hash matches the stored batch. + #[tracing::instrument(skip_all, fields(l1_batch = %cert.message.number))] pub async fn insert_batch_certificate( &mut self, ctx: &ctx::Ctx, @@ -223,11 +225,13 @@ impl<'a> Connection<'a> { } /// Wrapper for `consensus_dal().next_block()`. + #[tracing::instrument(skip_all)] async fn next_block(&mut self, ctx: &ctx::Ctx) -> ctx::Result { Ok(ctx.wait(self.0.consensus_dal().next_block()).await??) } /// Wrapper for `consensus_dal().block_certificates_range()`. + #[tracing::instrument(skip_all)] pub(crate) async fn block_certificates_range( &mut self, ctx: &ctx::Ctx, @@ -305,6 +309,7 @@ impl<'a> Connection<'a> { } /// Wrapper for `blocks_dal().get_sealed_l1_batch_number()`. + #[tracing::instrument(skip_all)] pub async fn get_last_batch_number( &mut self, ctx: &ctx::Ctx, @@ -390,6 +395,7 @@ impl<'a> Connection<'a> { } /// Construct the [storage::BatchStoreState] which contains the earliest batch and the last available [attester::SyncBatch]. + #[tracing::instrument(skip_all)] pub async fn batches_range(&mut self, ctx: &ctx::Ctx) -> ctx::Result { let first = self .0 diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs index ce73c946a029..70744988390d 100644 --- a/core/node/consensus/src/storage/store.rs +++ b/core/node/consensus/src/storage/store.rs @@ -1,9 +1,11 @@ use std::sync::Arc; use anyhow::Context as _; +use tokio::sync::watch::Sender; +use tracing::Instrument; use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; use zksync_consensus_bft::PayloadManager; -use zksync_consensus_roles::{attester, validator}; +use zksync_consensus_roles::{attester, attester::BatchNumber, validator}; use zksync_consensus_storage::{self as storage, BatchStoreState}; use zksync_dal::consensus_dal::{self, Payload}; use zksync_node_sync::fetcher::{FetchedBlock, FetchedTransaction}; @@ -129,6 +131,7 @@ impl PersistedBlockState { /// If `persisted.first` is moved forward, it means that blocks have been pruned. /// If `persisted.last` is moved forward, it means that new blocks with certificates have been /// persisted. + #[tracing::instrument(skip_all, fields(first = %new.first, last = ?new.last.as_ref().map(|l| l.message.proposal.number)))] fn update(&self, new: storage::BlockStoreState) { self.0.send_if_modified(|p| { if &new == p { @@ -149,6 +152,7 @@ impl PersistedBlockState { } /// Appends the `cert` to `persisted` range. + #[tracing::instrument(skip_all, fields(batch_number = %cert.message.proposal.number))] fn advance(&self, cert: validator::CommitQC) { self.0.send_if_modified(|p| { if p.next() != cert.header().number { @@ -171,21 +175,61 @@ impl StoreRunner { } = self; let res = scope::run!(ctx, |ctx, s| async { + #[tracing::instrument(skip_all)] + async fn update_blocks_persisted_iteration( + ctx: &ctx::Ctx, + pool: &ConnectionPool, + blocks_persisted: &PersistedBlockState, + ) -> ctx::Result<()> { + const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); + + let range = pool + .connection(ctx) + .await? + .block_certificates_range(ctx) + .await + .wrap("block_certificates_range()")?; + blocks_persisted.update(range); + ctx.sleep(POLL_INTERVAL).await?; + + Ok(()) + } + s.spawn::<()>(async { // Loop updating `blocks_persisted` whenever blocks get pruned. - const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); loop { - let range = pool - .connection(ctx) - .await? - .block_certificates_range(ctx) - .await - .wrap("block_certificates_range()")?; - blocks_persisted.update(range); - ctx.sleep(POLL_INTERVAL).await?; + update_blocks_persisted_iteration(ctx, &pool, &blocks_persisted).await?; } }); + #[tracing::instrument(skip_all, fields(l1_batch = %next_batch_number))] + async fn gossip_sync_batches_iteration( + ctx: &ctx::Ctx, + pool: &ConnectionPool, + next_batch_number: &mut BatchNumber, + batches_persisted: &Sender, + ) -> ctx::Result<()> { + const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); + + let mut conn = pool.connection(ctx).await?; + if let Some(last_batch_number) = conn + .get_last_batch_number(ctx) + .await + .wrap("last_batch_number()")? + { + if last_batch_number >= *next_batch_number { + let range = conn.batches_range(ctx).await.wrap("batches_range()")?; + *next_batch_number = last_batch_number.next(); + tracing::info_span!("batches_persisted_send").in_scope(|| { + batches_persisted.send_replace(range); + }); + } + } + ctx.sleep(POLL_INTERVAL).await?; + + Ok(()) + } + // NOTE: Running this update loop will trigger the gossip of `SyncBatches` which is currently // pointless as there is no proof and we have to ignore them. We can disable it, but bear in // mind that any node which gossips the availability will cause pushes and pulls in the consensus. @@ -200,65 +244,85 @@ impl StoreRunner { // up with L1 batches from peers _without_ the QC, based on L1 inclusion proofs instead. // Nevertheless since the `SyncBatch` contains all transactions for all L2 blocks, // we can try to make it less frequent by querying just the last batch number first. - const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); let mut next_batch_number = { batches_persisted.borrow().next() }; loop { - let mut conn = pool.connection(ctx).await?; - if let Some(last_batch_number) = conn - .get_last_batch_number(ctx) - .await - .wrap("last_batch_number()")? - { - if last_batch_number >= next_batch_number { - let range = conn.batches_range(ctx).await.wrap("batches_range()")?; - next_batch_number = last_batch_number.next(); - batches_persisted.send_replace(range); - } - } - ctx.sleep(POLL_INTERVAL).await?; + gossip_sync_batches_iteration( + ctx, + &pool, + &mut next_batch_number, + &batches_persisted, + ) + .await?; } }); - s.spawn::<()>(async { - // Loop inserting batch certificates into storage + #[tracing::instrument(skip_all)] + async fn insert_batch_certificates_iteration( + ctx: &ctx::Ctx, + pool: &ConnectionPool, + batch_certificates: &mut ctx::channel::UnboundedReceiver, + ) -> ctx::Result<()> { const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); + + let cert = batch_certificates + .recv(ctx) + .instrument(tracing::info_span!("wait_for_batch_certificate")) + .await?; + loop { - let cert = batch_certificates.recv(ctx).await?; - - loop { - use consensus_dal::InsertCertificateError as E; - // Try to insert the cert. - let res = pool - .connection(ctx) - .await? - .insert_batch_certificate(ctx, &cert) - .await; - - match res { - Ok(()) => { - break; - } - Err(InsertCertificateError::Inner(E::MissingPayload)) => { - // The L1 batch isn't available yet. - // We can wait until it's produced/received, or we could modify gossip - // so that we don't even accept votes until we have the corresponding batch. - ctx.sleep(POLL_INTERVAL).await?; - } - Err(InsertCertificateError::Inner(err)) => { - return Err(ctx::Error::Internal(anyhow::Error::from(err))) - } - Err(InsertCertificateError::Canceled(err)) => { - return Err(ctx::Error::Canceled(err)) - } + use consensus_dal::InsertCertificateError as E; + // Try to insert the cert. + let res = pool + .connection(ctx) + .await? + .insert_batch_certificate(ctx, &cert) + .await; + + match res { + Ok(()) => { + break; + } + Err(InsertCertificateError::Inner(E::MissingPayload)) => { + // The L1 batch isn't available yet. + // We can wait until it's produced/received, or we could modify gossip + // so that we don't even accept votes until we have the corresponding batch. + ctx.sleep(POLL_INTERVAL) + .instrument(tracing::info_span!("wait_for_batch")) + .await?; + } + Err(InsertCertificateError::Inner(err)) => { + return Err(ctx::Error::Internal(anyhow::Error::from(err))) + } + Err(InsertCertificateError::Canceled(err)) => { + return Err(ctx::Error::Canceled(err)) } } } + + Ok(()) + } + + s.spawn::<()>(async { + // Loop inserting batch certificates into storage + loop { + insert_batch_certificates_iteration(ctx, &pool, &mut batch_certificates) + .await?; + } }); - // Loop inserting block certs to storage. - const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); - loop { - let cert = block_certificates.recv(ctx).await?; + #[tracing::instrument(skip_all)] + async fn insert_block_certificates_iteration( + ctx: &ctx::Ctx, + pool: &ConnectionPool, + block_certificates: &mut ctx::channel::UnboundedReceiver, + blocks_persisted: &PersistedBlockState, + ) -> ctx::Result<()> { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); + + let cert = block_certificates + .recv(ctx) + .instrument(tracing::info_span!("wait_for_block_certificate")) + .await?; // Wait for the block to be persisted, so that we can attach a cert to it. // We may exit this loop without persisting the certificate in case the // corresponding block has been pruned in the meantime. @@ -280,7 +344,9 @@ impl StoreRunner { Err(InsertCertificateError::Inner(E::MissingPayload)) => { // the payload is not in storage, it's either not yet persisted // or already pruned. We will retry after a delay. - ctx.sleep(POLL_INTERVAL).await?; + ctx.sleep(POLL_INTERVAL) + .instrument(tracing::info_span!("wait_for_block")) + .await?; } Err(InsertCertificateError::Canceled(err)) => { return Err(ctx::Error::Canceled(err)) @@ -290,6 +356,19 @@ impl StoreRunner { } } } + + Ok(()) + } + + // Loop inserting block certs to storage. + loop { + insert_block_certificates_iteration( + ctx, + &pool, + &mut block_certificates, + &blocks_persisted, + ) + .await?; } }) .await;