Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus): better logging of errors #3170

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions core/lib/dal/src/consensus_dal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ pub struct ConsensusDal<'a, 'c> {
pub enum InsertCertificateError {
#[error("corresponding payload is missing")]
MissingPayload,
#[error("certificate doesn't match the payload")]
PayloadMismatch,
#[error("certificate doesn't match the payload, payload = {0:?}")]
PayloadMismatch(Payload),
#[error(transparent)]
Dal(#[from] DalError),
#[error(transparent)]
Expand Down Expand Up @@ -528,7 +528,7 @@ impl ConsensusDal<'_, '_> {
.await?
.ok_or(E::MissingPayload)?;
if header.payload != want_payload.encode().hash() {
return Err(E::PayloadMismatch);
return Err(E::PayloadMismatch(want_payload));
}
sqlx::query!(
r#"
Expand Down Expand Up @@ -634,7 +634,7 @@ impl ConsensusDal<'_, '_> {
pub async fn insert_batch_certificate(
&mut self,
cert: &attester::BatchQC,
) -> Result<(), InsertCertificateError> {
) -> anyhow::Result<()> {
let cfg = self
.global_config()
.await
Expand All @@ -652,9 +652,7 @@ impl ConsensusDal<'_, '_> {
.context("batch()")?
.context("batch is missing")?,
);
if cert.message.hash != hash {
return Err(InsertCertificateError::PayloadMismatch);
}
anyhow::ensure!(cert.message.hash == hash, "hash mismatch");
cert.verify(cfg.genesis.hash(), &committee)
.context("cert.verify()")?;
sqlx::query!(
Expand Down
11 changes: 8 additions & 3 deletions core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,11 @@ impl EN {
let mut next = attester::BatchNumber(0);
loop {
let status = loop {
match self.fetch_attestation_status(ctx).await {
match self
.fetch_attestation_status(ctx)
.await
.wrap("fetch_attestation_status()")
{
Err(err) => tracing::warn!("{err:#}"),
Ok(status) => {
if status.genesis != cfg.genesis.hash() {
Expand Down Expand Up @@ -439,7 +443,7 @@ impl EN {
});
while end.map_or(true, |end| queue.next() < end) {
let block = recv.recv(ctx).await?.join(ctx).await?;
queue.send(block).await?;
queue.send(block).await.context("queue.send()")?;
}
Ok(())
})
Expand All @@ -448,7 +452,8 @@ impl EN {
if first < queue.next() {
self.pool
.wait_for_payload(ctx, queue.next().prev().unwrap())
.await?;
.await
.wrap("wait_for_payload()")?;
}
Ok(())
}
Expand Down
7 changes: 2 additions & 5 deletions core/node/consensus/src/mn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use zksync_dal::consensus_dal;

use crate::{
config, registry,
storage::{ConnectionPool, InsertCertificateError, Store},
storage::{ConnectionPool, Store},
};

/// Task running a consensus validator for the main node.
Expand Down Expand Up @@ -179,10 +179,7 @@ async fn run_attestation_controller(
.wrap("connection()")?
.insert_batch_certificate(ctx, &qc)
.await
.map_err(|err| match err {
InsertCertificateError::Canceled(err) => ctx::Error::Canceled(err),
InsertCertificateError::Inner(err) => ctx::Error::Internal(err.into()),
})?;
.wrap("insert_batch_certificate()")?;
}
}
.await;
Expand Down
2 changes: 1 addition & 1 deletion core/node/consensus/src/storage/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<'a> Connection<'a> {
&mut self,
ctx: &ctx::Ctx,
cert: &attester::BatchQC,
) -> Result<(), super::InsertCertificateError> {
) -> ctx::Result<()> {
Ok(ctx
.wait(self.0.consensus_dal().insert_batch_certificate(cert))
.await??)
Expand Down
4 changes: 1 addition & 3 deletions core/node/consensus/src/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,7 @@ impl StoreRunner {
Err(InsertCertificateError::Canceled(err)) => {
return Err(ctx::Error::Canceled(err))
}
Err(InsertCertificateError::Inner(err)) => {
return Err(ctx::Error::Internal(anyhow::Error::from(err)))
}
Err(err) => Err(err).context("insert_block_certificate()")?,
}
}

Expand Down
27 changes: 24 additions & 3 deletions core/node/node_framework/src/service/error.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
use std::fmt;

use crate::{task::TaskId, wiring_layer::WiringError};

/// An error that can occur during the task lifecycle.
#[derive(Debug, thiserror::Error)]
pub enum TaskError {
#[error("Task {0} failed: {1}")]
#[error("Task {0} failed: {1:#}")]
TaskFailed(TaskId, anyhow::Error),
#[error("Task {0} panicked: {1}")]
TaskPanicked(TaskId, String),
#[error("Shutdown for task {0} timed out")]
TaskShutdownTimedOut(TaskId),
#[error("Shutdown hook {0} failed: {1}")]
#[error("Shutdown hook {0} failed: {1:#}")]
ShutdownHookFailed(TaskId, anyhow::Error),
#[error("Shutdown hook {0} timed out")]
ShutdownHookTimedOut(TaskId),
}

/// Wrapper of a list of errors with a reasonable formatting.
pub struct TaskErrors(pub Vec<TaskError>);

impl From<Vec<TaskError>> for TaskErrors {
fn from(errs: Vec<TaskError>) -> Self {
Self(errs)
}
}

impl fmt::Debug for TaskErrors {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0
.iter()
.map(|err| format!("{err:#}"))
.collect::<Vec<_>>()
.fmt(f)
}
}

/// An error that can occur during the service lifecycle.
#[derive(Debug, thiserror::Error)]
pub enum ZkStackServiceError {
Expand All @@ -25,5 +46,5 @@ pub enum ZkStackServiceError {
#[error("One or more wiring layers failed to initialize: {0:?}")]
Wiring(Vec<(String, WiringError)>),
#[error("One or more tasks failed: {0:?}")]
Task(Vec<TaskError>),
Task(TaskErrors),
}
2 changes: 1 addition & 1 deletion core/node/node_framework/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl ZkStackService {
if self.errors.is_empty() {
Ok(())
} else {
Err(ZkStackServiceError::Task(self.errors))
Err(ZkStackServiceError::Task(self.errors.into()))
}
}

Expand Down
Loading