Skip to content

Commit

Permalink
Merge of #5839
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored May 24, 2024
2 parents 7fda18b + 3d53cd8 commit c0d266b
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 24 deletions.
57 changes: 54 additions & 3 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4;
/// Maximum time we allow a lookup to exist before assuming it is stuck and will never make
/// progress. Assume the worse case processing time per block component set * times max depth.
/// 15 * 2 * 32 = 16 minutes.
const LOOKUP_MAX_DURATION_SECS: usize = 15 * PARENT_DEPTH_TOLERANCE;
const LOOKUP_MAX_DURATION_STUCK_SECS: u64 = 15 * PARENT_DEPTH_TOLERANCE as u64;
/// The most common case of child-lookup without peers is receiving block components before the
/// attestation deadline when the node is lagging behind. Once peers start attesting for the child
/// lookup at most after 4 seconds, the lookup should gain peers.
const LOOKUP_MAX_DURATION_NO_PEERS_SECS: u64 = 10;

pub enum BlockComponent<E: EthSpec> {
Block(DownloadResult<Arc<SignedBeaconBlock<E>>>),
Expand Down Expand Up @@ -696,6 +700,53 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}

/// Perform some prune operations on lookups on some interval
pub fn prune_lookups(&mut self) {
self.drop_lookups_without_peers();
self.drop_stuck_lookups();
}

/// Lookups without peers are allowed to exist for some time. See this common race condition:
///
/// 1. Receive unknown block parent event
/// 2. Create child lookup with zero peers
/// 3. Parent is processed, before receiving any attestation for the child block
/// 4. Child lookup is attempted to make progress but has no peers
/// 5. We receive an attestion for child block and add a peer to the child block lookup
///
/// On step 4 we could drop the lookup because we attempt to issue a request with no peers
/// available. This has two issues:
/// - We may drop the lookup while some other block component is processing, triggering an
/// unknown lookup error. This can potentially cause un-related child lookups to also be
/// dropped when calling `drop_lookup_and_children`.
/// - We lose all progress of the lookup, and have to re-download its components that we may
/// already have there cached.
///
/// Instead there's no negative for keeping lookups with no peers around for some time. If we
/// regularly prune them, it should not be a memory concern (TODO: maybe yes!).
fn drop_lookups_without_peers(&mut self) {
for (lookup_id, block_root) in self
.single_block_lookups
.values()
.filter(|lookup| {
// Do not drop lookup that are awaiting events to prevent inconsinstencies. If a
// lookup gets stuck, it will be eventually pruned by `drop_stuck_lookups`
lookup.has_no_peers()
&& lookup.elapsed_since_created()
> Duration::from_secs(LOOKUP_MAX_DURATION_NO_PEERS_SECS)
&& !lookup.is_awaiting_event()
})
.map(|lookup| (lookup.id, lookup.block_root()))
.collect::<Vec<_>>()
{
debug!(self.log, "Dropping lookup with no peers";
"id" => lookup_id,
"block_root" => ?block_root
);
self.drop_lookup_and_children(lookup_id);
}
}

/// Safety mechanism to unstuck lookup sync. Lookup sync if purely event driven and depends on
/// external components to feed it events to make progress. If there is a bug in network, in
/// beacon processor, or here internally: lookups can get stuck forever. A stuck lookup can
Expand All @@ -709,10 +760,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
///
/// - One single clear warn level log per stuck incident
/// - If the original bug is sporadic, it reduces the time a node is stuck from forever to 15 min
pub fn drop_stuck_lookups(&mut self) {
fn drop_stuck_lookups(&mut self) {
// While loop to find and drop all disjoint trees of potentially stuck lookups.
while let Some(stuck_lookup) = self.single_block_lookups.values().find(|lookup| {
lookup.elapsed_since_created() > Duration::from_secs(LOOKUP_MAX_DURATION_SECS as u64)
lookup.elapsed_since_created() > Duration::from_secs(LOOKUP_MAX_DURATION_STUCK_SECS)
}) {
let ancestor_stuck_lookup = match self.find_oldest_ancestor_lookup(stuck_lookup) {
Ok(lookup) => lookup,
Expand Down
53 changes: 37 additions & 16 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,17 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.block_root() == block_root
}

/// Get all unique peers that claim to have imported this set of block components
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.iter()
/// Returns true if the block has already been downloaded.
pub fn both_components_processed(&self) -> bool {
self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
}

/// Returns true if this request is expecting some event to make progress
pub fn is_awaiting_event(&self) -> bool {
self.awaiting_parent.is_some()
|| self.block_request_state.state.is_awaiting_event()
|| self.blob_request_state.state.is_awaiting_event()
}

/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
Expand Down Expand Up @@ -189,13 +197,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}

let Some(peer_id) = self.use_rand_available_peer() else {
if awaiting_parent {
// Allow lookups awaiting for a parent to have zero peers. If when the parent
// resolve they still have zero peers the lookup will fail gracefully.
return Ok(());
} else {
return Err(LookupRequestError::NoPeers);
}
// Allow lookup to not have any peers. In that case do nothing. If the lookup does
// not have peers for some time, it will be dropped.
return Ok(());
};

let request = R::request_state_mut(self);
Expand Down Expand Up @@ -226,18 +230,17 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
Ok(())
}

/// Get all unique peers that claim to have imported this set of block components
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.iter()
}

/// Add peer to all request states. The peer must be able to serve this request.
/// Returns true if the peer was newly inserted into some request state.
pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
self.peers.insert(peer_id)
}

/// Returns true if the block has already been downloaded.
pub fn both_components_processed(&self) -> bool {
self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
}

/// Remove peer from available peers. Return true if there are no more available peers and all
/// requests are not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
Expand Down Expand Up @@ -351,6 +354,24 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}

/// Returns true if we can expect some future event to progress this block component request
/// specifically.
pub fn is_awaiting_event(&self) -> bool {
match self.state {
// No event will progress this request specifically, but the request may be put on hold
// due to some external event
State::AwaitingDownload { .. } => false,
// Network will emit a download success / error event
State::Downloading { .. } => true,
// Not awaiting any external event
State::AwaitingProcess { .. } => false,
// Beacon processor will emit a processing result event
State::Processing { .. } => true,
// Request complete, no future event left
State::Processed { .. } => false,
}
}

pub fn peek_downloaded_data(&self) -> Option<&T> {
match &self.state {
State::AwaitingDownload => None,
Expand Down
11 changes: 6 additions & 5 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
futures::stream::iter(ee_responsiveness_watch.await).flatten()
};

// LOOKUP_MAX_DURATION_SECS is 60 seconds. Logging every 30 seconds allows enough timely
// visbility while being sparse and not increasing the debug log volume in a noticeable way
let mut interval = tokio::time::interval(Duration::from_secs(30));
// min(LOOKUP_MAX_DURATION_*) is 15 seconds. The cost of calling prune_lookups more often is
// one iteration over the single lookups HashMap. This map is supposed to be very small < 10
// unless there is a bug.
let mut prune_lookups_interval = tokio::time::interval(Duration::from_secs(15));

// process any inbound messages
loop {
Expand All @@ -573,8 +574,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
Some(engine_state) = check_ee_stream.next(), if check_ee => {
self.handle_new_execution_engine_state(engine_state);
}
_ = interval.tick() => {
self.block_lookups.drop_stuck_lookups();
_ = prune_lookups_interval.tick() => {
self.block_lookups.prune_lookups();
}
}
}
Expand Down

0 comments on commit c0d266b

Please sign in to comment.