From 46184e5ce49b09cf444c1a7b1e8b10ed9d9c4e13 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 8 Dec 2023 15:42:55 -0500 Subject: [PATCH] Remove delayed lookups (#4992) * initial rip out * fix unused imports * delete tests and fix lint * fix peers scoring for blobs --- .../src/data_availability_checker.rs | 25 -- .../gossip_methods.rs | 81 ++---- .../src/network_beacon_processor/mod.rs | 73 +----- .../network_beacon_processor/sync_methods.rs | 25 +- .../src/network_beacon_processor/tests.rs | 4 - beacon_node/network/src/router.rs | 6 - .../network/src/sync/block_lookups/common.rs | 47 +--- .../network/src/sync/block_lookups/mod.rs | 91 ++----- .../src/sync/block_lookups/parent_lookup.rs | 15 +- .../sync/block_lookups/single_block_lookup.rs | 111 +++------ .../network/src/sync/block_lookups/tests.rs | 233 +----------------- beacon_node/network/src/sync/manager.rs | 36 +-- 12 files changed, 94 insertions(+), 653 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 6b327246a2e..67e98a01c1a 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -413,31 +413,6 @@ impl DataAvailabilityChecker { .incomplete_processing_components(slot) } - /// Determines whether we are at least the `single_lookup_delay` duration into the given slot. - /// If we are not currently in the Deneb fork, this delay is not considered. - /// - /// The `single_lookup_delay` is the duration we wait for a blocks or blobs to arrive over - /// gossip before making single block or blob requests. This is to minimize the number of - /// single lookup requests we end up making. - pub fn should_delay_lookup(&self, slot: Slot) -> bool { - if !self.is_deneb() { - return false; - } - - let current_or_future_slot = self - .slot_clock - .now() - .map_or(false, |current_slot| current_slot <= slot); - - let delay_threshold_unmet = self - .slot_clock - .millis_from_current_slot_start() - .map_or(false, |millis_into_slot| { - millis_into_slot < self.slot_clock.single_lookup_delay() - }); - current_or_future_slot && delay_threshold_unmet - } - /// The epoch at which we require a data availability check in block processing. /// `None` if the `Deneb` fork is disabled. pub fn data_availability_boundary(&self) -> Option { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 5d98039a819..9d9b196e9be 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,8 +4,6 @@ use crate::{ service::NetworkMessage, sync::SyncMessage, }; -use std::collections::HashSet; - use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::store::Error; @@ -756,11 +754,6 @@ impl NetworkBeaconProcessor { let blob_slot = verified_blob.slot(); let blob_index = verified_blob.id().index; - let delay_lookup = self - .chain - .data_availability_checker - .should_delay_lookup(blob_slot); - match self.chain.process_gossip_blob(verified_blob).await { Ok(AvailabilityProcessingStatus::Imported(block_root)) => { // Note: Reusing block imported metric here @@ -772,29 +765,14 @@ impl NetworkBeaconProcessor { ); self.chain.recompute_head_at_current_slot().await; } - Ok(AvailabilityProcessingStatus::MissingComponents(_slot, block_root)) => { - if delay_lookup { - self.cache_peer(peer_id, &block_root); - trace!( - self.log, - "Processed blob, delaying lookup for other components"; - "slot" => %blob_slot, - "blob_index" => %blob_index, - "block_root" => %block_root, - ); - } else { - trace!( - self.log, - "Missing block components for gossip verified blob"; - "slot" => %blob_slot, - "blob_index" => %blob_index, - "block_root" => %block_root, - ); - self.send_sync_message(SyncMessage::MissingGossipBlockComponents( - vec![peer_id], - block_root, - )); - } + Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + trace!( + self.log, + "Processed blob, waiting for other components"; + "slot" => %slot, + "blob_index" => %blob_index, + "block_root" => %block_root, + ); } Err(err) => { debug!( @@ -818,18 +796,6 @@ impl NetworkBeaconProcessor { } } - /// Cache the peer id for the given block root. - fn cache_peer(self: &Arc, peer_id: PeerId, block_root: &Hash256) { - let mut guard = self.delayed_lookup_peers.lock(); - if let Some(peers) = guard.get_mut(block_root) { - peers.insert(peer_id); - } else { - let mut peers = HashSet::new(); - peers.insert(peer_id); - guard.push(*block_root, peers); - } - } - /// Process the beacon block received from the gossip network and: /// /// - If it passes gossip propagation criteria, tell the network thread to forward it. @@ -1170,11 +1136,6 @@ impl NetworkBeaconProcessor { let block = verified_block.block.block_cloned(); let block_root = verified_block.block_root; - let delay_lookup = self - .chain - .data_availability_checker - .should_delay_lookup(verified_block.block.slot()); - let result = self .chain .process_block_with_early_caching(block_root, verified_block, NotifyExecutionLayer::Yes) @@ -1209,26 +1170,12 @@ impl NetworkBeaconProcessor { self.chain.recompute_head_at_current_slot().await; } Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { - if delay_lookup { - self.cache_peer(peer_id, block_root); - trace!( - self.log, - "Processed block, delaying lookup for other components"; - "slot" => slot, - "block_root" => %block_root, - ); - } else { - trace!( - self.log, - "Missing block components for gossip verified block"; - "slot" => slot, - "block_root" => %block_root, - ); - self.send_sync_message(SyncMessage::MissingGossipBlockComponents( - vec![peer_id], - *block_root, - )); - } + trace!( + self.log, + "Processed block, waiting for other components"; + "slot" => slot, + "block_root" => %block_root, + ); } Err(BlockError::ParentUnknown(block)) => { // Inform the sync manager to find parents for this block diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 2356a197cc2..67fc2fabb1e 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -18,11 +18,8 @@ use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, }; -use lru::LruCache; -use parking_lot::Mutex; -use slog::{crit, debug, error, trace, Logger}; -use slot_clock::{ManualSlotClock, SlotClock}; -use std::collections::HashSet; +use slog::{debug, Logger}; +use slot_clock::ManualSlotClock; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -30,7 +27,6 @@ use store::MemoryStore; use task_executor::test_utils::TestRuntime; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, error::TrySendError}; -use tokio::time::{interval_at, Instant}; use types::*; pub use sync_methods::ChainSegmentProcessId; @@ -44,7 +40,6 @@ mod sync_methods; mod tests; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; -pub const DELAYED_PEER_CACHE_SIZE: usize = 16; /// Defines if and where we will store the SSZ files of invalid blocks. #[derive(Clone)] @@ -65,7 +60,6 @@ pub struct NetworkBeaconProcessor { pub reprocess_tx: mpsc::Sender, pub network_globals: Arc>, pub invalid_block_storage: InvalidBlockStorage, - pub delayed_lookup_peers: Mutex>>, pub executor: TaskExecutor, pub log: Logger, } @@ -630,68 +624,6 @@ impl NetworkBeaconProcessor { "error" => %e) }); } - - /// This service is responsible for collecting lookup messages and sending them back to sync - /// for processing after a short delay. - /// - /// We want to delay lookups triggered from gossip for the following reasons: - /// - /// - We only want to make one request for components we are unlikely to see on gossip. This means - /// we don't have to repeatedly update our RPC request's state as we receive gossip components. - /// - /// - We are likely to receive blocks/blobs over gossip more quickly than we could via an RPC request. - /// - /// - Delaying a lookup means we are less likely to simultaneously download the same blocks/blobs - /// over gossip and RPC. - /// - /// - We would prefer to request peers based on whether we've seen them attest, because this gives - /// us an idea about whether they *should* have the block/blobs we're missing. This is because a - /// node should not attest to a block unless it has all the blobs for that block. This gives us a - /// stronger basis for peer scoring. - pub fn spawn_delayed_lookup_service(self: &Arc) { - let processor_clone = self.clone(); - let executor = self.executor.clone(); - let log = self.log.clone(); - let beacon_chain = self.chain.clone(); - executor.spawn( - async move { - let slot_duration = beacon_chain.slot_clock.slot_duration(); - let delay = beacon_chain.slot_clock.single_lookup_delay(); - let interval_start = match ( - beacon_chain.slot_clock.duration_to_next_slot(), - beacon_chain.slot_clock.seconds_from_current_slot_start(), - ) { - (Some(duration_to_next_slot), Some(seconds_from_current_slot_start)) => { - let duration_until_start = if seconds_from_current_slot_start > delay { - duration_to_next_slot + delay - } else { - delay - seconds_from_current_slot_start - }; - Instant::now() + duration_until_start - } - _ => { - crit!(log, - "Failed to read slot clock, delayed lookup service timing will be inaccurate.\ - This may degrade performance" - ); - Instant::now() - } - }; - - let mut interval = interval_at(interval_start, slot_duration); - loop { - interval.tick().await; - let Some(slot) = beacon_chain.slot_clock.now_or_genesis() else { - error!(log, "Skipping delayed lookup poll, unable to read slot clock"); - continue - }; - trace!(log, "Polling delayed lookups for slot: {slot}"); - processor_clone.poll_delayed_lookups(slot) - } - }, - "delayed_lookups", - ); - } } type TestBeaconChainType = @@ -734,7 +666,6 @@ impl NetworkBeaconProcessor> { reprocess_tx: work_reprocessing_tx, network_globals, invalid_block_storage: InvalidBlockStorage::Disabled, - delayed_lookup_peers: Mutex::new(LruCache::new(DELAYED_PEER_CACHE_SIZE)), executor: runtime.task_executor.clone(), log, }; diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index acfa069d355..95c1fa33e85 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -19,7 +19,7 @@ use beacon_processor::{ AsyncFn, BlockingFn, DuplicateCache, }; use lighthouse_network::PeerAction; -use slog::{debug, error, info, trace, warn}; +use slog::{debug, error, info, warn}; use slot_clock::SlotClock; use std::sync::Arc; use std::time::Duration; @@ -28,7 +28,7 @@ use store::KzgCommitment; use tokio::sync::mpsc; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; -use types::{Epoch, Hash256, Slot}; +use types::{Epoch, Hash256}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -373,27 +373,6 @@ impl NetworkBeaconProcessor { }); } - /// Poll the beacon chain for any delayed lookups that are now available. - pub fn poll_delayed_lookups(&self, slot: Slot) { - let block_roots = self - .chain - .data_availability_checker - .incomplete_processing_components(slot); - if block_roots.is_empty() { - trace!(self.log, "No delayed lookups found on poll"); - } else { - debug!(self.log, "Found delayed lookups on poll"; "lookup_count" => block_roots.len()); - } - for block_root in block_roots { - if let Some(peer_ids) = self.delayed_lookup_peers.lock().pop(&block_root) { - self.send_sync_message(SyncMessage::MissingGossipBlockComponents( - peer_ids.into_iter().collect(), - block_root, - )); - } - } - } - /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. pub async fn process_chain_segment( diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 503d2f12618..844fc53ab17 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -1,7 +1,6 @@ #![cfg(not(debug_assertions))] // Tests are too slow in debug. #![cfg(test)] -use crate::network_beacon_processor::DELAYED_PEER_CACHE_SIZE; use crate::{ network_beacon_processor::{ ChainSegmentProcessId, DuplicateCache, InvalidBlockStorage, NetworkBeaconProcessor, @@ -24,8 +23,6 @@ use lighthouse_network::{ types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}, Client, MessageId, NetworkGlobals, PeerId, Response, }; -use lru::LruCache; -use parking_lot::Mutex; use slot_clock::SlotClock; use std::iter::Iterator; use std::sync::Arc; @@ -223,7 +220,6 @@ impl TestRig { reprocess_tx: work_reprocessing_tx.clone(), network_globals: network_globals.clone(), invalid_block_storage: InvalidBlockStorage::Disabled, - delayed_lookup_peers: Mutex::new(LruCache::new(DELAYED_PEER_CACHE_SIZE)), executor: executor.clone(), log: log.clone(), }; diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 5d3dde90ce0..f56a3b7445e 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -21,8 +21,6 @@ use lighthouse_network::{ MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, }; use logging::TimeLatch; -use lru::LruCache; -use parking_lot::Mutex; use slog::{crit, debug, o, trace}; use slog::{error, warn}; use std::sync::Arc; @@ -111,14 +109,10 @@ impl Router { reprocess_tx: beacon_processor_reprocess_tx, network_globals: network_globals.clone(), invalid_block_storage, - delayed_lookup_peers: Mutex::new(LruCache::new( - crate::network_beacon_processor::DELAYED_PEER_CACHE_SIZE, - )), executor: executor.clone(), log: log.clone(), }; let network_beacon_processor = Arc::new(network_beacon_processor); - network_beacon_processor.spawn_delayed_lookup_service(); // spawn the sync thread crate::sync::manager::spawn( diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index e089ef4fef3..7a1be46e69d 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -3,8 +3,7 @@ use crate::sync::block_lookups::single_block_lookup::{ LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State, }; use crate::sync::block_lookups::{ - BlobRequestState, BlockLookups, BlockRequestState, PeerShouldHave, - SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, + BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, }; use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId}; use crate::sync::network_context::SyncNetworkContext; @@ -13,7 +12,6 @@ use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents} use beacon_chain::{get_block_root, BeaconChainTypes}; use lighthouse_network::rpc::methods::BlobsByRootRequest; use lighthouse_network::rpc::BlocksByRootRequest; -use lighthouse_network::PeerId; use rand::prelude::IteratorRandom; use ssz_types::VariableList; use std::ops::IndexMut; @@ -89,7 +87,7 @@ pub trait RequestState { /* Request building methods */ /// Construct a new request. - fn build_request(&mut self) -> Result<(PeerShouldHave, Self::RequestType), LookupRequestError> { + fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> { // Verify and construct request. self.too_many_attempts()?; let peer = self.get_peer()?; @@ -121,7 +119,7 @@ pub trait RequestState { id, req_counter: self.get_state().req_counter, }; - Self::make_request(id, peer_id.to_peer_id(), request, cx) + Self::make_request(id, peer_id, request, cx) } /// Verify the current request has not exceeded the maximum number of attempts. @@ -140,26 +138,15 @@ pub trait RequestState { /// Get the next peer to request. Draws from the set of peers we think should have both the /// block and blob first. If that fails, we draw from the set of peers that may have either. - fn get_peer(&mut self) -> Result { + fn get_peer(&mut self) -> Result { let request_state = self.get_state_mut(); - let available_peer_opt = request_state + let peer_id = request_state .available_peers .iter() .choose(&mut rand::thread_rng()) .copied() - .map(PeerShouldHave::BlockAndBlobs); - - let Some(peer_id) = available_peer_opt.or_else(|| { - request_state - .potential_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied() - .map(PeerShouldHave::Neither) - }) else { - return Err(LookupRequestError::NoPeers); - }; - request_state.used_peers.insert(peer_id.to_peer_id()); + .ok_or(LookupRequestError::NoPeers)?; + request_state.used_peers.insert(peer_id); Ok(peer_id) } @@ -211,7 +198,7 @@ pub trait RequestState { &mut self, expected_block_root: Hash256, response: Option, - peer_id: PeerShouldHave, + peer_id: PeerId, ) -> Result, LookupVerifyError>; /// A getter for the parent root of the response. Returns an `Option` because we won't know @@ -241,11 +228,6 @@ pub trait RequestState { cx: &SyncNetworkContext, ) -> Result<(), LookupRequestError>; - /// Remove the peer from the lookup if it is useless. - fn remove_if_useless(&mut self, peer: &PeerId) { - self.get_state_mut().remove_peer_if_useless(peer) - } - /// Register a failure to process the block or blob. fn register_failure_downloading(&mut self) { self.get_state_mut().register_failure_downloading() @@ -290,7 +272,7 @@ impl RequestState for BlockRequestState &mut self, expected_block_root: Hash256, response: Option, - peer_id: PeerShouldHave, + peer_id: PeerId, ) -> Result>>, LookupVerifyError> { match response { Some(block) => { @@ -310,13 +292,8 @@ impl RequestState for BlockRequestState } } None => { - if peer_id.should_have_block() { - self.state.register_failure_downloading(); - Err(LookupVerifyError::NoBlockReturned) - } else { - self.state.state = State::AwaitingDownload; - Err(LookupVerifyError::BenignFailure) - } + self.state.register_failure_downloading(); + Err(LookupVerifyError::NoBlockReturned) } } } @@ -396,7 +373,7 @@ impl RequestState for BlobRequestState, - peer_id: PeerShouldHave, + peer_id: PeerId, ) -> Result>, LookupVerifyError> { match blob { Some(blob) => { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index c5732069a00..62cdc4fa223 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -7,9 +7,7 @@ use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::block_lookups::common::LookupType; use crate::sync::block_lookups::parent_lookup::{ParentLookup, RequestError}; -use crate::sync::block_lookups::single_block_lookup::{ - CachedChild, LookupRequestError, LookupVerifyError, -}; +use crate::sync::block_lookups::single_block_lookup::{CachedChild, LookupRequestError}; use crate::sync::manager::{Id, SingleLookupReqId}; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; pub use beacon_chain::data_availability_checker::ChildComponents; @@ -30,11 +28,9 @@ pub use single_block_lookup::{BlobRequestState, BlockRequestState}; use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; use std::collections::{HashMap, VecDeque}; -use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; use store::Hash256; -use strum::Display; use types::blob_sidecar::FixedBlobSidecarList; use types::Slot; @@ -49,43 +45,6 @@ pub type DownloadedBlock = (Hash256, RpcBlock); const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; -/// This enum is used to track what a peer *should* be able to respond with based on -/// other messages we've seen from this peer on the network. This is useful for peer scoring. -/// We expect a peer tracked by the `BlockAndBlobs` variant to be able to respond to all -/// components of a block. This peer has either sent an attestation for the requested block -/// or has forwarded a block or blob that is a descendant of the requested block. An honest node -/// should not attest unless it has all components of a block, and it should not forward -/// messages if it does not have all components of the parent block. A peer tracked by the -/// `Neither` variant has likely just sent us a block or blob over gossip, in which case we -/// can't know whether the peer has all components of the block, and could be acting honestly -/// by forwarding a message without any other block components. -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Display)] -pub enum PeerShouldHave { - BlockAndBlobs(PeerId), - Neither(PeerId), -} - -impl PeerShouldHave { - fn as_peer_id(&self) -> &PeerId { - match self { - PeerShouldHave::BlockAndBlobs(id) => id, - PeerShouldHave::Neither(id) => id, - } - } - fn to_peer_id(self) -> PeerId { - match self { - PeerShouldHave::BlockAndBlobs(id) => id, - PeerShouldHave::Neither(id) => id, - } - } - fn should_have_block(&self) -> bool { - match self { - PeerShouldHave::BlockAndBlobs(_) => true, - PeerShouldHave::Neither(_) => false, - } - } -} - pub struct BlockLookups { /// Parent chain lookups being downloaded. parent_lookups: SmallVec<[ParentLookup; 3]>, @@ -123,7 +82,7 @@ impl BlockLookups { pub fn search_block( &mut self, block_root: Hash256, - peer_source: &[PeerShouldHave], + peer_source: &[PeerId], cx: &mut SyncNetworkContext, ) { self.new_current_lookup(block_root, None, peer_source, cx) @@ -139,7 +98,7 @@ impl BlockLookups { &mut self, block_root: Hash256, child_components: ChildComponents, - peer_source: &[PeerShouldHave], + peer_source: &[PeerId], cx: &mut SyncNetworkContext, ) { self.new_current_lookup(block_root, Some(child_components), peer_source, cx) @@ -180,7 +139,7 @@ impl BlockLookups { &mut self, block_root: Hash256, child_components: Option>, - peers: &[PeerShouldHave], + peers: &[PeerId], cx: &mut SyncNetworkContext, ) { // Do not re-request a block that is already being requested @@ -248,9 +207,6 @@ impl BlockLookups { peer_id: PeerId, cx: &mut SyncNetworkContext, ) { - // Gossip blocks or blobs shouldn't be propagated if parents are unavailable. - let peer_source = PeerShouldHave::BlockAndBlobs(peer_id); - // If this block or it's parent is part of a known failed chain, ignore it. if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) { debug!(self.log, "Block is from a past failed chain. Dropping"; @@ -263,7 +219,7 @@ impl BlockLookups { if let Some(parent_lookup) = self.parent_lookups.iter_mut().find(|parent_req| { parent_req.contains_block(&block_root) || parent_req.is_for_block(block_root) }) { - parent_lookup.add_peer(peer_source); + parent_lookup.add_peer(peer_id); // we are already searching for this block, ignore it return; } @@ -279,7 +235,7 @@ impl BlockLookups { let parent_lookup = ParentLookup::new( block_root, parent_root, - peer_source, + peer_id, self.da_checker.clone(), cx, ); @@ -398,14 +354,8 @@ impl BlockLookups { "response_type" => ?response_type, "error" => ?e ); - if matches!(e, LookupVerifyError::BenignFailure) { - request_state - .get_state_mut() - .remove_peer_if_useless(&peer_id); - } else { - let msg = e.into(); - cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); - }; + let msg = e.into(); + cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); request_state.register_failure_downloading(); lookup.request_block_and_blobs(cx)?; @@ -456,7 +406,7 @@ impl BlockLookups { // we should penalize the blobs peer because they did not provide all blobs on the // initial request. if lookup.both_components_downloaded() { - lookup.penalize_blob_peer(false, cx); + lookup.penalize_blob_peer(cx); lookup .blob_request_state .state @@ -619,15 +569,6 @@ impl BlockLookups { "bbroot_failed_chains", ); } - ParentVerifyError::BenignFailure => { - trace!( - self.log, - "Requested peer could not respond to block request, requesting a new peer"; - ); - let request_state = R::request_state_mut(&mut parent_lookup.current_parent_request); - request_state.remove_if_useless(&peer_id); - parent_lookup.request_parent(cx)?; - } } Ok(()) } @@ -846,7 +787,7 @@ impl BlockLookups { request_state.get_state_mut().component_processed = true; if lookup.both_components_processed() { - lookup.penalize_blob_peer(false, cx); + lookup.penalize_blob_peer(cx); // Try it again if possible. lookup @@ -864,7 +805,7 @@ impl BlockLookups { &mut self, cx: &mut SyncNetworkContext, mut lookup: SingleBlockLookup, - peer_id: PeerShouldHave, + peer_id: PeerId, e: BlockError, ) -> Result>, LookupRequestError> { let root = lookup.block_root(); @@ -884,7 +825,7 @@ impl BlockLookups { let parent_root = block.parent_root(); lookup.add_child_components(block.into()); lookup.request_block_and_blobs(cx)?; - self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); + self.search_parent(slot, root, parent_root, peer_id, cx); } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline @@ -920,7 +861,7 @@ impl BlockLookups { warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() { cx.report_peer( - block_peer.to_peer_id(), + block_peer, PeerAction::MidToleranceError, "single_block_failure", ); @@ -1141,13 +1082,9 @@ impl BlockLookups { let Ok(block_peer_id) = parent_lookup.block_processing_peer() else { return; }; - let block_peer_id = block_peer_id.to_peer_id(); // We may not have a blob peer, if there were no blobs required for this block. - let blob_peer_id = parent_lookup - .blob_processing_peer() - .ok() - .map(PeerShouldHave::to_peer_id); + let blob_peer_id = parent_lookup.blob_processing_peer().ok(); // all else we consider the chain a failure and downvote the peer that sent // us the last block diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 93bd2f57c09..5c2e90b48c9 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,5 +1,5 @@ use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; -use super::{DownloadedBlock, PeerShouldHave}; +use super::{DownloadedBlock, PeerId}; use crate::sync::block_lookups::common::Parent; use crate::sync::block_lookups::common::RequestState; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; @@ -8,7 +8,6 @@ use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::{ChildComponents, DataAvailabilityChecker}; use beacon_chain::BeaconChainTypes; use itertools::Itertools; -use lighthouse_network::PeerId; use std::collections::VecDeque; use std::sync::Arc; use store::Hash256; @@ -41,7 +40,6 @@ pub enum ParentVerifyError { ExtraBlobsReturned, InvalidIndex(u64), PreviousFailure { parent_root: Hash256 }, - BenignFailure, } #[derive(Debug, PartialEq, Eq)] @@ -61,7 +59,7 @@ impl ParentLookup { pub fn new( block_root: Hash256, parent_root: Hash256, - peer_id: PeerShouldHave, + peer_id: PeerId, da_checker: Arc>, cx: &mut SyncNetworkContext, ) -> Self { @@ -126,14 +124,14 @@ impl ParentLookup { .update_requested_parent_block(next_parent) } - pub fn block_processing_peer(&self) -> Result { + pub fn block_processing_peer(&self) -> Result { self.current_parent_request .block_request_state .state .processing_peer() } - pub fn blob_processing_peer(&self) -> Result { + pub fn blob_processing_peer(&self) -> Result { self.current_parent_request .blob_request_state .state @@ -211,12 +209,12 @@ impl ParentLookup { Ok(root_and_verified) } - pub fn add_peer(&mut self, peer: PeerShouldHave) { + pub fn add_peer(&mut self, peer: PeerId) { self.current_parent_request.add_peer(peer) } /// Adds a list of peers to the parent request. - pub fn add_peers(&mut self, peers: &[PeerShouldHave]) { + pub fn add_peers(&mut self, peers: &[PeerId]) { self.current_parent_request.add_peers(peers) } @@ -248,7 +246,6 @@ impl From for ParentVerifyError { E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned, E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index), E::NotEnoughBlobsReturned => ParentVerifyError::NotEnoughBlobsReturned, - E::BenignFailure => ParentVerifyError::BenignFailure, } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index e0f7d880949..e10e8328cde 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,4 +1,4 @@ -use super::PeerShouldHave; +use super::PeerId; use crate::sync::block_lookups::common::{Lookup, RequestState}; use crate::sync::block_lookups::Id; use crate::sync::network_context::SyncNetworkContext; @@ -8,7 +8,7 @@ use beacon_chain::data_availability_checker::{ }; use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents}; use beacon_chain::BeaconChainTypes; -use lighthouse_network::{PeerAction, PeerId}; +use lighthouse_network::PeerAction; use slog::{trace, Logger}; use std::collections::HashSet; use std::fmt::Debug; @@ -22,8 +22,8 @@ use types::EthSpec; #[derive(Debug, PartialEq, Eq)] pub enum State { AwaitingDownload, - Downloading { peer_id: PeerShouldHave }, - Processing { peer_id: PeerShouldHave }, + Downloading { peer_id: PeerId }, + Processing { peer_id: PeerId }, } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -35,10 +35,6 @@ pub enum LookupVerifyError { ExtraBlobsReturned, NotEnoughBlobsReturned, InvalidIndex(u64), - /// We don't have enough information to know - /// whether the peer is at fault or simply missed - /// what was requested on gossip. - BenignFailure, } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -66,7 +62,7 @@ impl SingleBlockLookup { pub fn new( requested_block_root: Hash256, child_components: Option>, - peers: &[PeerShouldHave], + peers: &[PeerId], da_checker: Arc>, id: Id, ) -> Self { @@ -191,21 +187,13 @@ impl SingleBlockLookup { } /// Add all given peers to both block and blob request states. - pub fn add_peer(&mut self, peer: PeerShouldHave) { - match peer { - PeerShouldHave::BlockAndBlobs(peer_id) => { - self.block_request_state.state.add_peer(&peer_id); - self.blob_request_state.state.add_peer(&peer_id); - } - PeerShouldHave::Neither(peer_id) => { - self.block_request_state.state.add_potential_peer(&peer_id); - self.blob_request_state.state.add_potential_peer(&peer_id); - } - } + pub fn add_peer(&mut self, peer_id: PeerId) { + self.block_request_state.state.add_peer(&peer_id); + self.blob_request_state.state.add_peer(&peer_id); } /// Add all given peers to both block and blob request states. - pub fn add_peers(&mut self, peers: &[PeerShouldHave]) { + pub fn add_peers(&mut self, peers: &[PeerId]) { for peer in peers { self.add_peer(*peer); } @@ -293,38 +281,31 @@ impl SingleBlockLookup { } } - /// Penalizes a blob peer if it should have blobs but didn't return them to us. Does not penalize - /// a peer who we request blobs from based on seeing a block or blobs over gossip. This may - /// have been a benign failure. - pub fn penalize_blob_peer(&mut self, penalize_always: bool, cx: &SyncNetworkContext) { + /// Penalizes a blob peer if it should have blobs but didn't return them to us. + pub fn penalize_blob_peer(&mut self, cx: &SyncNetworkContext) { if let Ok(blob_peer) = self.blob_request_state.state.processing_peer() { - if penalize_always || matches!(blob_peer, PeerShouldHave::BlockAndBlobs(_)) { - cx.report_peer( - blob_peer.to_peer_id(), - PeerAction::MidToleranceError, - "single_blob_failure", - ); - } - self.blob_request_state - .state - .remove_peer_if_useless(blob_peer.as_peer_id()); + cx.report_peer( + blob_peer, + PeerAction::MidToleranceError, + "single_blob_failure", + ); } } - /// This failure occurs on download, so register a failure downloading, penalize the peer if - /// necessary and clear the blob cache. + /// This failure occurs on download, so register a failure downloading, penalize the peer + /// and clear the blob cache. pub fn handle_consistency_failure(&mut self, cx: &SyncNetworkContext) { - self.penalize_blob_peer(false, cx); + self.penalize_blob_peer(cx); if let Some(cached_child) = self.child_components.as_mut() { cached_child.clear_blobs(); } self.blob_request_state.state.register_failure_downloading() } - /// This failure occurs after processing, so register a failure processing, penalize the peer if - /// necessary and clear the blob cache. + /// This failure occurs after processing, so register a failure processing, penalize the peer + /// and clear the blob cache. pub fn handle_availability_check_failure(&mut self, cx: &SyncNetworkContext) { - self.penalize_blob_peer(true, cx); + self.penalize_blob_peer(cx); if let Some(cached_child) = self.child_components.as_mut() { cached_child.clear_blobs(); } @@ -345,7 +326,7 @@ pub struct BlobRequestState { } impl BlobRequestState { - pub fn new(block_root: Hash256, peer_source: &[PeerShouldHave], is_deneb: bool) -> Self { + pub fn new(block_root: Hash256, peer_source: &[PeerId], is_deneb: bool) -> Self { let default_ids = MissingBlobs::new_without_block(block_root, is_deneb); Self { requested_ids: default_ids, @@ -364,7 +345,7 @@ pub struct BlockRequestState { } impl BlockRequestState { - pub fn new(block_root: Hash256, peers: &[PeerShouldHave]) -> Self { + pub fn new(block_root: Hash256, peers: &[PeerId]) -> Self { Self { requested_block_root: block_root, state: SingleLookupRequestState::new(peers), @@ -396,8 +377,6 @@ pub struct SingleLookupRequestState { pub state: State, /// Peers that should have this block or blob. pub available_peers: HashSet, - /// Peers that mar or may not have this block or blob. - pub potential_peers: HashSet, /// Peers from which we have requested this block. pub used_peers: HashSet, /// How many times have we attempted to process this block or blob. @@ -417,24 +396,15 @@ pub struct SingleLookupRequestState { } impl SingleLookupRequestState { - pub fn new(peers: &[PeerShouldHave]) -> Self { + pub fn new(peers: &[PeerId]) -> Self { let mut available_peers = HashSet::default(); - let mut potential_peers = HashSet::default(); - for peer in peers { - match peer { - PeerShouldHave::BlockAndBlobs(peer_id) => { - available_peers.insert(*peer_id); - } - PeerShouldHave::Neither(peer_id) => { - potential_peers.insert(*peer_id); - } - } + for peer in peers.iter().copied() { + available_peers.insert(peer); } Self { state: State::AwaitingDownload, available_peers, - potential_peers, used_peers: HashSet::default(), failed_processing: 0, failed_downloading: 0, @@ -462,25 +432,16 @@ impl SingleLookupRequestState { self.failed_processing + self.failed_downloading } - /// This method should be used for peers wrapped in `PeerShouldHave::BlockAndBlobs`. + /// This method should be used for peers wrapped in `PeerId::BlockAndBlobs`. pub fn add_peer(&mut self, peer_id: &PeerId) { - self.potential_peers.remove(peer_id); self.available_peers.insert(*peer_id); } - /// This method should be used for peers wrapped in `PeerShouldHave::Neither`. - pub fn add_potential_peer(&mut self, peer_id: &PeerId) { - if !self.available_peers.contains(peer_id) { - self.potential_peers.insert(*peer_id); - } - } - /// If a peer disconnects, this request could be failed. If so, an error is returned pub fn check_peer_disconnected(&mut self, dc_peer_id: &PeerId) -> Result<(), ()> { self.available_peers.remove(dc_peer_id); - self.potential_peers.remove(dc_peer_id); if let State::Downloading { peer_id } = &self.state { - if peer_id.as_peer_id() == dc_peer_id { + if peer_id == dc_peer_id { // Peer disconnected before providing a block self.register_failure_downloading(); return Err(()); @@ -491,21 +452,13 @@ impl SingleLookupRequestState { /// Returns the id peer we downloaded from if we have downloaded a verified block, otherwise /// returns an error. - pub fn processing_peer(&self) -> Result { + pub fn processing_peer(&self) -> Result { if let State::Processing { peer_id } = &self.state { Ok(*peer_id) } else { Err(()) } } - - /// Remove the given peer from the set of potential peers, so long as there is at least one - /// other potential peer or we have any available peers. - pub fn remove_peer_if_useless(&mut self, peer_id: &PeerId) { - if !self.available_peers.is_empty() || self.potential_peers.len() > 1 { - self.potential_peers.remove(peer_id); - } - } } impl slog::Value for SingleBlockLookup { @@ -609,7 +562,7 @@ mod tests { #[test] fn test_happy_path() { - let peer_id = PeerShouldHave::BlockAndBlobs(PeerId::random()); + let peer_id = PeerId::random(); let block = rand_block(); let spec = E::default_spec(); let slot_clock = TestingSlotClock::new( @@ -649,7 +602,7 @@ mod tests { #[test] fn test_block_lookup_failures() { - let peer_id = PeerShouldHave::BlockAndBlobs(PeerId::random()); + let peer_id = PeerId::random(); let block = rand_block(); let spec = E::default_spec(); let slot_clock = TestingSlotClock::new( diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 9dd7395a4e0..83f0b26156b 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -233,11 +233,7 @@ fn test_single_block_lookup_happy_path() { let peer_id = PeerId::random(); let block_root = block.canonical_root(); // Trigger the request - bl.search_block( - block_root, - &[PeerShouldHave::BlockAndBlobs(peer_id)], - &mut cx, - ); + bl.search_block(block_root, &[peer_id], &mut cx); let id = rig.expect_lookup_request(response_type); // If we're in deneb, a blob request should have been triggered as well, // we don't require a response because we're generateing 0-blob blocks in this test. @@ -285,11 +281,7 @@ fn test_single_block_lookup_empty_response() { let peer_id = PeerId::random(); // Trigger the request - bl.search_block( - block_hash, - &[PeerShouldHave::BlockAndBlobs(peer_id)], - &mut cx, - ); + bl.search_block(block_hash, &[peer_id], &mut cx); let id = rig.expect_lookup_request(response_type); // If we're in deneb, a blob request should have been triggered as well, // we don't require a response because we're generateing 0-blob blocks in this test. @@ -317,11 +309,7 @@ fn test_single_block_lookup_wrong_response() { let peer_id = PeerId::random(); // Trigger the request - bl.search_block( - block_hash, - &[PeerShouldHave::BlockAndBlobs(peer_id)], - &mut cx, - ); + bl.search_block(block_hash, &[peer_id], &mut cx); let id = rig.expect_lookup_request(response_type); // If we're in deneb, a blob request should have been triggered as well, // we don't require a response because we're generateing 0-blob blocks in this test. @@ -359,11 +347,7 @@ fn test_single_block_lookup_failure() { let peer_id = PeerId::random(); // Trigger the request - bl.search_block( - block_hash, - &[PeerShouldHave::BlockAndBlobs(peer_id)], - &mut cx, - ); + bl.search_block(block_hash, &[peer_id], &mut cx); let id = rig.expect_lookup_request(response_type); // If we're in deneb, a blob request should have been triggered as well, // we don't require a response because we're generateing 0-blob blocks in this test. @@ -395,11 +379,7 @@ fn test_single_block_lookup_becomes_parent_request() { let peer_id = PeerId::random(); // Trigger the request - bl.search_block( - block.canonical_root(), - &[PeerShouldHave::BlockAndBlobs(peer_id)], - &mut cx, - ); + bl.search_block(block.canonical_root(), &[peer_id], &mut cx); let id = rig.expect_lookup_request(response_type); // If we're in deneb, a blob request should have been triggered as well, // we don't require a response because we're generateing 0-blob blocks in this test. @@ -981,11 +961,7 @@ fn test_single_block_lookup_ignored_response() { let peer_id = PeerId::random(); // Trigger the request - bl.search_block( - block.canonical_root(), - &[PeerShouldHave::BlockAndBlobs(peer_id)], - &mut cx, - ); + bl.search_block(block.canonical_root(), &[peer_id], &mut cx); let id = rig.expect_lookup_request(response_type); // If we're in deneb, a blob request should have been triggered as well, // we don't require a response because we're generateing 0-blob blocks in this test. @@ -1205,7 +1181,6 @@ mod deneb_only { AttestationUnknownBlock, GossipUnknownParentBlock, GossipUnknownParentBlob, - GossipUnknownBlockOrBlob, } impl DenebTester { @@ -1234,11 +1209,7 @@ mod deneb_only { let (block_req_id, blob_req_id, parent_block_req_id, parent_blob_req_id) = match request_trigger { RequestTrigger::AttestationUnknownBlock => { - bl.search_block( - block_root, - &[PeerShouldHave::BlockAndBlobs(peer_id)], - &mut cx, - ); + bl.search_block(block_root, &[peer_id], &mut cx); let block_req_id = rig.expect_lookup_request(ResponseType::Block); let blob_req_id = rig.expect_lookup_request(ResponseType::Blob); (Some(block_req_id), Some(blob_req_id), None, None) @@ -1261,7 +1232,7 @@ mod deneb_only { bl.search_child_block( child_root, ChildComponents::new(child_root, Some(child_block), None), - &[PeerShouldHave::Neither(peer_id)], + &[peer_id], &mut cx, ); @@ -1299,7 +1270,7 @@ mod deneb_only { bl.search_child_block( child_root, ChildComponents::new(child_root, None, Some(blobs)), - &[PeerShouldHave::Neither(peer_id)], + &[peer_id], &mut cx, ); @@ -1316,12 +1287,6 @@ mod deneb_only { Some(parent_blob_req_id), ) } - RequestTrigger::GossipUnknownBlockOrBlob => { - bl.search_block(block_root, &[PeerShouldHave::Neither(peer_id)], &mut cx); - let block_req_id = rig.expect_lookup_request(ResponseType::Block); - let blob_req_id = rig.expect_lookup_request(ResponseType::Blob); - (Some(block_req_id), Some(blob_req_id), None, None) - } }; Some(Self { @@ -1838,186 +1803,6 @@ mod deneb_only { .block_response_triggering_process(); } - #[test] - fn single_block_and_blob_lookup_block_returned_first_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .block_response_triggering_process() - .blobs_response() - .blobs_response_was_valid() - .block_imported(); - } - - #[test] - fn single_block_and_blob_lookup_blobs_returned_first_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .blobs_response() - .blobs_response_was_valid() - .block_response_triggering_process() - .block_imported(); - } - - #[test] - fn single_block_and_blob_lookup_empty_response_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .empty_block_response() - .expect_block_request() - .expect_no_penalty() - .expect_no_blobs_request() - .empty_blobs_response() - .expect_no_penalty() - .expect_no_block_request() - .expect_no_blobs_request() - .block_response_triggering_process() - .missing_components_from_block_request(); - } - - #[test] - fn single_block_response_then_empty_blob_response_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .block_response_triggering_process() - .missing_components_from_block_request() - .empty_blobs_response() - .missing_components_from_blob_request() - .expect_blobs_request() - .expect_no_penalty() - .expect_no_block_request(); - } - - #[test] - fn single_blob_response_then_empty_block_response_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .blobs_response() - .blobs_response_was_valid() - .expect_no_penalty() - .expect_no_block_request() - .expect_no_blobs_request() - .missing_components_from_blob_request() - .empty_block_response() - .expect_block_request() - .expect_no_penalty() - .expect_no_blobs_request(); - } - - #[test] - fn single_invalid_block_response_then_blob_response_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .block_response_triggering_process() - .invalid_block_processed() - .expect_penalty() - .expect_block_request() - .expect_no_blobs_request() - .blobs_response() - .missing_components_from_blob_request() - .expect_no_penalty() - .expect_no_block_request() - .expect_no_block_request(); - } - - #[test] - fn single_block_response_then_invalid_blob_response_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .block_response_triggering_process() - .missing_components_from_block_request() - .blobs_response() - .invalid_blob_processed() - .expect_penalty() - .expect_blobs_request() - .expect_no_block_request(); - } - - #[test] - fn single_block_response_then_too_few_blobs_response_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .block_response_triggering_process() - .missing_components_from_block_request() - .invalidate_blobs_too_few() - .blobs_response() - .missing_components_from_blob_request() - .expect_blobs_request() - .expect_no_penalty() - .expect_no_block_request(); - } - - #[test] - fn single_block_response_then_too_many_blobs_response_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .block_response_triggering_process() - .invalidate_blobs_too_many() - .blobs_response() - .expect_penalty() - .expect_blobs_request() - .expect_no_block_request(); - } - #[test] - fn too_few_blobs_response_then_block_response_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .invalidate_blobs_too_few() - .blobs_response() - .blobs_response_was_valid() - .missing_components_from_blob_request() - .expect_no_penalty() - .expect_no_blobs_request() - .expect_no_block_request() - .block_response_triggering_process() - .missing_components_from_block_request() - .expect_blobs_request(); - } - - #[test] - fn too_many_blobs_response_then_block_response_gossip() { - let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else { - return; - }; - - tester - .invalidate_blobs_too_many() - .blobs_response() - .expect_penalty() - .expect_blobs_request() - .expect_no_block_request() - .block_response_triggering_process(); - } - #[test] fn parent_block_unknown_parent() { let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock) else { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 3bd32308ae8..bcb239aaa05 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -34,7 +34,7 @@ //! search for the block and subsequently search for parents if needed. use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; -use super::block_lookups::{BlockLookups, PeerShouldHave}; +use super::block_lookups::BlockLookups; use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; @@ -139,13 +139,6 @@ pub enum SyncMessage { /// manager to attempt to find the block matching the unknown hash. UnknownBlockHashFromAttestation(PeerId, Hash256), - /// A peer has sent a blob that references a block that is unknown or a peer has sent a block for - /// which we haven't received blobs. - /// - /// We will either attempt to find the block matching the unknown hash immediately or queue a lookup, - /// which will then trigger the request when we receive `MissingGossipBlockComponentsDelayed`. - MissingGossipBlockComponents(Vec, Hash256), - /// A peer has disconnected. Disconnect(PeerId), @@ -658,31 +651,8 @@ impl SyncManager { SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { // If we are not synced, ignore this block. if self.synced_and_connected(&peer_id) { - self.block_lookups.search_block( - block_hash, - &[PeerShouldHave::BlockAndBlobs(peer_id)], - &mut self.network, - ); - } - } - SyncMessage::MissingGossipBlockComponents(peer_id, block_root) => { - let peers_guard = self.network_globals().peers.read(); - let connected_peers = peer_id - .into_iter() - .filter_map(|peer_id| { - if peers_guard.is_connected(&peer_id) { - Some(PeerShouldHave::Neither(peer_id)) - } else { - None - } - }) - .collect::>(); - drop(peers_guard); - - // If we are not synced, ignore this block. - if self.synced() && !connected_peers.is_empty() { self.block_lookups - .search_block(block_root, &connected_peers, &mut self.network) + .search_block(block_hash, &[peer_id], &mut self.network); } } SyncMessage::Disconnect(peer_id) => { @@ -766,7 +736,7 @@ impl SyncManager { self.block_lookups.search_child_block( block_root, child_components, - &[PeerShouldHave::Neither(peer_id)], + &[peer_id], &mut self.network, ); }