Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove delayed lookups #4992

Merged
merged 4 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,31 +413,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.incomplete_processing_components(slot)
}

/// Returns `true` while single block/blob lookups for `slot` should still be held back.
///
/// A lookup is delayed only when all of the following hold:
///
/// - we are in the Deneb fork (otherwise the delay is never applied),
/// - `slot` is the current slot or a future slot according to the slot clock, and
/// - fewer than `single_lookup_delay` milliseconds of the current slot have elapsed.
///
/// The `single_lookup_delay` is the duration we wait for blocks or blobs to arrive over
/// gossip before making single block or blob requests. This is to minimize the number of
/// single lookup requests we end up making.
///
/// If the slot clock cannot be read, no delay is applied.
pub fn should_delay_lookup(&self, slot: Slot) -> bool {
    if !self.is_deneb() {
        return false;
    }

    // An unreadable clock counts as "slot is in the past".
    let slot_not_in_past = matches!(
        self.slot_clock.now(),
        Some(current_slot) if current_slot <= slot
    );

    // An unreadable clock counts as "delay already elapsed".
    let within_delay_window = match self.slot_clock.millis_from_current_slot_start() {
        Some(elapsed) => elapsed < self.slot_clock.single_lookup_delay(),
        None => false,
    };

    slot_not_in_past && within_delay_window
}

/// The epoch at which we require a data availability check in block processing.
/// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> {
Expand Down
81 changes: 14 additions & 67 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use crate::{
service::NetworkMessage,
sync::SyncMessage,
};
use std::collections::HashSet;

use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::store::Error;
Expand Down Expand Up @@ -756,11 +754,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let blob_slot = verified_blob.slot();
let blob_index = verified_blob.id().index;

let delay_lookup = self
.chain
.data_availability_checker
.should_delay_lookup(blob_slot);

match self.chain.process_gossip_blob(verified_blob).await {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
// Note: Reusing block imported metric here
Expand All @@ -772,29 +765,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingComponents(_slot, block_root)) => {
if delay_lookup {
self.cache_peer(peer_id, &block_root);
trace!(
self.log,
"Processed blob, delaying lookup for other components";
"slot" => %blob_slot,
"blob_index" => %blob_index,
"block_root" => %block_root,
);
} else {
trace!(
self.log,
"Missing block components for gossip verified blob";
"slot" => %blob_slot,
"blob_index" => %blob_index,
"block_root" => %block_root,
);
self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
vec![peer_id],
block_root,
));
}
Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
trace!(
self.log,
"Processed blob, waiting for other components";
"slot" => %slot,
"blob_index" => %blob_index,
"block_root" => %block_root,
);
}
Err(err) => {
debug!(
Expand All @@ -818,18 +796,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}

/// Records `peer_id` against `block_root` in the `delayed_lookup_peers` cache.
///
/// If the root already has an entry the peer is added to its set; otherwise a new
/// single-element set is pushed into the LRU cache for that root.
fn cache_peer(self: &Arc<Self>, peer_id: PeerId, block_root: &Hash256) {
    let mut cache = self.delayed_lookup_peers.lock();
    match cache.get_mut(block_root) {
        Some(known_peers) => {
            known_peers.insert(peer_id);
        }
        None => {
            cache.push(*block_root, HashSet::from([peer_id]));
        }
    }
}

/// Process the beacon block received from the gossip network and:
///
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
Expand Down Expand Up @@ -1170,11 +1136,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;

let delay_lookup = self
.chain
.data_availability_checker
.should_delay_lookup(verified_block.block.slot());

let result = self
.chain
.process_block_with_early_caching(block_root, verified_block, NotifyExecutionLayer::Yes)
Expand Down Expand Up @@ -1209,26 +1170,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
if delay_lookup {
self.cache_peer(peer_id, block_root);
trace!(
self.log,
"Processed block, delaying lookup for other components";
"slot" => slot,
"block_root" => %block_root,
);
} else {
trace!(
self.log,
"Missing block components for gossip verified block";
"slot" => slot,
"block_root" => %block_root,
);
self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
vec![peer_id],
*block_root,
));
}
trace!(
self.log,
"Processed block, waiting for other components";
"slot" => slot,
"block_root" => %block_root,
);
}
Err(BlockError::ParentUnknown(block)) => {
// Inform the sync manager to find parents for this block
Expand Down
73 changes: 2 additions & 71 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,15 @@ use lighthouse_network::{
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
};
use lru::LruCache;
use parking_lot::Mutex;
use slog::{crit, debug, error, trace, Logger};
use slot_clock::{ManualSlotClock, SlotClock};
use std::collections::HashSet;
use slog::{debug, Logger};
use slot_clock::ManualSlotClock;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use store::MemoryStore;
use task_executor::test_utils::TestRuntime;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio::time::{interval_at, Instant};
use types::*;

pub use sync_methods::ChainSegmentProcessId;
Expand All @@ -44,7 +40,6 @@ mod sync_methods;
mod tests;

pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;
pub const DELAYED_PEER_CACHE_SIZE: usize = 16;

/// Defines if and where we will store the SSZ files of invalid blocks.
#[derive(Clone)]
Expand All @@ -65,7 +60,6 @@ pub struct NetworkBeaconProcessor<T: BeaconChainTypes> {
pub reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
pub network_globals: Arc<NetworkGlobals<T::EthSpec>>,
pub invalid_block_storage: InvalidBlockStorage,
pub delayed_lookup_peers: Mutex<LruCache<Hash256, HashSet<PeerId>>>,
pub executor: TaskExecutor,
pub log: Logger,
}
Expand Down Expand Up @@ -630,68 +624,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"error" => %e)
});
}

/// This service is responsible for collecting lookup messages and sending them back to sync
/// for processing after a short delay.
///
/// It spawns a task named `delayed_lookups` on the executor. The task ticks once per slot
/// duration, with the first tick scheduled `single_lookup_delay` after a slot start, and
/// calls `poll_delayed_lookups` with the current slot on every tick.
///
/// We want to delay lookups triggered from gossip for the following reasons:
///
/// - We only want to make one request for components we are unlikely to see on gossip. This
///   means we don't have to repeatedly update our RPC request's state as we receive gossip
///   components.
///
/// - We are likely to receive blocks/blobs over gossip more quickly than we could via an RPC
///   request.
///
/// - Delaying a lookup means we are less likely to simultaneously download the same
///   blocks/blobs over gossip and RPC.
///
/// - We would prefer to request peers based on whether we've seen them attest, because this
///   gives us an idea about whether they *should* have the block/blobs we're missing. This is
///   because a node should not attest to a block unless it has all the blobs for that block.
///   This gives us a stronger basis for peer scoring.
pub fn spawn_delayed_lookup_service(self: &Arc<Self>) {
    let processor_clone = self.clone();
    let executor = self.executor.clone();
    let log = self.log.clone();
    let beacon_chain = self.chain.clone();
    executor.spawn(
        async move {
            let slot_duration = beacon_chain.slot_clock.slot_duration();
            let delay = beacon_chain.slot_clock.single_lookup_delay();
            let interval_start = match (
                beacon_chain.slot_clock.duration_to_next_slot(),
                beacon_chain.slot_clock.seconds_from_current_slot_start(),
            ) {
                (Some(duration_to_next_slot), Some(seconds_from_current_slot_start)) => {
                    // If this slot's delay point has already passed, aim for the delay point
                    // of the next slot; otherwise wait out the remainder of the delay.
                    let duration_until_start = if seconds_from_current_slot_start > delay {
                        duration_to_next_slot + delay
                    } else {
                        delay - seconds_from_current_slot_start
                    };
                    Instant::now() + duration_until_start
                }
                _ => {
                    // The trailing space before the `\` continuation is required: the
                    // continuation strips the newline and the next line's leading whitespace.
                    crit!(log,
                        "Failed to read slot clock, delayed lookup service timing will be inaccurate. \
                        This may degrade performance"
                    );
                    // Fall back to starting the interval immediately.
                    Instant::now()
                }
            };

            let mut interval = interval_at(interval_start, slot_duration);
            loop {
                interval.tick().await;
                let Some(slot) = beacon_chain.slot_clock.now_or_genesis() else {
                    error!(log, "Skipping delayed lookup poll, unable to read slot clock");
                    continue
                };
                trace!(log, "Polling delayed lookups for slot: {slot}");
                processor_clone.poll_delayed_lookups(slot)
            }
        },
        "delayed_lookups",
    );
}
}

type TestBeaconChainType<E> =
Expand Down Expand Up @@ -734,7 +666,6 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
reprocess_tx: work_reprocessing_tx,
network_globals,
invalid_block_storage: InvalidBlockStorage::Disabled,
delayed_lookup_peers: Mutex::new(LruCache::new(DELAYED_PEER_CACHE_SIZE)),
executor: runtime.task_executor.clone(),
log,
};
Expand Down
25 changes: 2 additions & 23 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use beacon_processor::{
AsyncFn, BlockingFn, DuplicateCache,
};
use lighthouse_network::PeerAction;
use slog::{debug, error, info, trace, warn};
use slog::{debug, error, info, warn};
use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -28,7 +28,7 @@ use store::KzgCommitment;
use tokio::sync::mpsc;
use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{Epoch, Hash256, Slot};
use types::{Epoch, Hash256};

/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -373,27 +373,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}

/// Poll the beacon chain for any delayed lookups that are now available.
///
/// Asks the data availability checker for the block roots whose processing components are
/// still incomplete at `slot`. For every such root that has peers recorded in
/// `delayed_lookup_peers`, the entry is removed from the cache and a
/// `SyncMessage::MissingGossipBlockComponents` carrying those peers is sent to sync.
/// Roots without cached peers are skipped.
pub fn poll_delayed_lookups(&self, slot: Slot) {
    let block_roots = self
        .chain
        .data_availability_checker
        .incomplete_processing_components(slot);
    if block_roots.is_empty() {
        trace!(self.log, "No delayed lookups found on poll");
    } else {
        debug!(self.log, "Found delayed lookups on poll"; "lookup_count" => block_roots.len());
    }
    for block_root in block_roots {
        // Pop in a separate statement so the mutex guard (a temporary) is dropped at the end
        // of this line. Popping inside the `if let` scrutinee would keep the cache locked
        // while the sync message is being sent.
        let cached_peers = self.delayed_lookup_peers.lock().pop(&block_root);
        if let Some(peer_ids) = cached_peers {
            self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
                peer_ids.into_iter().collect(),
                block_root,
            ));
        }
    }
}

/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
pub async fn process_chain_segment(
Expand Down
4 changes: 0 additions & 4 deletions beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![cfg(not(debug_assertions))] // Tests are too slow in debug.
#![cfg(test)]

use crate::network_beacon_processor::DELAYED_PEER_CACHE_SIZE;
use crate::{
network_beacon_processor::{
ChainSegmentProcessId, DuplicateCache, InvalidBlockStorage, NetworkBeaconProcessor,
Expand All @@ -24,8 +23,6 @@ use lighthouse_network::{
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield},
Client, MessageId, NetworkGlobals, PeerId, Response,
};
use lru::LruCache;
use parking_lot::Mutex;
use slot_clock::SlotClock;
use std::iter::Iterator;
use std::sync::Arc;
Expand Down Expand Up @@ -223,7 +220,6 @@ impl TestRig {
reprocess_tx: work_reprocessing_tx.clone(),
network_globals: network_globals.clone(),
invalid_block_storage: InvalidBlockStorage::Disabled,
delayed_lookup_peers: Mutex::new(LruCache::new(DELAYED_PEER_CACHE_SIZE)),
executor: executor.clone(),
log: log.clone(),
};
Expand Down
6 changes: 0 additions & 6 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
};
use logging::TimeLatch;
use lru::LruCache;
use parking_lot::Mutex;
use slog::{crit, debug, o, trace};
use slog::{error, warn};
use std::sync::Arc;
Expand Down Expand Up @@ -111,14 +109,10 @@ impl<T: BeaconChainTypes> Router<T> {
reprocess_tx: beacon_processor_reprocess_tx,
network_globals: network_globals.clone(),
invalid_block_storage,
delayed_lookup_peers: Mutex::new(LruCache::new(
crate::network_beacon_processor::DELAYED_PEER_CACHE_SIZE,
)),
executor: executor.clone(),
log: log.clone(),
};
let network_beacon_processor = Arc::new(network_beacon_processor);
network_beacon_processor.spawn_delayed_lookup_service();

// spawn the sync thread
crate::sync::manager::spawn(
Expand Down
Loading
Loading