diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 48cbb990008..85c47c9be26 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -381,7 +381,7 @@ where /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is /// necessary. #[instrument(skip(self))] - async fn try_to_sync(&mut self) -> Result<(), ()> { + async fn try_to_sync(&mut self) -> Result<(), Report> { self.prospective_tips = HashSet::new(); info!( @@ -390,14 +390,14 @@ where ); if let Err(e) = self.obtain_tips().await { info!("temporary error obtaining tips: {:#}", e); - return Err(()); + return Err(e); } self.update_metrics(); while !self.prospective_tips.is_empty() { // Check whether any block tasks are currently ready: while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) { - self.handle_block_response(rsp).await?; + Self::handle_block_response(rsp)?; } self.update_metrics(); @@ -424,7 +424,7 @@ where let response = self.downloads.next().await.expect("downloads is nonempty"); - self.handle_block_response(response).await?; + Self::handle_block_response(response)?; self.update_metrics(); } @@ -439,7 +439,7 @@ where if let Err(e) = self.extend_tips().await { info!("temporary error extending tips: {:#}", e); - return Err(()); + return Err(e); } self.update_metrics(); } @@ -594,7 +594,8 @@ where // so the last peer to respond can't toggle our mempool self.recent_syncs.push_obtain_tips_length(new_downloads); - self.request_blocks(download_set).await?; + let response = self.request_blocks(download_set).await; + Self::handle_response(response)?; Ok(()) } @@ -731,7 +732,8 @@ where // so the last peer to respond can't toggle our mempool self.recent_syncs.push_extend_tips_length(new_downloads); - self.request_blocks(download_set).await?; + let response = self.request_blocks(download_set).await; + Self::handle_response(response)?; Ok(()) } @@ -748,14 +750,28 @@ where // So we just download and verify the genesis block here. while !self.state_contains(self.genesis_hash).await? { info!("starting genesis block download and verify"); - self.downloads - .download_and_verify(self.genesis_hash) - .await - .map_err(|e| eyre!(e))?; - match self.downloads.next().await.expect("downloads is nonempty") { + + let response = self.downloads.download_and_verify(self.genesis_hash).await; + Self::handle_response(response).map_err(|e| eyre!(e))?; + + let response = self.downloads.next().await.expect("downloads is nonempty"); + + match response { Ok(hash) => trace!(?hash, "verified and committed block to state"), - Err(e) => { - warn!(?e, "could not download or verify genesis block, retrying"); + Err(error) => { + // TODO: exit syncer on permanent service errors (NetworkError, VerifierError) + if Self::should_restart_sync(&error) { + warn!( + ?error, + "could not download or verify genesis block, retrying" + ); + } else { + info!( + ?error, + "temporary error downloading or verifying genesis block, retrying" + ); + } + tokio::time::sleep(GENESIS_TIMEOUT_RETRY).await; } } @@ -765,7 +781,10 @@ where } /// Queue download and verify tasks for each block that isn't currently known to our node - async fn request_blocks(&mut self, hashes: IndexSet) -> Result<(), Report> { + async fn request_blocks( + &mut self, + hashes: IndexSet, + ) -> Result<(), BlockDownloadVerifyError> { debug!(hashes.len = hashes.len(), "requesting blocks"); for hash in hashes.into_iter() { self.downloads.download_and_verify(hash).await?; @@ -780,16 +799,30 @@ where /// expected error occurred, so that the synchronization can continue normally. /// /// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart. - async fn handle_block_response( - &mut self, + fn handle_block_response( response: Result, - ) -> Result<(), ()> { + ) -> Result<(), BlockDownloadVerifyError> { match response { Ok(hash) => trace!(?hash, "verified and committed block to state"), - Err(error) => { - if Self::should_restart_sync(error) { - return Err(()); - } + Err(_) => return Self::handle_response(response.map(|_| ())), + } + + Ok(()) + } + + /// Handles a response to a syncer request. + /// + /// Returns `Ok` if the request was successful, or if an expected error occurred, + /// so that the synchronization can continue normally. + /// + /// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart. + fn handle_response( + response: Result<(), BlockDownloadVerifyError>, + ) -> Result<(), BlockDownloadVerifyError> { + if let Err(error) = response { + // TODO: exit syncer on permanent service errors (NetworkError, VerifierError) + if Self::should_restart_sync(&error) { + return Err(error); } } @@ -830,9 +863,9 @@ where /// Return if the sync should be restarted based on the given error /// from the block downloader and verifier stream. - fn should_restart_sync(e: BlockDownloadVerifyError) -> bool { + fn should_restart_sync(e: &BlockDownloadVerifyError) -> bool { match e { - // Structural matches + // Structural matches: downcasts BlockDownloadVerifyError::Invalid(VerifyChainError::Checkpoint( VerifyCheckpointError::AlreadyVerified { .. }, )) => { @@ -847,6 +880,8 @@ where debug!(error = ?e, "block is already in chain, possibly from a previous sync run, continuing"); false } + + // Structural matches: direct BlockDownloadVerifyError::CancelledDuringDownload | BlockDownloadVerifyError::CancelledDuringVerification => { debug!(error = ?e, "block verification was cancelled, continuing"); @@ -860,6 +895,14 @@ where ); false } + BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { .. } => { + debug!( + error = ?e, + "queued duplicate block hash for download, \ + assuming the syncer will eventually resolve duplicates, continuing" + ); + false + } // String matches BlockDownloadVerifyError::Invalid(VerifyChainError::Block( @@ -872,7 +915,9 @@ where BlockDownloadVerifyError::DownloadFailed(ref source) if format!("{:?}", source).contains("NotFound") => { - // Covers both NotFoundResponse and NotFoundRegistry errors. + // Covers these errors: + // - NotFoundResponse + // - NotFoundRegistry // // TODO: improve this by checking the type (#2908) // restart after a certain number of NotFound errors? @@ -887,13 +932,12 @@ where // become incorrect e.g. after some refactoring, and it is difficult // to write a test to check it. The test below is a best-effort // attempt to catch if that happens and log it. + // // TODO: add a proper test and remove this // https://github.com/ZcashFoundation/zebra/issues/2909 let err_str = format!("{:?}", e); if err_str.contains("AlreadyVerified") || err_str.contains("AlreadyInChain") - || err_str.contains("Cancelled") - || err_str.contains("BehindTipHeight") || err_str.contains("block is already committed to the state") || err_str.contains("NotFound") { diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 1ba2ded0c9c..661aab34430 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -8,7 +8,6 @@ use std::{ task::{Context, Poll}, }; -use color_eyre::eyre::{eyre, Report}; use futures::{ future::TryFutureExt, ready, @@ -63,14 +62,17 @@ impl hedge::Policy for AlwaysHedge { #[derive(Error, Debug)] #[allow(dead_code)] pub enum BlockDownloadVerifyError { - #[error("error downloading block")] - DownloadFailed(#[source] BoxError), + #[error("error from the network service")] + NetworkError(#[source] BoxError), #[error("error from the verifier service")] VerifierError(#[source] BoxError), - #[error("block did not pass consensus validation")] - Invalid(#[from] zebra_consensus::chain::VerifyChainError), + #[error("duplicate block hash queued for download: {hash:?}")] + DuplicateBlockQueuedForDownload { hash: block::Hash }, + + #[error("error downloading block")] + DownloadFailed(#[source] BoxError), #[error("downloaded block was too far ahead of the chain tip")] AboveLookaheadHeightLimit, @@ -81,6 +83,9 @@ pub enum BlockDownloadVerifyError { #[error("downloaded block had an invalid height")] InvalidHeight, + #[error("block did not pass consensus validation")] + Invalid(#[from] zebra_consensus::chain::VerifyChainError), + #[error("block download / verification was cancelled during download")] CancelledDuringDownload, @@ -211,10 +216,13 @@ where /// only if the network service fails. It returns immediately after queuing /// the request. #[instrument(level = "debug", skip(self), fields(%hash))] - pub async fn download_and_verify(&mut self, hash: block::Hash) -> Result<(), Report> { + pub async fn download_and_verify( + &mut self, + hash: block::Hash, + ) -> Result<(), BlockDownloadVerifyError> { if self.cancel_handles.contains_key(&hash) { metrics::counter!("sync.already.queued.dropped.block.hash.count", 1); - return Err(eyre!("duplicate hash queued for download: {:?}", hash)); + return Err(BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { hash }); } // We construct the block requests sequentially, waiting for the peer @@ -224,14 +232,12 @@ where // if we waited for readiness and did the service call in the spawned // tasks, all of the spawned tasks would race each other waiting for the // network to become ready. - debug!("waiting to request block"); let block_req = self .network .ready() .await - .map_err(|e| eyre!(e))? + .map_err(BlockDownloadVerifyError::NetworkError)? .call(zn::Request::BlocksByHash(std::iter::once(hash).collect())); - debug!("requested block"); // This oneshot is used to signal cancellation to the download task. let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();