Skip to content

Commit

Permalink
Limit concurrent inbound gossipped block requests
Browse files Browse the repository at this point in the history
Uses the "load shed directly" design pattern from #1618.
  • Loading branch information
teor2345 committed Jan 29, 2021
1 parent 3d9888f commit 21b0360
Showing 1 changed file with 52 additions and 6 deletions.
58 changes: 52 additions & 6 deletions zebrad/src/components/inbound/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,30 @@ use zebra_state as zs;

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The maximum number of concurrent inbound download and verify tasks.
///
/// Set to one and a half checkpoint intervals, so that the inbound queue can
/// hold a complete checkpoint interval, if needed. We expect the syncer to
/// download and verify checkpoints, so this bound might never be reached.
const MAX_INBOUND_CONCURRENCY: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 3 / 2;

/// The action taken in response to a peer's gossipped block hash.
pub enum DownloadAction {
/// The block hash was successfully queued for download and verification.
AddedToQueue,

/// The block hash is already queued, so this request was ignored.
///
/// Another peer has already gossipped the same hash to us.
AlreadyQueued,

/// The queue is at capacity, so this request was ignored.
///
/// The sync service should discover this block later, when we are closer
/// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`].
FullQueue,
}

/// Manages download and verification of blocks gossiped to this peer.
#[pin_project]
#[derive(Debug)]
Expand Down Expand Up @@ -116,12 +140,27 @@ where

/// Queue a block for download and verification.
///
/// Returns true if the block was newly queued, and false if it was already queued.
/// Returns the action taken in response to the queue request.
#[instrument(skip(self, hash), fields(hash = %hash))]
pub fn download_and_verify(&mut self, hash: block::Hash) -> bool {
pub fn download_and_verify(&mut self, hash: block::Hash) -> DownloadAction {
if self.cancel_handles.contains_key(&hash) {
tracing::debug!("hash already queued for download");
return false;
tracing::debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"block hash already queued for inbound download: ignored block"
);
return DownloadAction::AlreadyQueued;
}

if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
tracing::info!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"too many blocks queued for inbound download: ignored block"
);
return DownloadAction::FullQueue;
}

// This oneshot is used to signal cancellation to the download task.
Expand Down Expand Up @@ -182,7 +221,14 @@ where
"blocks are only queued once"
);

tracing::debug!("queued hash for download");
true
tracing::debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"queued hash for download"
);
metrics::gauge!("gossip.queued.block.count", self.pending.len() as _);

DownloadAction::AddedToQueue
}
}

0 comments on commit 21b0360

Please sign in to comment.