Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ignore sync errors when the block is already verified #980

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 81 additions & 21 deletions zebrad/src/commands/start/sync.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashSet, iter, pin::Pin, sync::Arc, time::Duration};

use color_eyre::eyre::{eyre, Report};
use futures::future::FutureExt;
use color_eyre::eyre::{eyre, Report, WrapErr};
use futures::future::{FutureExt, TryFutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::{task::JoinHandle, time::delay_for};
use tower::{builder::ServiceBuilder, retry::Retry, timeout::Timeout, Service, ServiceExt};
Expand Down Expand Up @@ -52,7 +52,7 @@ where
state: ZS,
verifier: ZV,
prospective_tips: HashSet<CheckedTip>,
pending_blocks: Pin<Box<FuturesUnordered<JoinHandle<Result<block::Hash, Error>>>>>,
pending_blocks: Pin<Box<FuturesUnordered<JoinHandle<Result<block::Hash, ReportAndHash>>>>>,
genesis_hash: block::Hash,
}

Expand Down Expand Up @@ -93,6 +93,17 @@ where
self.request_genesis().await?;

'sync: loop {
// Update metrics for any ready tasks, before wiping state
while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() {
match rsp.expect("block download and verify tasks should not panic") {
Ok(hash) => tracing::trace!(?hash, "verified and committed block to state"),
Err((e, _)) => {
tracing::trace!(?e, "sync error before restarting sync, ignoring")
}
}
}
self.update_metrics();

// Wipe state from prevous iterations.
self.prospective_tips = HashSet::new();
self.pending_blocks = Box::pin(FuturesUnordered::new());
Expand All @@ -108,11 +119,34 @@ where
while !self.prospective_tips.is_empty() {
// Check whether any block tasks are currently ready:
while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() {
match rsp.expect("block download tasks should not panic") {
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
Err(e) => {
tracing::info!(?e, "restarting sync");
continue 'sync;
match rsp.expect("block download and verify tasks should not panic") {
Ok(hash) => {
tracing::trace!(?hash, "verified and committed block to state");
}
Err((e, hash)) => {
// We must restart the sync on every error, unless
// this block has already been verified.
//
// If we ignore other errors, the syncer can:
// - get a long way ahead of the state, and queue
// up a lot of unverified blocks in memory, or
// - get into an endless error cycle.
//
// In particular, we must restart if the checkpoint
// verifier has verified a block at this height, but
// the hash is different. In that case, we want to
// stop following an ancient side-chain.
if self.state_contains(hash).await? {
tracing::debug!(?e,
"sync error in ready task, but block is already verified, ignoring");
} else {
tracing::warn!(
?e,
"sync error in ready task, waiting to restart sync"
);
delay_for(SYNC_RESTART_TIMEOUT).await;
continue 'sync;
}
}
}
self.update_metrics();
Expand All @@ -133,10 +167,22 @@ where
.expect("pending_blocks is nonempty")
.expect("block download tasks should not panic")
{
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
Err(e) => {
tracing::info!(?e, "restarting sync");
continue 'sync;
Ok(hash) => {
tracing::trace!(?hash, "verified and committed block to state");
}
Err((e, hash)) => {
// We must restart the sync on every error, unless
// this block has already been verified.
// See the comment above for details.
if self.state_contains(hash).await? {
tracing::debug!(?e,
"sync error with pending above lookahead limit, but block is already verified, ignoring");
} else {
tracing::warn!(?e,
"sync error with pending above lookahead limit, waiting to restart sync");
delay_for(SYNC_RESTART_TIMEOUT).await;
continue 'sync;
}
}
}
} else {
Expand Down Expand Up @@ -362,8 +408,10 @@ where
.expect("inserted a download request")
.expect("block download tasks should not panic")
{
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
Err(e) => tracing::warn!(?e, "could not download genesis block, retrying"),
Ok(hash) => tracing::trace!(?hash, "verified and committed block to state"),
Err((e, _)) => {
tracing::warn!(?e, "could not download or verify genesis block, retrying")
}
}
}

Expand Down Expand Up @@ -399,7 +447,8 @@ where

tracing::debug!(?hash, "requested block");

let span = tracing::info_span!("block_fetch_verify", ?hash);
// This span is used to help diagnose sync warnings
let span = tracing::warn_span!("block_fetch_verify", ?hash);
let mut verifier = self.verifier.clone();
let task = tokio::spawn(
async move {
Expand All @@ -409,15 +458,25 @@ where
.next()
.expect("successful response has the block in it"),
Ok(_) => unreachable!("wrong response to block request"),
Err(e) => return Err(e),
// Make sure we can distinguish download and verify timeouts
Err(e) => Err(eyre!(e)).wrap_err("failed to download block")?,
};
metrics::counter!("sync.downloaded_blocks", 1);
metrics::counter!("sync.downloaded.block.count", 1);

let result = verifier.ready_and().await?.call(block).await;
metrics::counter!("sync.verified_blocks", 1);
result
let result = verifier
.ready_and()
.await
.map_err(|e| eyre!(e))
.wrap_err("verifier service failed to be ready")?
.call(block)
.await
.map_err(|e| eyre!(e))
.wrap_err("failed to verify block")?;
metrics::counter!("sync.verified.block.count", 1);
Result::<block::Hash, Report>::Ok(result)
}
.instrument(span),
.instrument(span)
.map_err(move |e| (e, hash)),
);
self.pending_blocks.push(task);
}
Expand Down Expand Up @@ -457,3 +516,4 @@ where
}

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type ReportAndHash = (Report, block::Hash);