Skip to content

Commit

Permalink
Remove delayed lookups (#4992)
Browse files Browse the repository at this point in the history
* initial rip out

* fix unused imports

* delete tests and fix lint

* fix peers scoring for blobs
  • Loading branch information
realbigsean authored Dec 8, 2023
1 parent b882519 commit 46184e5
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 653 deletions.
25 changes: 0 additions & 25 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,31 +413,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.incomplete_processing_components(slot)
}

/// Determines whether we are at least the `single_lookup_delay` duration into the given slot.
/// If we are not currently in the Deneb fork, this delay is not considered.
///
/// The `single_lookup_delay` is the duration we wait for a blocks or blobs to arrive over
/// gossip before making single block or blob requests. This is to minimize the number of
/// single lookup requests we end up making.
pub fn should_delay_lookup(&self, slot: Slot) -> bool {
if !self.is_deneb() {
return false;
}

let current_or_future_slot = self
.slot_clock
.now()
.map_or(false, |current_slot| current_slot <= slot);

let delay_threshold_unmet = self
.slot_clock
.millis_from_current_slot_start()
.map_or(false, |millis_into_slot| {
millis_into_slot < self.slot_clock.single_lookup_delay()
});
current_or_future_slot && delay_threshold_unmet
}

/// The epoch at which we require a data availability check in block processing.
/// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> {
Expand Down
81 changes: 14 additions & 67 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use crate::{
service::NetworkMessage,
sync::SyncMessage,
};
use std::collections::HashSet;

use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::store::Error;
Expand Down Expand Up @@ -756,11 +754,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let blob_slot = verified_blob.slot();
let blob_index = verified_blob.id().index;

let delay_lookup = self
.chain
.data_availability_checker
.should_delay_lookup(blob_slot);

match self.chain.process_gossip_blob(verified_blob).await {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
// Note: Reusing block imported metric here
Expand All @@ -772,29 +765,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingComponents(_slot, block_root)) => {
if delay_lookup {
self.cache_peer(peer_id, &block_root);
trace!(
self.log,
"Processed blob, delaying lookup for other components";
"slot" => %blob_slot,
"blob_index" => %blob_index,
"block_root" => %block_root,
);
} else {
trace!(
self.log,
"Missing block components for gossip verified blob";
"slot" => %blob_slot,
"blob_index" => %blob_index,
"block_root" => %block_root,
);
self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
vec![peer_id],
block_root,
));
}
Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
trace!(
self.log,
"Processed blob, waiting for other components";
"slot" => %slot,
"blob_index" => %blob_index,
"block_root" => %block_root,
);
}
Err(err) => {
debug!(
Expand All @@ -818,18 +796,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}

/// Cache the peer id for the given block root.
fn cache_peer(self: &Arc<Self>, peer_id: PeerId, block_root: &Hash256) {
let mut guard = self.delayed_lookup_peers.lock();
if let Some(peers) = guard.get_mut(block_root) {
peers.insert(peer_id);
} else {
let mut peers = HashSet::new();
peers.insert(peer_id);
guard.push(*block_root, peers);
}
}

/// Process the beacon block received from the gossip network and:
///
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
Expand Down Expand Up @@ -1170,11 +1136,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;

let delay_lookup = self
.chain
.data_availability_checker
.should_delay_lookup(verified_block.block.slot());

let result = self
.chain
.process_block_with_early_caching(block_root, verified_block, NotifyExecutionLayer::Yes)
Expand Down Expand Up @@ -1209,26 +1170,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
if delay_lookup {
self.cache_peer(peer_id, block_root);
trace!(
self.log,
"Processed block, delaying lookup for other components";
"slot" => slot,
"block_root" => %block_root,
);
} else {
trace!(
self.log,
"Missing block components for gossip verified block";
"slot" => slot,
"block_root" => %block_root,
);
self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
vec![peer_id],
*block_root,
));
}
trace!(
self.log,
"Processed block, waiting for other components";
"slot" => slot,
"block_root" => %block_root,
);
}
Err(BlockError::ParentUnknown(block)) => {
// Inform the sync manager to find parents for this block
Expand Down
73 changes: 2 additions & 71 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,15 @@ use lighthouse_network::{
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
};
use lru::LruCache;
use parking_lot::Mutex;
use slog::{crit, debug, error, trace, Logger};
use slot_clock::{ManualSlotClock, SlotClock};
use std::collections::HashSet;
use slog::{debug, Logger};
use slot_clock::ManualSlotClock;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use store::MemoryStore;
use task_executor::test_utils::TestRuntime;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio::time::{interval_at, Instant};
use types::*;

pub use sync_methods::ChainSegmentProcessId;
Expand All @@ -44,7 +40,6 @@ mod sync_methods;
mod tests;

pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;
pub const DELAYED_PEER_CACHE_SIZE: usize = 16;

/// Defines if and where we will store the SSZ files of invalid blocks.
#[derive(Clone)]
Expand All @@ -65,7 +60,6 @@ pub struct NetworkBeaconProcessor<T: BeaconChainTypes> {
pub reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
pub network_globals: Arc<NetworkGlobals<T::EthSpec>>,
pub invalid_block_storage: InvalidBlockStorage,
pub delayed_lookup_peers: Mutex<LruCache<Hash256, HashSet<PeerId>>>,
pub executor: TaskExecutor,
pub log: Logger,
}
Expand Down Expand Up @@ -630,68 +624,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"error" => %e)
});
}

/// This service is responsible for collecting lookup messages and sending them back to sync
/// for processing after a short delay.
///
/// We want to delay lookups triggered from gossip for the following reasons:
///
/// - We only want to make one request for components we are unlikely to see on gossip. This means
/// we don't have to repeatedly update our RPC request's state as we receive gossip components.
///
/// - We are likely to receive blocks/blobs over gossip more quickly than we could via an RPC request.
///
/// - Delaying a lookup means we are less likely to simultaneously download the same blocks/blobs
/// over gossip and RPC.
///
/// - We would prefer to request peers based on whether we've seen them attest, because this gives
/// us an idea about whether they *should* have the block/blobs we're missing. This is because a
/// node should not attest to a block unless it has all the blobs for that block. This gives us a
/// stronger basis for peer scoring.
pub fn spawn_delayed_lookup_service(self: &Arc<Self>) {
let processor_clone = self.clone();
let executor = self.executor.clone();
let log = self.log.clone();
let beacon_chain = self.chain.clone();
executor.spawn(
async move {
let slot_duration = beacon_chain.slot_clock.slot_duration();
let delay = beacon_chain.slot_clock.single_lookup_delay();
let interval_start = match (
beacon_chain.slot_clock.duration_to_next_slot(),
beacon_chain.slot_clock.seconds_from_current_slot_start(),
) {
(Some(duration_to_next_slot), Some(seconds_from_current_slot_start)) => {
let duration_until_start = if seconds_from_current_slot_start > delay {
duration_to_next_slot + delay
} else {
delay - seconds_from_current_slot_start
};
Instant::now() + duration_until_start
}
_ => {
crit!(log,
"Failed to read slot clock, delayed lookup service timing will be inaccurate.\
This may degrade performance"
);
Instant::now()
}
};

let mut interval = interval_at(interval_start, slot_duration);
loop {
interval.tick().await;
let Some(slot) = beacon_chain.slot_clock.now_or_genesis() else {
error!(log, "Skipping delayed lookup poll, unable to read slot clock");
continue
};
trace!(log, "Polling delayed lookups for slot: {slot}");
processor_clone.poll_delayed_lookups(slot)
}
},
"delayed_lookups",
);
}
}

type TestBeaconChainType<E> =
Expand Down Expand Up @@ -734,7 +666,6 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
reprocess_tx: work_reprocessing_tx,
network_globals,
invalid_block_storage: InvalidBlockStorage::Disabled,
delayed_lookup_peers: Mutex::new(LruCache::new(DELAYED_PEER_CACHE_SIZE)),
executor: runtime.task_executor.clone(),
log,
};
Expand Down
25 changes: 2 additions & 23 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use beacon_processor::{
AsyncFn, BlockingFn, DuplicateCache,
};
use lighthouse_network::PeerAction;
use slog::{debug, error, info, trace, warn};
use slog::{debug, error, info, warn};
use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -28,7 +28,7 @@ use store::KzgCommitment;
use tokio::sync::mpsc;
use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{Epoch, Hash256, Slot};
use types::{Epoch, Hash256};

/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -373,27 +373,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}

/// Poll the beacon chain for any delayed lookups that are now available.
pub fn poll_delayed_lookups(&self, slot: Slot) {
let block_roots = self
.chain
.data_availability_checker
.incomplete_processing_components(slot);
if block_roots.is_empty() {
trace!(self.log, "No delayed lookups found on poll");
} else {
debug!(self.log, "Found delayed lookups on poll"; "lookup_count" => block_roots.len());
}
for block_root in block_roots {
if let Some(peer_ids) = self.delayed_lookup_peers.lock().pop(&block_root) {
self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
peer_ids.into_iter().collect(),
block_root,
));
}
}
}

/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
pub async fn process_chain_segment(
Expand Down
4 changes: 0 additions & 4 deletions beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![cfg(not(debug_assertions))] // Tests are too slow in debug.
#![cfg(test)]

use crate::network_beacon_processor::DELAYED_PEER_CACHE_SIZE;
use crate::{
network_beacon_processor::{
ChainSegmentProcessId, DuplicateCache, InvalidBlockStorage, NetworkBeaconProcessor,
Expand All @@ -24,8 +23,6 @@ use lighthouse_network::{
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield},
Client, MessageId, NetworkGlobals, PeerId, Response,
};
use lru::LruCache;
use parking_lot::Mutex;
use slot_clock::SlotClock;
use std::iter::Iterator;
use std::sync::Arc;
Expand Down Expand Up @@ -223,7 +220,6 @@ impl TestRig {
reprocess_tx: work_reprocessing_tx.clone(),
network_globals: network_globals.clone(),
invalid_block_storage: InvalidBlockStorage::Disabled,
delayed_lookup_peers: Mutex::new(LruCache::new(DELAYED_PEER_CACHE_SIZE)),
executor: executor.clone(),
log: log.clone(),
};
Expand Down
6 changes: 0 additions & 6 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
};
use logging::TimeLatch;
use lru::LruCache;
use parking_lot::Mutex;
use slog::{crit, debug, o, trace};
use slog::{error, warn};
use std::sync::Arc;
Expand Down Expand Up @@ -111,14 +109,10 @@ impl<T: BeaconChainTypes> Router<T> {
reprocess_tx: beacon_processor_reprocess_tx,
network_globals: network_globals.clone(),
invalid_block_storage,
delayed_lookup_peers: Mutex::new(LruCache::new(
crate::network_beacon_processor::DELAYED_PEER_CACHE_SIZE,
)),
executor: executor.clone(),
log: log.clone(),
};
let network_beacon_processor = Arc::new(network_beacon_processor);
network_beacon_processor.spawn_delayed_lookup_service();

// spawn the sync thread
crate::sync::manager::spawn(
Expand Down
Loading

0 comments on commit 46184e5

Please sign in to comment.