Skip to content

Commit

Permalink
Merge of #5561
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 9, 2022
2 parents 1a62223 + 4e687d7 commit c3892b1
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 62 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/continous-integration-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ jobs:
app_name: zebrad
test_id: full-sync-to-tip
test_description: Test a full sync up to the tip
test_variables: '-e TEST_FULL_SYNC=1 -e ZEBRA_FORCE_USE_COLOR=1 -e FULL_SYNC_MAINNET_TIMEOUT_MINUTES=600'
# The value of FULL_SYNC_MAINNET_TIMEOUT_MINUTES is currently ignored.
test_variables: '-e TEST_FULL_SYNC=1 -e ZEBRA_FORCE_USE_COLOR=1 -e FULL_SYNC_MAINNET_TIMEOUT_MINUTES=0'
# This test runs for longer than 6 hours, so it needs multiple jobs
is_long_test: true
needs_zebra_state: false
Expand Down
4 changes: 3 additions & 1 deletion zebra-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ pub use request::{FinalizedBlock, HashOrHeight, PreparedBlock, ReadRequest, Requ
pub use response::{ReadResponse, Response};
pub use service::{
chain_tip::{ChainTipChange, LatestChainTip, TipAction},
init, spawn_init, OutputIndex, OutputLocation, TransactionLocation,
init, spawn_init,
watch_receiver::WatchReceiver,
OutputIndex, OutputLocation, TransactionLocation,
};

#[cfg(any(test, feature = "proptest-impl"))]
Expand Down
49 changes: 33 additions & 16 deletions zebrad/src/components/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use tokio::{sync::watch, time::sleep};
use tower::{
builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
Service, ServiceExt,
Expand Down Expand Up @@ -83,8 +83,7 @@ pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPO
/// The default for the user-specified lookahead limit.
///
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize =
zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 2;
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = MAX_TIPS_RESPONSE_HASH_COUNT * 2;

/// A lower bound on the user-specified concurrency limit.
///
Expand Down Expand Up @@ -359,6 +358,10 @@ where

/// The lengths of recent sync responses.
recent_syncs: RecentSyncLengths,

/// Receiver that is `true` when the downloader is past the lookahead limit.
/// This is based on the downloaded block height and the state tip height.
past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
}

/// Polls the network to determine whether further blocks are available and
Expand Down Expand Up @@ -438,6 +441,7 @@ where
}

let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);

// The Hedge middleware is the outermost layer, hedging requests
// between two retry-wrapped networks. The innermost timeout
// layer is relatively unimportant, because slow requests will
Expand All @@ -464,27 +468,33 @@ where

let (sync_status, recent_syncs) = SyncStatus::new();

let (past_lookahead_limit_sender, past_lookahead_limit_receiver) = watch::channel(false);
let past_lookahead_limit_receiver = zs::WatchReceiver::new(past_lookahead_limit_receiver);

let downloads = Box::pin(Downloads::new(
block_network,
verifier,
latest_chain_tip.clone(),
past_lookahead_limit_sender,
max(
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
),
max_checkpoint_height,
));

let new_syncer = Self {
genesis_hash: genesis_hash(config.network.network),
max_checkpoint_height,
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
tip_network,
downloads: Box::pin(Downloads::new(
block_network,
verifier,
latest_chain_tip.clone(),
// TODO: change the download lookahead for full verification?
max(
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
),
max_checkpoint_height,
)),
downloads,
state,
latest_chain_tip,
prospective_tips: HashSet::new(),
recent_syncs,
past_lookahead_limit_receiver,
};

(new_syncer, sync_status)
Expand Down Expand Up @@ -545,7 +555,14 @@ where
}
self.update_metrics();

while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) {
// Pause new downloads while the syncer or downloader are past their lookahead limits.
//
// To avoid a deadlock or long waits for blocks to expire, we ignore the download
// lookahead limit when there are only a small number of blocks waiting.
while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
|| (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
&& self.past_lookahead_limit_receiver.cloned_watch_data())
{
trace!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
Expand Down Expand Up @@ -957,7 +974,7 @@ where
}

/// The configured lookahead limit, based on the currently verified height,
/// and the number of hashes we haven't queued yet..
/// and the number of hashes we haven't queued yet.
fn lookahead_limit(&self, new_hashes: usize) -> usize {
let max_checkpoint_height: usize = self
.max_checkpoint_height
Expand Down
149 changes: 116 additions & 33 deletions zebrad/src/components/sync/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
collections::HashMap,
convert::{self, TryFrom},
pin::Pin,
sync::Arc,
sync::{Arc, TryLockError},
task::{Context, Poll},
};

Expand All @@ -15,7 +15,11 @@ use futures::{
};
use pin_project::pin_project;
use thiserror::Error;
use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
use tokio::{
sync::{oneshot, watch},
task::JoinHandle,
time::timeout,
};
use tower::{hedge, Service, ServiceExt};
use tracing_futures::Instrument;

Expand All @@ -42,14 +46,17 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// to hold a few extra tips responses worth of blocks,
/// even if the syncer queue is full. Any unused capacity is shared between both queues.
///
/// If this capacity is exceeded, the downloader will start failing download blocks with
/// [`BlockDownloadVerifyError::AboveLookaheadHeightLimit`], and the syncer will reset.
/// If this capacity is exceeded, the downloader will tell the syncer to pause new downloads.
///
/// Since the syncer queue is limited to the `lookahead_limit`,
/// the rest of the capacity is reserved for the other queues.
/// There is no reserved capacity for the syncer queue:
/// if the other queues stay full, the syncer will eventually time out and reset.
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 5;
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;

/// The maximum height difference between Zebra's state tip and a downloaded block.
/// Blocks higher than this will get dropped and return an error.
pub const VERIFICATION_PIPELINE_DROP_LIMIT: i32 = 50_000;

#[derive(Copy, Clone, Debug)]
pub(super) struct AlwaysHedge;
Expand Down Expand Up @@ -89,6 +96,14 @@ pub enum BlockDownloadVerifyError {
hash: block::Hash,
},

/// A downloaded block was a long way ahead of the state chain tip.
/// This error should be very rare during normal operation.
///
/// We need to reset the syncer on this error, to allow the verifier and state to catch up,
/// or prevent it following a bad chain.
///
/// If we don't reset the syncer on this error, it will continue downloading blocks from a bad
/// chain, or blocks far ahead of the current state tip.
#[error("downloaded block was too far ahead of the chain tip: {height:?} {hash:?}")]
AboveLookaheadHeightLimit {
height: block::Height,
Expand Down Expand Up @@ -157,6 +172,7 @@ where
ZSTip: ChainTip + Clone + Send + 'static,
{
// Services
//
/// A service that forwards requests to connected peers, and returns their
/// responses.
network: ZN,
Expand All @@ -168,13 +184,24 @@ where
latest_chain_tip: ZSTip,

// Configuration
//
/// The configured lookahead limit, after applying the minimum limit.
lookahead_limit: usize,

/// The largest block height for the checkpoint verifier, based on the current config.
max_checkpoint_height: Height,

// Shared syncer state
//
/// Sender that is set to `true` when the downloader is past the lookahead limit.
/// This is based on the downloaded block height and the state tip height.
past_lookahead_limit_sender: Arc<std::sync::Mutex<watch::Sender<bool>>>,

/// Receiver for `past_lookahead_limit_sender`, which is used to avoid accessing the mutex.
past_lookahead_limit_receiver: zs::WatchReceiver<bool>,

// Internal downloads state
//
/// A list of pending block download and verify tasks.
#[pin]
pending: FuturesUnordered<
Expand Down Expand Up @@ -259,15 +286,23 @@ where
network: ZN,
verifier: ZV,
latest_chain_tip: ZSTip,
past_lookahead_limit_sender: watch::Sender<bool>,
lookahead_limit: usize,
max_checkpoint_height: Height,
) -> Self {
let past_lookahead_limit_receiver =
zs::WatchReceiver::new(past_lookahead_limit_sender.subscribe());

Self {
network,
verifier,
latest_chain_tip,
lookahead_limit,
max_checkpoint_height,
past_lookahead_limit_sender: Arc::new(std::sync::Mutex::new(
past_lookahead_limit_sender,
)),
past_lookahead_limit_receiver,
pending: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
}
Expand Down Expand Up @@ -307,9 +342,13 @@ where

let mut verifier = self.verifier.clone();
let latest_chain_tip = self.latest_chain_tip.clone();

let lookahead_limit = self.lookahead_limit;
let max_checkpoint_height = self.max_checkpoint_height;

let past_lookahead_limit_sender = self.past_lookahead_limit_sender.clone();
let past_lookahead_limit_receiver = self.past_lookahead_limit_receiver.clone();

let task = tokio::spawn(
async move {
// Download the block.
Expand Down Expand Up @@ -346,19 +385,26 @@ where
// that will timeout before being verified.
let tip_height = latest_chain_tip.best_tip_height();

// TODO: don't use VERIFICATION_PIPELINE_SCALING_MULTIPLIER for full verification?
let max_lookahead_height = if let Some(tip_height) = tip_height {
let (lookahead_drop_height, lookahead_pause_height, lookahead_reset_height) = if let Some(tip_height) = tip_height {
// Scale the height limit with the lookahead limit,
// so users with low capacity or under DoS can reduce them both.
let lookahead = i32::try_from(
let lookahead_pause = i32::try_from(
lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
)
.expect("fits in i32");
(tip_height + lookahead).expect("tip is much lower than Height::MAX")
.expect("fits in i32");


((tip_height + VERIFICATION_PIPELINE_DROP_LIMIT).expect("tip is much lower than Height::MAX"),
(tip_height + lookahead_pause).expect("tip is much lower than Height::MAX"),
(tip_height + lookahead_pause/2).expect("tip is much lower than Height::MAX"))
} else {
let genesis_drop = VERIFICATION_PIPELINE_DROP_LIMIT.try_into().expect("fits in u32");
let genesis_lookahead =
u32::try_from(lookahead_limit - 1).expect("fits in u32");
block::Height(genesis_lookahead)

(block::Height(genesis_drop),
block::Height(genesis_lookahead),
block::Height(genesis_lookahead/2))
};

// Get the finalized tip height, assuming we're using the non-finalized state.
Expand Down Expand Up @@ -388,28 +434,59 @@ where
return Err(BlockDownloadVerifyError::InvalidHeight { hash });
};

if block_height > max_lookahead_height {
info!(
?hash,
?block_height,
?tip_height,
?max_lookahead_height,
lookahead_limit = ?lookahead_limit,
"synced block height too far ahead of the tip: dropped downloaded block",
);
metrics::counter!("sync.max.height.limit.dropped.block.count", 1);

// This error should be very rare during normal operation.
//
// We need to reset the syncer on this error,
// to allow the verifier and state to catch up,
// or prevent it following a bad chain.
//
// If we don't reset the syncer on this error,
// it will continue downloading blocks from a bad chain,
// (or blocks far ahead of the current state tip).
if block_height > lookahead_drop_height {
Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit { height: block_height, hash })?;
} else if block_height < min_accepted_height {
} else if block_height > lookahead_pause_height {
// This log can be very verbose: usually hundreds of blocks are past the pause limit at once.
// So we only log at info level for the first above-height block.
if !past_lookahead_limit_receiver.cloned_watch_data() {
info!(
?hash,
?block_height,
?tip_height,
?lookahead_pause_height,
?lookahead_reset_height,
lookahead_limit = ?lookahead_limit,
"synced block height too far ahead of the tip: \
waiting for downloaded blocks to commit to the state",
);

// Set the watched value to true, since we're over the limit.
//
// It is ok to block here, because we're going to pause new downloads anyway.
// But if Zebra is shutting down, ignore the send error.
let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(true);
} else {
debug!(
?hash,
?block_height,
?tip_height,
?lookahead_pause_height,
?lookahead_reset_height,
lookahead_limit = ?lookahead_limit,
"synced block height too far ahead of the tip: \
waiting for downloaded blocks to commit to the state",
);
}

metrics::counter!("sync.max.height.limit.paused.count", 1);
} else if block_height <= lookahead_reset_height && past_lookahead_limit_receiver.cloned_watch_data() {
// Try to reset the watched value to `false`, since we're well under the limit.
//
// `try_lock()` is used so this task never waits on the mutex: if another task
// currently holds the sender, the reset is skipped and retried on a later block.
match past_lookahead_limit_sender.try_lock() {
    Ok(watch_sender_guard) => {
        // Send `false` so the syncer stops pausing new downloads.
        // (Sending `true` here would leave the "past limit" flag set forever.)
        //
        // If Zebra is shutting down, ignore the send error.
        let _ = watch_sender_guard.send(false);
        metrics::counter!("sync.max.height.limit.reset.count", 1);
    }
    Err(TryLockError::Poisoned(_)) => panic!("thread panicked while holding the past_lookahead_limit_sender mutex guard"),
    // We'll try allowing new downloads when we get the next block
    Err(TryLockError::WouldBlock) => {}
}

metrics::counter!("sync.max.height.limit.reset.attempt.count", 1);
}

if block_height < min_accepted_height {
debug!(
?hash,
?block_height,
Expand Down Expand Up @@ -504,8 +581,14 @@ where
assert!(self.cancel_handles.is_empty());
}

/// Get the number of currently in-flight download tasks.
/// Get the number of currently in-flight download and verify tasks.
///
/// This is the current length of the `pending` task list.
pub fn in_flight(&mut self) -> usize {
self.pending.len()
}

/// Returns true if there are no in-flight download and verify tasks.
///
/// This checks whether the `pending` task list is empty.
// NOTE(review): `dead_code` is presumably allowed because this method has no
// callers in some build configurations — confirm and document the reason.
#[allow(dead_code)]
pub fn is_empty(&mut self) -> bool {
self.pending.is_empty()
}
}
Loading

0 comments on commit c3892b1

Please sign in to comment.