Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consensus): add tracing instrumentation to consensus store #2546

Merged
merged 7 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Context as _;
use tracing::Instrument;
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_consensus_executor as executor;
use zksync_consensus_roles::validator;
Expand Down Expand Up @@ -70,7 +71,11 @@ impl EN {
s.spawn_bg::<()>(async {
let old = genesis;
loop {
if let Ok(new) = self.fetch_genesis(ctx).await {
if let Ok(new) = self
.fetch_genesis(ctx)
.instrument(tracing::info_span!("genesis_monitor_fetch"))
itegulov marked this conversation as resolved.
Show resolved Hide resolved
.await
{
if new != old {
return Err(anyhow::format_err!(
"genesis changed: old {old:?}, new {new:?}"
Expand Down
6 changes: 6 additions & 0 deletions core/node/consensus/src/storage/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl<'a> Connection<'a> {
}

/// Wrapper for `consensus_dal().insert_block_certificate()`.
#[tracing::instrument(skip_all, fields(block_number = %cert.message.proposal.number))]
itegulov marked this conversation as resolved.
Show resolved Hide resolved
pub async fn insert_block_certificate(
&mut self,
ctx: &ctx::Ctx,
Expand All @@ -118,6 +119,7 @@ impl<'a> Connection<'a> {

/// Wrapper for `consensus_dal().insert_batch_certificate()`,
/// which additionally verifies that the batch hash matches the stored batch.
#[tracing::instrument(skip_all, fields(batch_number = %cert.message.number))]
pub async fn insert_batch_certificate(
&mut self,
ctx: &ctx::Ctx,
Expand Down Expand Up @@ -223,11 +225,13 @@ impl<'a> Connection<'a> {
}

/// Thin wrapper around `consensus_dal().next_block()`.
///
/// Runs the DAL query through `ctx.wait(..)` and returns the resulting
/// `validator::BlockNumber`. Errors from either layer are propagated to the
/// caller as a `ctx::Error` via `?`.
#[tracing::instrument(skip_all)]
async fn next_block(&mut self, ctx: &ctx::Ctx) -> ctx::Result<validator::BlockNumber> {
    // The first `?` unwraps the result of `ctx.wait` (presumably cancellation
    // of `ctx` — confirm against zksync_concurrency); the second unwraps the
    // result of the DAL call itself.
    let number = ctx.wait(self.0.consensus_dal().next_block()).await??;
    Ok(number)
}

/// Wrapper for `consensus_dal().block_certificates_range()`.
#[tracing::instrument(skip_all)]
pub(crate) async fn block_certificates_range(
&mut self,
ctx: &ctx::Ctx,
Expand Down Expand Up @@ -305,6 +309,7 @@ impl<'a> Connection<'a> {
}

/// Wrapper for `blocks_dal().get_sealed_l1_batch_number()`.
#[tracing::instrument(skip_all)]
pub async fn get_last_batch_number(
&mut self,
ctx: &ctx::Ctx,
Expand Down Expand Up @@ -390,6 +395,7 @@ impl<'a> Connection<'a> {
}

/// Construct the [storage::BatchStoreState] which contains the earliest batch and the last available [attester::SyncBatch].
#[tracing::instrument(skip_all)]
pub async fn batches_range(&mut self, ctx: &ctx::Ctx) -> ctx::Result<storage::BatchStoreState> {
let first = self
.0
Expand Down
181 changes: 115 additions & 66 deletions core/node/consensus/src/storage/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use anyhow::Context as _;
use tracing::Instrument;
use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time};
use zksync_consensus_bft::PayloadManager;
use zksync_consensus_roles::{attester, validator};
Expand Down Expand Up @@ -129,6 +130,7 @@ impl PersistedBlockState {
/// If `persisted.first` is moved forward, it means that blocks have been pruned.
/// If `persisted.last` is moved forward, it means that new blocks with certificates have been
/// persisted.
#[tracing::instrument(skip_all, fields(first = %new.first, last = ?new.last.as_ref().map(|l| l.message.proposal.number)))]
fn update(&self, new: storage::BlockStoreState) {
self.0.send_if_modified(|p| {
if &new == p {
Expand All @@ -149,6 +151,7 @@ impl PersistedBlockState {
}

/// Appends the `cert` to `persisted` range.
#[tracing::instrument(skip_all, fields(batch_number = %cert.message.proposal.number))]
fn advance(&self, cert: validator::CommitQC) {
self.0.send_if_modified(|p| {
if p.next() != cert.header().number {
Expand All @@ -175,14 +178,23 @@ impl StoreRunner {
// Loop updating `blocks_persisted` whenever blocks get pruned.
const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);
loop {
let range = pool
.connection(ctx)
.await?
.block_certificates_range(ctx)
.await
.wrap("block_certificates_range()")?;
blocks_persisted.update(range);
ctx.sleep(POLL_INTERVAL).await?;
let span = tracing::info_span!("blocks_persisted_loop");
itegulov marked this conversation as resolved.
Show resolved Hide resolved
async {
let range = pool
.connection(ctx)
.await?
.block_certificates_range(ctx)
.await
.wrap("block_certificates_range()")?;
blocks_persisted.update(range);
ctx.sleep(POLL_INTERVAL)
.instrument(tracing::info_span!("sleep"))
itegulov marked this conversation as resolved.
Show resolved Hide resolved
.await?;

anyhow::Ok(())
}
.instrument(span)
.await?;
}
});

Expand All @@ -203,93 +215,130 @@ impl StoreRunner {
const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);
let mut next_batch_number = { batches_persisted.borrow().next() };
loop {
let mut conn = pool.connection(ctx).await?;
if let Some(last_batch_number) = conn
.get_last_batch_number(ctx)
.await
.wrap("last_batch_number()")?
{
if last_batch_number >= next_batch_number {
let range = conn.batches_range(ctx).await.wrap("batches_range()")?;
next_batch_number = last_batch_number.next();
batches_persisted.send_replace(range);
let span = tracing::info_span!("gossip_sync_batches_loop", %next_batch_number);
async {
let mut conn = pool.connection(ctx).await?;
if let Some(last_batch_number) = conn
.get_last_batch_number(ctx)
.await
.wrap("last_batch_number()")?
{
if last_batch_number >= next_batch_number {
let range =
conn.batches_range(ctx).await.wrap("batches_range()")?;
next_batch_number = last_batch_number.next();
tracing::info_span!("batches_persisted_send").in_scope(|| {
batches_persisted.send_replace(range);
});
}
}
ctx.sleep(POLL_INTERVAL)
.instrument(tracing::info_span!("sleep"))
itegulov marked this conversation as resolved.
Show resolved Hide resolved
.await?;

anyhow::Ok(())
}
ctx.sleep(POLL_INTERVAL).await?;
.instrument(span)
.await?;
}
});

s.spawn::<()>(async {
// Loop inserting batch certificates into storage
const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50);
loop {
let cert = batch_certificates.recv(ctx).await?;
let span = tracing::info_span!("insert_batch_certificates_loop");

async {
let cert = batch_certificates
.recv(ctx)
.instrument(tracing::info_span!("wait_for_certificate"))
itegulov marked this conversation as resolved.
Show resolved Hide resolved
.await?;

loop {
itegulov marked this conversation as resolved.
Show resolved Hide resolved
use consensus_dal::InsertCertificateError as E;
// Try to insert the cert.
let res = pool
.connection(ctx)
.await?
.insert_batch_certificate(ctx, &cert)
.await;

match res {
Ok(()) => {
break;
}
Err(InsertCertificateError::Inner(E::MissingPayload)) => {
// The L1 batch isn't available yet.
// We can wait until it's produced/received, or we could modify gossip
// so that we don't even accept votes until we have the corresponding batch.
ctx.sleep(POLL_INTERVAL)
.instrument(tracing::info_span!("wait_for_batch"))
.await?;
}
Err(InsertCertificateError::Inner(err)) => {
return Err(ctx::Error::Internal(anyhow::Error::from(err)))
}
Err(InsertCertificateError::Canceled(err)) => {
return Err(ctx::Error::Canceled(err))
}
}
}

Ok(())
}
.instrument(span)
.await?
}
});

loop {
// Loop inserting block certs to storage.
const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50);
loop {
let span = tracing::info_span!("insert_block_certificates_loop");
async {
let cert = block_certificates
.recv(ctx)
.instrument(tracing::info_span!("wait_for_certificate"))
itegulov marked this conversation as resolved.
Show resolved Hide resolved
.await?;
// Wait for the block to be persisted, so that we can attach a cert to it.
// We may exit this loop without persisting the certificate in case the
// corresponding block has been pruned in the meantime.
while blocks_persisted.should_be_persisted(&cert) {
use consensus_dal::InsertCertificateError as E;
// Try to insert the cert.
let res = pool
.connection(ctx)
.await?
.insert_batch_certificate(ctx, &cert)
.insert_block_certificate(ctx, &cert)
.await;

match res {
Ok(()) => {
// Insertion succeeded: update persisted state
// and wait for the next cert.
blocks_persisted.advance(cert);
break;
}
Err(InsertCertificateError::Inner(E::MissingPayload)) => {
// The L1 batch isn't available yet.
// We can wait until it's produced/received, or we could modify gossip
// so that we don't even accept votes until we have the corresponding batch.
ctx.sleep(POLL_INTERVAL).await?;
}
Err(InsertCertificateError::Inner(err)) => {
return Err(ctx::Error::Internal(anyhow::Error::from(err)))
// the payload is not in storage, it's either not yet persisted
// or already pruned. We will retry after a delay.
ctx.sleep(POLL_INTERVAL)
.instrument(tracing::info_span!("wait_for_block"))
.await?;
}
Err(InsertCertificateError::Canceled(err)) => {
return Err(ctx::Error::Canceled(err))
}
Err(InsertCertificateError::Inner(err)) => {
return Err(ctx::Error::Internal(anyhow::Error::from(err)))
}
}
}
}
});

// Loop inserting block certs to storage.
const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50);
loop {
let cert = block_certificates.recv(ctx).await?;
// Wait for the block to be persisted, so that we can attach a cert to it.
// We may exit this loop without persisting the certificate in case the
// corresponding block has been pruned in the meantime.
while blocks_persisted.should_be_persisted(&cert) {
use consensus_dal::InsertCertificateError as E;
// Try to insert the cert.
let res = pool
.connection(ctx)
.await?
.insert_block_certificate(ctx, &cert)
.await;
match res {
Ok(()) => {
// Insertion succeeded: update persisted state
// and wait for the next cert.
blocks_persisted.advance(cert);
break;
}
Err(InsertCertificateError::Inner(E::MissingPayload)) => {
// the payload is not in storage, it's either not yet persisted
// or already pruned. We will retry after a delay.
ctx.sleep(POLL_INTERVAL).await?;
}
Err(InsertCertificateError::Canceled(err)) => {
return Err(ctx::Error::Canceled(err))
}
Err(InsertCertificateError::Inner(err)) => {
return Err(ctx::Error::Internal(anyhow::Error::from(err)))
}
}
Ok(())
}
.instrument(span)
.await?;
}
})
.await;
Expand Down
Loading