Merge branch 'main' into dependabot/cargo/indexmap-1.9.1
mergify[bot] authored Jun 23, 2022
2 parents f6d977a + 6aea0fd commit 02dead4
Showing 6 changed files with 90 additions and 45 deletions.
19 changes: 11 additions & 8 deletions .github/workflows/deploy-gcp-tests.yml
@@ -223,7 +223,7 @@ jobs:
gcloud compute instances create-with-container "${{ inputs.test_id }}-${{ env.GITHUB_REF_SLUG_URL }}-${{ env.GITHUB_SHA_SHORT }}" \
--boot-disk-size 100GB \
--boot-disk-type pd-ssd \
--create-disk image=${{ env.CACHED_DISK_NAME }},name="${{ inputs.disk_prefix }}-${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }}",device-name="${{ inputs.disk_prefix }}-${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }}",size=100GB,type=pd-ssd \
--create-disk image=${{ env.CACHED_DISK_NAME }},name="${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }}",device-name="${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }}",size=100GB,type=pd-ssd \
--container-image debian:buster \
--container-restart-policy=never \
--machine-type ${{ env.MACHINE_TYPE }} \
@@ -263,10 +263,10 @@ jobs:
--command \
"\
docker volume create --driver local --opt type=ext4 --opt device=/dev/sdb \
${{ inputs.disk_prefix }}-${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }} \
${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }} \
&& \
docker run ${{ inputs.test_variables }} -t --name ${{ inputs.test_id }} \
--mount type=volume,src=${{ inputs.disk_prefix }}-${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }},dst=${{ inputs.root_state_path }}/${{ inputs.zebra_state_dir }} \
--mount type=volume,src=${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }},dst=${{ inputs.root_state_path }}/${{ inputs.zebra_state_dir }} \
${{ env.GAR_BASE }}/${{ env.IMAGE_NAME }}:sha-${{ env.GITHUB_SHA_SHORT }}"
# SSH into the just-created VM and create a Docker container to run the incoming test
@@ -309,18 +309,21 @@ jobs:
--command \
"\
docker volume create --driver local --opt type=ext4 --opt device=/dev/sdb \
${{ inputs.disk_prefix }}-${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }} \
${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }} \
&& \
docker run ${{ inputs.test_variables }} -t --name ${{ inputs.test_id }} \
--mount type=volume,src=${{ inputs.disk_prefix }}-${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }},dst=${{ inputs.root_state_path }}/${{ inputs.zebra_state_dir }} \
--mount type=volume,src=${{ inputs.disk_prefix }}-${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }},dst=${{ inputs.root_state_path }}/${{ inputs.lwd_state_dir }} \
--mount type=volume,src=${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }},dst=${{ inputs.root_state_path }}/${{ inputs.zebra_state_dir }} \
--mount type=volume,src=${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }},dst=${{ inputs.root_state_path }}/${{ inputs.lwd_state_dir }} \
${{ env.GAR_BASE }}/${{ env.IMAGE_NAME }}:sha-${{ env.GITHUB_SHA_SHORT }}"
create-state-image:
name: Create ${{ inputs.test_id }} cached state image
runs-on: ubuntu-latest
# We run exactly one of without-cached-state or with-cached-state, and we always skip the other one.
# Normally, if a job is skipped, all the jobs that depend on it are also skipped.
# So we need to override the default success() check to make this job run.
needs: [ test-without-cached-state, test-with-cached-state ]
if: ${{ inputs.saves_to_disk }}
if: ${{ !cancelled() && !failure() && inputs.saves_to_disk }}
permissions:
contents: 'read'
id-token: 'write'
@@ -394,7 +397,7 @@ jobs:
run: |
gcloud compute images create ${{ inputs.disk_prefix }}-${{ env.GITHUB_REF_SLUG_URL }}-${{ env.GITHUB_SHA_SHORT }}-v${{ env.STATE_VERSION }}-${{ env.NETWORK }}-${{ inputs.disk_suffix }} \
--force \
--source-disk=${{ inputs.disk_prefix }}-${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }} \
--source-disk=${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }} \
--source-disk-zone=${{ env.ZONE }} \
--storage-location=us \
--description="Created from commit ${{ env.GITHUB_SHA_SHORT }} with height ${{ env.SYNC_HEIGHT }}"
37 changes: 18 additions & 19 deletions zebra-network/src/address_book_updater.rs
@@ -59,33 +59,32 @@ impl AddressBookUpdater {
let address_book = Arc::new(std::sync::Mutex::new(address_book));

let worker_address_book = address_book.clone();
let span = Span::current();
let worker = move || {
span.in_scope(|| {
info!("starting the address book updater");
info!("starting the address book updater");

while let Some(event) = worker_rx.blocking_recv() {
trace!(?event, "got address book change");
while let Some(event) = worker_rx.blocking_recv() {
trace!(?event, "got address book change");

// # Correctness
//
// Briefly hold the address book threaded mutex, to update the
// state for a single address.
worker_address_book
.lock()
.expect("mutex should be unpoisoned")
.update(event);
}
// # Correctness
//
// Briefly hold the address book threaded mutex, to update the
// state for a single address.
worker_address_book
.lock()
.expect("mutex should be unpoisoned")
.update(event);
}

let error = Err(AllAddressBookUpdaterSendersClosed.into());
info!(?error, "stopping address book updater");
error
})
let error = Err(AllAddressBookUpdaterSendersClosed.into());
info!(?error, "stopping address book updater");
error
};

// Correctness: spawn address book accesses on a blocking thread,
// to avoid deadlocks (see #1976)
let address_book_updater_task_handle = tokio::task::spawn_blocking(worker);
let span = Span::current();
let address_book_updater_task_handle =
tokio::task::spawn_blocking(move || span.in_scope(worker));

(
address_book,
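The change above stops entering the tracing span inside the worker closure and instead captures `Span::current()` once, wrapping the whole `spawn_blocking` call with `span.in_scope(...)`, so log events emitted from the blocking thread stay attached to the caller's span. A minimal sketch of that pattern, assuming the `tokio` and `tracing` crates; the function name and log message are made up for illustration:

```rust
use tracing::{info, Span};

// Sketch only: capture the current span on the async side, then re-enter it on
// the blocking thread so log events keep the caller's span context.
async fn run_blocking_with_span() {
    let span = Span::current();

    let handle = tokio::task::spawn_blocking(move || {
        span.in_scope(|| {
            info!("running on a blocking thread, inside the caller's span");
            // ... briefly lock shared state and update it here ...
        })
    });

    handle.await.expect("blocking task should not panic");
}
```

The same capture-and-re-enter pattern is applied to the address book tasks in `candidate_set.rs` below.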
22 changes: 15 additions & 7 deletions zebra-network/src/peer_set/candidate_set.rs
@@ -4,6 +4,7 @@ use chrono::Utc;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep_until, timeout, Instant};
use tower::{Service, ServiceExt};
use tracing::Span;

use zebra_chain::serialization::DateTime32;

@@ -333,9 +334,12 @@ where
//
// Extend handles duplicate addresses internally.
let address_book = self.address_book.clone();
tokio::task::spawn_blocking(move || address_book.lock().unwrap().extend(addrs))
.await
.expect("panic in new peers address book update task");
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(|| address_book.lock().unwrap().extend(addrs))
})
.await
.expect("panic in new peers address book update task");
}

/// Returns the next candidate for a connection attempt, if any are available.
@@ -386,7 +390,8 @@ where
};

// Correctness: Spawn address book accesses on a blocking thread, to avoid deadlocks (see #1976).
let next_peer = tokio::task::spawn_blocking(next_peer)
let span = Span::current();
let next_peer = tokio::task::spawn_blocking(move || span.in_scope(next_peer))
.await
.expect("panic in next peer address book task")?;

@@ -406,9 +411,12 @@ where
// Spawn address book accesses on a blocking thread,
// to avoid deadlocks (see #1976).
let address_book = self.address_book.clone();
tokio::task::spawn_blocking(move || address_book.lock().unwrap().update(addr))
.await
.expect("panic in peer failure address book update task");
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(|| address_book.lock().unwrap().update(addr))
})
.await
.expect("panic in peer failure address book update task");
}
}

10 changes: 7 additions & 3 deletions zebrad/src/components/sync.rs
@@ -57,7 +57,7 @@ const BLOCK_DOWNLOAD_RETRY_LIMIT: usize = 3;

/// A lower bound on the user-specified lookahead limit.
///
/// Set to the maximum checkpoint interval, so the pipeline holds at least one checkpoint's
/// Set to the maximum checkpoint interval, so the pipeline holds around a checkpoint's
/// worth of blocks.
///
/// ## Security
@@ -79,7 +79,9 @@ pub const MIN_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GA
/// The default for the user-specified lookahead limit.
///
/// See [`MIN_LOOKAHEAD_LIMIT`] for details.
pub const DEFAULT_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 5;
///
/// TODO: increase to `MAX_CHECKPOINT_HEIGHT_GAP * 5`, after we implement orchard batching
pub const DEFAULT_LOOKAHEAD_LIMIT: usize = MIN_LOOKAHEAD_LIMIT;

/// The expected maximum number of hashes in an ObtainTips or ExtendTips response.
///
@@ -141,7 +143,9 @@ pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15);
///
/// If this timeout is set too low, the syncer will sometimes get stuck in a
/// failure loop.
pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(6 * 60);
///
/// TODO: reduce to `6 * 60`, after we implement orchard batching?
pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(10 * 60);

/// Controls how long we wait to restart syncing after finishing a sync run.
///
32 changes: 29 additions & 3 deletions zebrad/src/components/sync/downloads.rs
@@ -118,6 +118,15 @@ pub enum BlockDownloadVerifyError {
#[error("block download & verification was cancelled during download: {hash:?}")]
CancelledDuringDownload { hash: block::Hash },

#[error(
"block download & verification was cancelled while waiting for the verifier service: \
to become ready: {height:?} {hash:?}"
)]
CancelledAwaitingVerifierReadiness {
height: block::Height,
hash: block::Hash,
},

#[error(
"block download & verification was cancelled during verification: {height:?} {hash:?}"
)]
@@ -282,6 +291,7 @@ where

let task = tokio::spawn(
async move {
// Download the block.
// Prefer the cancel handle if both are ready.
let rsp = tokio::select! {
biased;
@@ -393,12 +403,24 @@ where
Err(BlockDownloadVerifyError::BehindTipHeightLimit { height: block_height, hash })?;
}

// Wait for the verifier service to be ready.
let readiness = verifier.ready();
// Prefer the cancel handle if both are ready.
let verifier = tokio::select! {
biased;
_ = &mut cancel_rx => {
trace!("task cancelled waiting for verifier service readiness");
metrics::counter!("sync.cancelled.verify.ready.count", 1);
return Err(BlockDownloadVerifyError::CancelledAwaitingVerifierReadiness { height: block_height, hash })
}
verifier = readiness => verifier,
};

// Verify the block.
let rsp = verifier
.ready()
.await
.map_err(|error| BlockDownloadVerifyError::VerifierServiceError { error })?
.call(block);
// Prefer the cancel handle if both are ready.

let verification = tokio::select! {
biased;
_ = &mut cancel_rx => {
@@ -408,6 +430,7 @@ where
}
verification = rsp => verification,
};

if verification.is_ok() {
metrics::counter!("sync.verified.block.count", 1);
}
@@ -425,6 +448,9 @@ where
.map_err(move |e| (e, hash)),
);

// Try to start the spawned task before queueing the next block request
tokio::task::yield_now().await;

self.pending.push(task);
assert!(
self.cancel_handles.insert(hash, cancel_tx).is_none(),
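The readiness change above waits for the verifier service inside a biased `tokio::select!`, so the per-block cancel handle is polled first and cancellation wins whenever both branches are ready; the added `yield_now().await` then gives the freshly spawned task a chance to start before the next block request is queued. A self-contained sketch of the biased-select pattern, with a sleep standing in for the verifier-readiness future; the function name and error string are illustrative, not from the diff:

```rust
use std::time::Duration;
use tokio::sync::oneshot;

// Sketch only: a biased select polls the cancel branch first, so cancellation
// is preferred whenever both the cancel handle and the work future are ready.
async fn wait_unless_cancelled(mut cancel_rx: oneshot::Receiver<()>) -> Result<(), &'static str> {
    // Stand-in for awaiting verifier service readiness.
    let work = tokio::time::sleep(Duration::from_secs(1));

    tokio::select! {
        biased;
        _ = &mut cancel_rx => Err("cancelled while waiting"),
        _ = work => Ok(()),
    }
}
```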
15 changes: 10 additions & 5 deletions zebrad/src/config.rs
@@ -166,7 +166,7 @@ impl Default for MetricsSection {
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct SyncSection {
/// The maximum number of concurrent block requests during sync.
/// The maximum number of concurrent block download requests during sync.
///
/// This is set to a low value by default, to avoid task and
/// network contention. Increasing this value may improve
@@ -178,22 +178,27 @@ pub struct SyncSection {
/// download before waiting for queued verifications to complete.
///
/// Increasing this limit increases the buffer size, so it reduces
/// the impact of an individual block request failing. The block
/// size limit is 2MB, so in theory, this could represent multiple
/// the impact of an individual block request failing. However, it
/// also increases memory and CPU usage if block validation stalls,
/// or there are some large blocks in the pipeline.
///
/// The block size limit is 2MB, so in theory, this could represent multiple
/// gigabytes of data, if we downloaded arbitrary blocks. However,
/// because we randomly load balance outbound requests, and separate
/// block download from obtaining block hashes, an adversary would
/// have to control a significant fraction of our peers to lead us
/// astray.
///
/// This value is clamped to an implementation-defined lower bound.
/// For reliable checkpoint syncing, Zebra enforces a
/// [`MIN_LOOKAHEAD_LIMIT`](sync::MIN_LOOKAHEAD_LIMIT).
pub lookahead_limit: usize,
}

impl Default for SyncSection {
fn default() -> Self {
Self {
max_concurrent_block_requests: 50,
// TODO: increase to 50, after we implement orchard batching
max_concurrent_block_requests: 25,
lookahead_limit: sync::DEFAULT_LOOKAHEAD_LIMIT,
}
}
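The updated `lookahead_limit` docs say that Zebra enforces `MIN_LOOKAHEAD_LIMIT` on the user-supplied value. A minimal sketch of what such a lower-bound clamp could look like; the constant value and helper name are assumptions for illustration and are not taken from this diff:

```rust
// Sketch only: clamp a user-configured lookahead limit to an enforced minimum.
// The numeric value is made up; Zebra derives its real minimum from the
// maximum checkpoint interval.
const MIN_LOOKAHEAD_LIMIT: usize = 2_000;

fn effective_lookahead_limit(configured: usize) -> usize {
    configured.max(MIN_LOOKAHEAD_LIMIT)
}

fn main() {
    assert_eq!(effective_lookahead_limit(100), 2_000); // too low: raised to the minimum
    assert_eq!(effective_lookahead_limit(5_000), 5_000); // above the minimum: kept as-is
}
```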
