diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 4be92d59a4b..ce7d04ac0ac 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -952,7 +952,7 @@ impl BackFillSync { Err(e) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway warn!(self.log, "Could not send batch request"; - "batch_id" => batch_id, "error" => e, &batch); + "batch_id" => batch_id, "error" => ?e, &batch); // register the failed download and check if the batch can be retried if let Err(e) = batch.start_downloading_from_peer(peer, 1) { return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 400d382d6d4..2791623f3ff 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -82,7 +82,7 @@ impl RequestState for BlockRequestState { cx: &mut SyncNetworkContext, ) -> Result { cx.block_lookup_request(id, peer_id, self.requested_block_root) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupRequestError::SendFailedNetwork) } fn send_for_processing( @@ -102,7 +102,7 @@ impl RequestState for BlockRequestState { RpcBlock::new_without_blobs(Some(block_root), value), seen_timestamp, ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupRequestError::SendFailedProcessor) } fn response_type() -> ResponseType { @@ -135,7 +135,7 @@ impl RequestState for BlobRequestState { self.block_root, downloaded_block_expected_blobs, ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupRequestError::SendFailedNetwork) } fn send_for_processing( @@ -150,7 +150,7 @@ impl RequestState for BlobRequestState { peer_id: _, } = download_result; cx.send_blobs_for_processing(id, block_root, value, seen_timestamp) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupRequestError::SendFailedProcessor) } fn response_type() -> ResponseType { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 48dda03facd..e89cd9e4ada 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -2,7 +2,7 @@ use self::parent_chain::{compute_parent_chains, NodeChain}; pub use self::single_block_lookup::DownloadResult; use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; use super::manager::{BlockProcessType, BlockProcessingResult}; -use super::network_context::{RpcProcessingResult, SyncNetworkContext}; +use super::network_context::{RpcResponseResult, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; @@ -310,7 +310,7 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: PeerId, - response: RpcProcessingResult, + response: RpcResponseResult, cx: &mut SyncNetworkContext, ) { let result = self.on_download_response_inner::(id, peer_id, response, cx); @@ -322,7 +322,7 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: PeerId, - response: RpcProcessingResult, + response: RpcResponseResult, cx: &mut SyncNetworkContext, ) -> Result { // Note: no need to downscore peers here, already downscored on network context diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index b35a3e91fb6..28ac0378b3e 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -2,7 +2,9 @@ use super::common::ResponseType; use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::Id; -use crate::sync::network_context::{LookupRequestResult, ReqId, SyncNetworkContext}; +use crate::sync::network_context::{ + LookupRequestResult, ReqId, RpcRequestSendError, SendErrorProcessor, SyncNetworkContext, +}; use beacon_chain::BeaconChainTypes; use derivative::Derivative; use itertools::Itertools; @@ -34,8 +36,10 @@ pub enum LookupRequestError { }, /// No peers left to serve this lookup NoPeers, - /// Error sending event to network or beacon processor - SendFailed(&'static str), + /// Error sending event to network + SendFailedNetwork(RpcRequestSendError), + /// Error sending event to processor + SendFailedProcessor(SendErrorProcessor), /// Inconsistent lookup request state BadState(String), /// Lookup failed for some other reason and should be dropped diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 8693bc0c6c4..fa1f50cee06 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -52,31 +52,43 @@ pub enum RpcEvent { RPCError(RPCError), } -pub type RpcProcessingResult = Result<(T, Duration), LookupFailure>; +pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; -pub enum LookupFailure { +pub enum RpcResponseError { RpcError(RPCError), - LookupVerifyError(LookupVerifyError), + VerifyError(LookupVerifyError), } -impl std::fmt::Display for LookupFailure { +#[derive(Debug, PartialEq, Eq)] +pub enum RpcRequestSendError { + /// Network channel send failed + NetworkSendError, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum SendErrorProcessor { + SendError, + ProcessorNotAvailable, +} + +impl std::fmt::Display for RpcResponseError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { - LookupFailure::RpcError(e) => write!(f, "RPC Error: {:?}", e), - LookupFailure::LookupVerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), + RpcResponseError::RpcError(e) => write!(f, "RPC Error: {:?}", e), + RpcResponseError::VerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), } } } -impl From for LookupFailure { +impl From for RpcResponseError { fn from(e: RPCError) -> Self { - LookupFailure::RpcError(e) + RpcResponseError::RpcError(e) } } -impl From for LookupFailure { +impl From for RpcResponseError { fn from(e: LookupVerifyError) -> Self { - LookupFailure::LookupVerifyError(e) + RpcResponseError::VerifyError(e) } } @@ -209,7 +221,7 @@ impl SyncNetworkContext { peer_id: PeerId, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, - ) -> Result { + ) -> Result { let id = self.next_id(); trace!( self.log, @@ -218,11 +230,13 @@ impl SyncNetworkContext { "count" => request.count(), "peer" => %peer_id, ); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlocksByRange(request.clone()), - request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), - })?; + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: Request::BlocksByRange(request.clone()), + request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { debug!( @@ -234,14 +248,16 @@ impl SyncNetworkContext { ); // Create the blob request based on the blocks request. - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlobsByRange(BlobsByRangeRequest { - start_slot: *request.start_slot(), - count: *request.count(), - }), - request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), - })?; + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: Request::BlobsByRange(BlobsByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }), + request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; } Ok(id) @@ -254,7 +270,7 @@ impl SyncNetworkContext { batch_type: ByRangeRequestType, request: BlocksByRangeRequest, sender_id: RangeRequestId, - ) -> Result { + ) -> Result { let id = self.blocks_by_range_request(peer_id, batch_type, request)?; self.range_blocks_and_blobs_requests .insert(id, (sender_id, BlocksAndBlobsRequestInfo::new(batch_type))); @@ -320,7 +336,7 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, peer_id: PeerId, block_root: Hash256, - ) -> Result { + ) -> Result { // da_checker includes block that are execution verified, but are missing components if self .chain @@ -357,11 +373,13 @@ impl SyncNetworkContext { let request = BlocksByRootSingleRequest(block_root); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlocksByRoot(request.into_request(&self.chain.spec)), - request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), - })?; + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: Request::BlocksByRoot(request.into_request(&self.chain.spec)), + request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; self.blocks_by_root_requests .insert(id, ActiveBlocksByRootRequest::new(request)); @@ -381,7 +399,7 @@ impl SyncNetworkContext { peer_id: PeerId, block_root: Hash256, downloaded_block_expected_blobs: Option, - ) -> Result { + ) -> Result { let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| { self.chain .data_availability_checker @@ -428,11 +446,13 @@ impl SyncNetworkContext { indices, }; - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlobsByRoot(request.clone().into_request(&self.chain.spec)), - request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), - })?; + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: Request::BlobsByRoot(request.clone().into_request(&self.chain.spec)), + request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; self.blobs_by_root_requests .insert(id, ActiveBlobsByRootRequest::new(request)); @@ -549,7 +569,7 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, block: RpcEvent>>, - ) -> Option>>> { + ) -> Option>>> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { return None; }; @@ -575,7 +595,7 @@ impl SyncNetworkContext { } }; - if let Err(LookupFailure::LookupVerifyError(e)) = &resp { + if let Err(RpcResponseError::VerifyError(e)) = &resp { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } Some(resp) @@ -586,7 +606,7 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, blob: RpcEvent>>, - ) -> Option>> { + ) -> Option>> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { return None; }; @@ -618,7 +638,7 @@ impl SyncNetworkContext { // catch if a peer is returning more blobs than requested or if the excess blobs are // invalid. Err((e, resolved)) => { - if let LookupFailure::LookupVerifyError(e) = &e { + if let RpcResponseError::VerifyError(e) = &e { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } if resolved { @@ -636,31 +656,27 @@ impl SyncNetworkContext { block_root: Hash256, block: RpcBlock, duration: Duration, - ) -> Result<(), &'static str> { - match self.beacon_processor_if_enabled() { - Some(beacon_processor) => { - debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id); - if let Err(e) = beacon_processor.send_rpc_beacon_block( - block_root, - block, - duration, - BlockProcessType::SingleBlock { id }, - ) { - error!( - self.log, - "Failed to send sync block to processor"; - "error" => ?e - ); - Err("beacon processor send failure") - } else { - Ok(()) - } - } - None => { - trace!(self.log, "Dropping block ready for processing. Beacon processor not available"; "block" => %block_root); - Err("beacon processor unavailable") - } - } + ) -> Result<(), SendErrorProcessor> { + let beacon_processor = self + .beacon_processor_if_enabled() + .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; + + debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id); + beacon_processor + .send_rpc_beacon_block( + block_root, + block, + duration, + BlockProcessType::SingleBlock { id }, + ) + .map_err(|e| { + error!( + self.log, + "Failed to send sync block to processor"; + "error" => ?e + ); + SendErrorProcessor::SendError + }) } pub fn send_blobs_for_processing( @@ -669,31 +685,27 @@ impl SyncNetworkContext { block_root: Hash256, blobs: FixedBlobSidecarList, duration: Duration, - ) -> Result<(), &'static str> { - match self.beacon_processor_if_enabled() { - Some(beacon_processor) => { - debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id); - if let Err(e) = beacon_processor.send_rpc_blobs( - block_root, - blobs, - duration, - BlockProcessType::SingleBlob { id }, - ) { - error!( - self.log, - "Failed to send sync blobs to processor"; - "error" => ?e - ); - Err("beacon processor send failure") - } else { - Ok(()) - } - } - None => { - trace!(self.log, "Dropping blobs ready for processing. Beacon processor not available"; "block_root" => %block_root); - Err("beacon processor unavailable") - } - } + ) -> Result<(), SendErrorProcessor> { + let beacon_processor = self + .beacon_processor_if_enabled() + .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; + + debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id); + beacon_processor + .send_rpc_blobs( + block_root, + blobs, + duration, + BlockProcessType::SingleBlob { id }, + ) + .map_err(|e| { + error!( + self.log, + "Failed to send sync blobs to processor"; + "error" => ?e + ); + SendErrorProcessor::SendError + }) } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 9a6c99ebf6c..63cafa9aca4 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -923,7 +923,7 @@ impl SyncingChain { Err(e) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway warn!(self.log, "Could not send batch request"; - "batch_id" => batch_id, "error" => e, &batch); + "batch_id" => batch_id, "error" => ?e, &batch); // register the failed download and check if the batch can be retried batch.start_downloading_from_peer(peer, 1)?; // fake request_id is not relevant self.peers