Skip to content

Commit

Permalink
fix(sync): fix testnet syncer loop on large Orchard blocks (#4286)
Browse files Browse the repository at this point in the history
* Return BlockDownloadVerifyError from download_and_verify

* Check block requests and genesis for temporary errors

* Ignore DuplicateBlockQueuedForDownload as a temporary error

* Propagate error info to the syncer main loop

* Sleep after temporary genesis download and verify errors
  • Loading branch information
teor2345 authored May 4, 2022
1 parent e9d37c6 commit 56f766f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 37 deletions.
98 changes: 71 additions & 27 deletions zebrad/src/components/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ where
/// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
/// necessary.
#[instrument(skip(self))]
async fn try_to_sync(&mut self) -> Result<(), ()> {
async fn try_to_sync(&mut self) -> Result<(), Report> {
self.prospective_tips = HashSet::new();

info!(
Expand All @@ -390,14 +390,14 @@ where
);
if let Err(e) = self.obtain_tips().await {
info!("temporary error obtaining tips: {:#}", e);
return Err(());
return Err(e);
}
self.update_metrics();

while !self.prospective_tips.is_empty() {
// Check whether any block tasks are currently ready:
while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
self.handle_block_response(rsp).await?;
Self::handle_block_response(rsp)?;
}
self.update_metrics();

Expand All @@ -424,7 +424,7 @@ where

let response = self.downloads.next().await.expect("downloads is nonempty");

self.handle_block_response(response).await?;
Self::handle_block_response(response)?;
self.update_metrics();
}

Expand All @@ -439,7 +439,7 @@ where

if let Err(e) = self.extend_tips().await {
info!("temporary error extending tips: {:#}", e);
return Err(());
return Err(e);
}
self.update_metrics();
}
Expand Down Expand Up @@ -594,7 +594,8 @@ where
// so the last peer to respond can't toggle our mempool
self.recent_syncs.push_obtain_tips_length(new_downloads);

self.request_blocks(download_set).await?;
let response = self.request_blocks(download_set).await;
Self::handle_response(response)?;

Ok(())
}
Expand Down Expand Up @@ -731,7 +732,8 @@ where
// so the last peer to respond can't toggle our mempool
self.recent_syncs.push_extend_tips_length(new_downloads);

self.request_blocks(download_set).await?;
let response = self.request_blocks(download_set).await;
Self::handle_response(response)?;

Ok(())
}
Expand All @@ -748,14 +750,28 @@ where
// So we just download and verify the genesis block here.
while !self.state_contains(self.genesis_hash).await? {
info!("starting genesis block download and verify");
self.downloads
.download_and_verify(self.genesis_hash)
.await
.map_err(|e| eyre!(e))?;
match self.downloads.next().await.expect("downloads is nonempty") {

let response = self.downloads.download_and_verify(self.genesis_hash).await;
Self::handle_response(response).map_err(|e| eyre!(e))?;

let response = self.downloads.next().await.expect("downloads is nonempty");

match response {
Ok(hash) => trace!(?hash, "verified and committed block to state"),
Err(e) => {
warn!(?e, "could not download or verify genesis block, retrying");
Err(error) => {
// TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
if Self::should_restart_sync(&error) {
warn!(
?error,
"could not download or verify genesis block, retrying"
);
} else {
info!(
?error,
"temporary error downloading or verifying genesis block, retrying"
);
}

tokio::time::sleep(GENESIS_TIMEOUT_RETRY).await;
}
}
Expand All @@ -765,7 +781,10 @@ where
}

/// Queue download and verify tasks for each block that isn't currently known to our node
async fn request_blocks(&mut self, hashes: IndexSet<block::Hash>) -> Result<(), Report> {
async fn request_blocks(
&mut self,
hashes: IndexSet<block::Hash>,
) -> Result<(), BlockDownloadVerifyError> {
debug!(hashes.len = hashes.len(), "requesting blocks");
for hash in hashes.into_iter() {
self.downloads.download_and_verify(hash).await?;
Expand All @@ -780,16 +799,30 @@ where
/// expected error occurred, so that the synchronization can continue normally.
///
/// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
async fn handle_block_response(
&mut self,
fn handle_block_response(
response: Result<block::Hash, BlockDownloadVerifyError>,
) -> Result<(), ()> {
) -> Result<(), BlockDownloadVerifyError> {
match response {
Ok(hash) => trace!(?hash, "verified and committed block to state"),
Err(error) => {
if Self::should_restart_sync(error) {
return Err(());
}
Err(_) => return Self::handle_response(response.map(|_| ())),
}

Ok(())
}

/// Handles a response to a syncer request.
///
/// Returns `Ok` if the request was successful, or if an expected error occurred,
/// so that the synchronization can continue normally.
///
/// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
fn handle_response(
response: Result<(), BlockDownloadVerifyError>,
) -> Result<(), BlockDownloadVerifyError> {
if let Err(error) = response {
// TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
if Self::should_restart_sync(&error) {
return Err(error);
}
}

Expand Down Expand Up @@ -830,9 +863,9 @@ where

/// Return if the sync should be restarted based on the given error
/// from the block downloader and verifier stream.
fn should_restart_sync(e: BlockDownloadVerifyError) -> bool {
fn should_restart_sync(e: &BlockDownloadVerifyError) -> bool {
match e {
// Structural matches
// Structural matches: downcasts
BlockDownloadVerifyError::Invalid(VerifyChainError::Checkpoint(
VerifyCheckpointError::AlreadyVerified { .. },
)) => {
Expand All @@ -847,6 +880,8 @@ where
debug!(error = ?e, "block is already in chain, possibly from a previous sync run, continuing");
false
}

// Structural matches: direct
BlockDownloadVerifyError::CancelledDuringDownload
| BlockDownloadVerifyError::CancelledDuringVerification => {
debug!(error = ?e, "block verification was cancelled, continuing");
Expand All @@ -860,6 +895,14 @@ where
);
false
}
BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { .. } => {
debug!(
error = ?e,
"queued duplicate block hash for download, \
assuming the syncer will eventually resolve duplicates, continuing"
);
false
}

// String matches
BlockDownloadVerifyError::Invalid(VerifyChainError::Block(
Expand All @@ -872,7 +915,9 @@ where
BlockDownloadVerifyError::DownloadFailed(ref source)
if format!("{:?}", source).contains("NotFound") =>
{
// Covers both NotFoundResponse and NotFoundRegistry errors.
// Covers these errors:
// - NotFoundResponse
// - NotFoundRegistry
//
// TODO: improve this by checking the type (#2908)
// restart after a certain number of NotFound errors?
Expand All @@ -887,13 +932,12 @@ where
// become incorrect e.g. after some refactoring, and it is difficult
// to write a test to check it. The test below is a best-effort
// attempt to catch if that happens and log it.
//
// TODO: add a proper test and remove this
// https://github.com/ZcashFoundation/zebra/issues/2909
let err_str = format!("{:?}", e);
if err_str.contains("AlreadyVerified")
|| err_str.contains("AlreadyInChain")
|| err_str.contains("Cancelled")
|| err_str.contains("BehindTipHeight")
|| err_str.contains("block is already committed to the state")
|| err_str.contains("NotFound")
{
Expand Down
26 changes: 16 additions & 10 deletions zebrad/src/components/sync/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{
task::{Context, Poll},
};

use color_eyre::eyre::{eyre, Report};
use futures::{
future::TryFutureExt,
ready,
Expand Down Expand Up @@ -63,14 +62,17 @@ impl<Request: Clone> hedge::Policy<Request> for AlwaysHedge {
#[derive(Error, Debug)]
#[allow(dead_code)]
pub enum BlockDownloadVerifyError {
#[error("error downloading block")]
DownloadFailed(#[source] BoxError),
#[error("error from the network service")]
NetworkError(#[source] BoxError),

#[error("error from the verifier service")]
VerifierError(#[source] BoxError),

#[error("block did not pass consensus validation")]
Invalid(#[from] zebra_consensus::chain::VerifyChainError),
#[error("duplicate block hash queued for download: {hash:?}")]
DuplicateBlockQueuedForDownload { hash: block::Hash },

#[error("error downloading block")]
DownloadFailed(#[source] BoxError),

#[error("downloaded block was too far ahead of the chain tip")]
AboveLookaheadHeightLimit,
Expand All @@ -81,6 +83,9 @@ pub enum BlockDownloadVerifyError {
#[error("downloaded block had an invalid height")]
InvalidHeight,

#[error("block did not pass consensus validation")]
Invalid(#[from] zebra_consensus::chain::VerifyChainError),

#[error("block download / verification was cancelled during download")]
CancelledDuringDownload,

Expand Down Expand Up @@ -211,10 +216,13 @@ where
/// only if the network service fails. It returns immediately after queuing
/// the request.
#[instrument(level = "debug", skip(self), fields(%hash))]
pub async fn download_and_verify(&mut self, hash: block::Hash) -> Result<(), Report> {
pub async fn download_and_verify(
&mut self,
hash: block::Hash,
) -> Result<(), BlockDownloadVerifyError> {
if self.cancel_handles.contains_key(&hash) {
metrics::counter!("sync.already.queued.dropped.block.hash.count", 1);
return Err(eyre!("duplicate hash queued for download: {:?}", hash));
return Err(BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { hash });
}

// We construct the block requests sequentially, waiting for the peer
Expand All @@ -224,14 +232,12 @@ where
// if we waited for readiness and did the service call in the spawned
// tasks, all of the spawned tasks would race each other waiting for the
// network to become ready.
debug!("waiting to request block");
let block_req = self
.network
.ready()
.await
.map_err(|e| eyre!(e))?
.map_err(BlockDownloadVerifyError::NetworkError)?
.call(zn::Request::BlocksByHash(std::iter::once(hash).collect()));
debug!("requested block");

// This oneshot is used to signal cancellation to the download task.
let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
Expand Down

0 comments on commit 56f766f

Please sign in to comment.