feat(consensus): add tracing instrumentation to consensus store (#2546)
## What ❔

This PR adds instrumentation to the consensus store layer, which should serve
as a good starting point. Most other layers are heavily intertwined with the
`era-consensus` codebase, so we will need to introduce instrumentation there
first.
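
For illustration, the pattern being applied is roughly the following (a minimal, self-contained sketch; the function and span names are illustrative, not taken from this diff):

```rust
use tracing::Instrument as _;

// `skip_all` avoids recording (potentially large) function arguments;
// `fields(...)` records only the identifiers worth searching traces for.
#[tracing::instrument(skip_all, fields(l1_batch = %batch_number))]
async fn insert_certificate(batch_number: u64) {
    // Wrapping an awaited future in a named span makes waiting time
    // show up as its own node in the trace.
    wait_for_batch()
        .instrument(tracing::info_span!("wait_for_batch"))
        .await;
}

async fn wait_for_batch() {
    // Stand-in for e.g. polling the database until a batch is available.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    insert_certificate(42).await;
}
```

With a subscriber that reports span timings, this separates time spent working from time spent waiting.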

## Why ❔

Better visibility into where time is spent, including time spent waiting.

## Checklist


- [x] PR title corresponds to the body of the PR (we generate changelog entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
itegulov authored Aug 1, 2024
1 parent 1d206c0 commit 1e53940
Showing 4 changed files with 146 additions and 59 deletions.
3 changes: 2 additions & 1 deletion core/node/consensus/Cargo.toml
@@ -39,13 +39,14 @@ secrecy.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tracing.workspace = true
tokio.workspace = true

[dev-dependencies]
zksync_node_genesis.workspace = true
zksync_node_test_utils.workspace = true
zksync_node_api_server.workspace = true
zksync_test_account.workspace = true
zksync_contracts.workspace= true
zksync_contracts.workspace = true

tokio.workspace = true
test-casing.workspace = true
1 change: 1 addition & 0 deletions core/node/consensus/src/en.rs
@@ -173,6 +173,7 @@ impl EN {
}

/// Fetches genesis from the main node.
#[tracing::instrument(skip_all)]
async fn fetch_genesis(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::Genesis> {
let genesis = ctx
.wait(self.client.fetch_consensus_genesis())
6 changes: 6 additions & 0 deletions core/node/consensus/src/storage/connection.rs
@@ -106,6 +106,7 @@ impl<'a> Connection<'a> {
}

/// Wrapper for `consensus_dal().insert_block_certificate()`.
#[tracing::instrument(skip_all, fields(l2_block = %cert.message.proposal.number))]
pub async fn insert_block_certificate(
&mut self,
ctx: &ctx::Ctx,
@@ -118,6 +119,7 @@

/// Wrapper for `consensus_dal().insert_batch_certificate()`,
/// which additionally verifies that the batch hash matches the stored batch.
#[tracing::instrument(skip_all, fields(l1_batch = %cert.message.number))]
pub async fn insert_batch_certificate(
&mut self,
ctx: &ctx::Ctx,
@@ -223,11 +225,13 @@
}

/// Wrapper for `consensus_dal().next_block()`.
#[tracing::instrument(skip_all)]
async fn next_block(&mut self, ctx: &ctx::Ctx) -> ctx::Result<validator::BlockNumber> {
Ok(ctx.wait(self.0.consensus_dal().next_block()).await??)
}

/// Wrapper for `consensus_dal().block_certificates_range()`.
#[tracing::instrument(skip_all)]
pub(crate) async fn block_certificates_range(
&mut self,
ctx: &ctx::Ctx,
@@ -305,6 +309,7 @@
}

/// Wrapper for `blocks_dal().get_sealed_l1_batch_number()`.
#[tracing::instrument(skip_all)]
pub async fn get_last_batch_number(
&mut self,
ctx: &ctx::Ctx,
@@ -390,6 +395,7 @@
}

/// Construct the [storage::BatchStoreState] which contains the earliest batch and the last available [attester::SyncBatch].
#[tracing::instrument(skip_all)]
pub async fn batches_range(&mut self, ctx: &ctx::Ctx) -> ctx::Result<storage::BatchStoreState> {
let first = self
.0
195 changes: 137 additions & 58 deletions core/node/consensus/src/storage/store.rs
@@ -1,9 +1,11 @@
use std::sync::Arc;

use anyhow::Context as _;
use tokio::sync::watch::Sender;
use tracing::Instrument;
use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time};
use zksync_consensus_bft::PayloadManager;
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_roles::{attester, attester::BatchNumber, validator};
use zksync_consensus_storage::{self as storage, BatchStoreState};
use zksync_dal::consensus_dal::{self, Payload};
use zksync_node_sync::fetcher::{FetchedBlock, FetchedTransaction};
@@ -129,6 +131,7 @@ impl PersistedBlockState {
/// If `persisted.first` is moved forward, it means that blocks have been pruned.
/// If `persisted.last` is moved forward, it means that new blocks with certificates have been
/// persisted.
#[tracing::instrument(skip_all, fields(first = %new.first, last = ?new.last.as_ref().map(|l| l.message.proposal.number)))]
fn update(&self, new: storage::BlockStoreState) {
self.0.send_if_modified(|p| {
if &new == p {
@@ -149,6 +152,7 @@ impl PersistedBlockState {
}

/// Appends the `cert` to `persisted` range.
#[tracing::instrument(skip_all, fields(batch_number = %cert.message.proposal.number))]
fn advance(&self, cert: validator::CommitQC) {
self.0.send_if_modified(|p| {
if p.next() != cert.header().number {
@@ -171,21 +175,61 @@ impl StoreRunner {
} = self;

let res = scope::run!(ctx, |ctx, s| async {
#[tracing::instrument(skip_all)]
async fn update_blocks_persisted_iteration(
ctx: &ctx::Ctx,
pool: &ConnectionPool,
blocks_persisted: &PersistedBlockState,
) -> ctx::Result<()> {
const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);

let range = pool
.connection(ctx)
.await?
.block_certificates_range(ctx)
.await
.wrap("block_certificates_range()")?;
blocks_persisted.update(range);
ctx.sleep(POLL_INTERVAL).await?;

Ok(())
}

s.spawn::<()>(async {
// Loop updating `blocks_persisted` whenever blocks get pruned.
const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);
loop {
let range = pool
.connection(ctx)
.await?
.block_certificates_range(ctx)
.await
.wrap("block_certificates_range()")?;
blocks_persisted.update(range);
ctx.sleep(POLL_INTERVAL).await?;
update_blocks_persisted_iteration(ctx, &pool, &blocks_persisted).await?;
}
});

#[tracing::instrument(skip_all, fields(l1_batch = %next_batch_number))]
async fn gossip_sync_batches_iteration(
ctx: &ctx::Ctx,
pool: &ConnectionPool,
next_batch_number: &mut BatchNumber,
batches_persisted: &Sender<BatchStoreState>,
) -> ctx::Result<()> {
const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);

let mut conn = pool.connection(ctx).await?;
if let Some(last_batch_number) = conn
.get_last_batch_number(ctx)
.await
.wrap("last_batch_number()")?
{
if last_batch_number >= *next_batch_number {
let range = conn.batches_range(ctx).await.wrap("batches_range()")?;
*next_batch_number = last_batch_number.next();
tracing::info_span!("batches_persisted_send").in_scope(|| {
batches_persisted.send_replace(range);
});
}
}
ctx.sleep(POLL_INTERVAL).await?;

Ok(())
}

// NOTE: Running this update loop will trigger the gossip of `SyncBatches` which is currently
// pointless as there is no proof and we have to ignore them. We can disable it, but bear in
// mind that any node which gossips the availability will cause pushes and pulls in the consensus.
@@ -200,65 +244,85 @@
// up with L1 batches from peers _without_ the QC, based on L1 inclusion proofs instead.
// Nevertheless since the `SyncBatch` contains all transactions for all L2 blocks,
// we can try to make it less frequent by querying just the last batch number first.
const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);
let mut next_batch_number = { batches_persisted.borrow().next() };
loop {
let mut conn = pool.connection(ctx).await?;
if let Some(last_batch_number) = conn
.get_last_batch_number(ctx)
.await
.wrap("last_batch_number()")?
{
if last_batch_number >= next_batch_number {
let range = conn.batches_range(ctx).await.wrap("batches_range()")?;
next_batch_number = last_batch_number.next();
batches_persisted.send_replace(range);
}
}
ctx.sleep(POLL_INTERVAL).await?;
gossip_sync_batches_iteration(
ctx,
&pool,
&mut next_batch_number,
&batches_persisted,
)
.await?;
}
});

s.spawn::<()>(async {
// Loop inserting batch certificates into storage
#[tracing::instrument(skip_all)]
async fn insert_batch_certificates_iteration(
ctx: &ctx::Ctx,
pool: &ConnectionPool,
batch_certificates: &mut ctx::channel::UnboundedReceiver<attester::BatchQC>,
) -> ctx::Result<()> {
const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50);

let cert = batch_certificates
.recv(ctx)
.instrument(tracing::info_span!("wait_for_batch_certificate"))
.await?;

loop {
let cert = batch_certificates.recv(ctx).await?;

loop {
use consensus_dal::InsertCertificateError as E;
// Try to insert the cert.
let res = pool
.connection(ctx)
.await?
.insert_batch_certificate(ctx, &cert)
.await;

match res {
Ok(()) => {
break;
}
Err(InsertCertificateError::Inner(E::MissingPayload)) => {
// The L1 batch isn't available yet.
// We can wait until it's produced/received, or we could modify gossip
// so that we don't even accept votes until we have the corresponding batch.
ctx.sleep(POLL_INTERVAL).await?;
}
Err(InsertCertificateError::Inner(err)) => {
return Err(ctx::Error::Internal(anyhow::Error::from(err)))
}
Err(InsertCertificateError::Canceled(err)) => {
return Err(ctx::Error::Canceled(err))
}
use consensus_dal::InsertCertificateError as E;
// Try to insert the cert.
let res = pool
.connection(ctx)
.await?
.insert_batch_certificate(ctx, &cert)
.await;

match res {
Ok(()) => {
break;
}
Err(InsertCertificateError::Inner(E::MissingPayload)) => {
// The L1 batch isn't available yet.
// We can wait until it's produced/received, or we could modify gossip
// so that we don't even accept votes until we have the corresponding batch.
ctx.sleep(POLL_INTERVAL)
.instrument(tracing::info_span!("wait_for_batch"))
.await?;
}
Err(InsertCertificateError::Inner(err)) => {
return Err(ctx::Error::Internal(anyhow::Error::from(err)))
}
Err(InsertCertificateError::Canceled(err)) => {
return Err(ctx::Error::Canceled(err))
}
}
}

Ok(())
}

s.spawn::<()>(async {
// Loop inserting batch certificates into storage
loop {
insert_batch_certificates_iteration(ctx, &pool, &mut batch_certificates)
.await?;
}
});

// Loop inserting block certs to storage.
const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50);
loop {
let cert = block_certificates.recv(ctx).await?;
#[tracing::instrument(skip_all)]
async fn insert_block_certificates_iteration(
ctx: &ctx::Ctx,
pool: &ConnectionPool,
block_certificates: &mut ctx::channel::UnboundedReceiver<validator::CommitQC>,
blocks_persisted: &PersistedBlockState,
) -> ctx::Result<()> {
const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50);

let cert = block_certificates
.recv(ctx)
.instrument(tracing::info_span!("wait_for_block_certificate"))
.await?;
// Wait for the block to be persisted, so that we can attach a cert to it.
// We may exit this loop without persisting the certificate in case the
// corresponding block has been pruned in the meantime.
@@ -280,7 +344,9 @@
Err(InsertCertificateError::Inner(E::MissingPayload)) => {
// the payload is not in storage, it's either not yet persisted
// or already pruned. We will retry after a delay.
ctx.sleep(POLL_INTERVAL).await?;
ctx.sleep(POLL_INTERVAL)
.instrument(tracing::info_span!("wait_for_block"))
.await?;
}
Err(InsertCertificateError::Canceled(err)) => {
return Err(ctx::Error::Canceled(err))
@@ -290,6 +356,19 @@
}
}
}

Ok(())
}

// Loop inserting block certs to storage.
loop {
insert_block_certificates_iteration(
ctx,
&pool,
&mut block_certificates,
&blocks_persisted,
)
.await?;
}
})
.await;
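
A side note on the shape of the `store.rs` refactor: each long-running loop body is pulled out into a named function annotated with `#[tracing::instrument]`, so every iteration opens a fresh span rather than accumulating under one unbounded span for the whole loop. A minimal sketch of that structure, with illustrative names and intervals (not code from this diff):

```rust
use tracing::Instrument as _;

// One span per iteration: the `#[tracing::instrument]` attribute opens a
// new span on each call, which a single span around the loop could not do.
#[tracing::instrument(skip_all)]
async fn poll_iteration(iterations: &mut u64) {
    *iterations += 1;
    // Name the sleep so the polling delay is visible as waiting time.
    tokio::time::sleep(std::time::Duration::from_millis(50))
        .instrument(tracing::info_span!("poll_interval"))
        .await;
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    let mut iterations = 0;
    while iterations < 3 {
        poll_iteration(&mut iterations).await;
    }
}
```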
