Skip to content

Commit

Permalink
Remove services that are never called from Inbound
Browse files Browse the repository at this point in the history
Uses the `ServiceExt::oneshot` design pattern from ZcashFoundation#1593.
  • Loading branch information
teor2345 committed Jan 22, 2021
1 parent f8a1152 commit 2be077b
Showing 1 changed file with 33 additions and 31 deletions.
64 changes: 33 additions & 31 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub type SetupData = (Outbound, Arc<Mutex<AddressBook>>);
/// responding to block gossip by attempting to download and validate advertised
/// blocks.
pub struct Inbound {
// invariant: address_book, outbound, downloads are Some if network_setup is None
// invariant: address_book and downloads are Some if network_setup or verifier are None
//
// why not use an enum for the inbound state? because it would mean
// match-wrapping the body of Service::call rather than just expect()ing
Expand All @@ -62,28 +62,29 @@ pub struct Inbound {
// Setup
/// A oneshot channel used to receive the address_book and outbound services
/// after the network is set up.
///
/// `None` after the network is set up.
network_setup: Option<oneshot::Receiver<SetupData>>,

/// A service that verifies downloaded blocks. Given to `downloads`
/// after the network is set up.
///
/// `None` after the network is set up and `downloads` is created.
verifier: Option<Verifier>,

// Services
/// A list of peer addresses.
/// A service that maintains a list of peer addresses.
///
/// `None` until the network is set up.
address_book: Option<Arc<Mutex<zn::AddressBook>>>,

/// A service that downloads and verifies gossipped blocks.
downloads: Option<Pin<Box<Downloads<Timeout<Outbound>, Timeout<Verifier>, State>>>>,

/// A service that forwards requests to connected peers, and returns their
/// responses.
/// A stream that downloads and verifies gossipped blocks.
///
/// Only used for readiness checks, and via `downloads`.
outbound: Option<Outbound>,
/// `None` until the network is set up.
downloads: Option<Pin<Box<Downloads<Timeout<Outbound>, Timeout<Verifier>, State>>>>,

/// A service that manages cached blockchain state.
state: State,

/// A service that verifies downloaded blocks.
///
/// Only used for readiness checks, and via `downloads`.
verifier: Verifier,
}

impl Inbound {
Expand All @@ -94,11 +95,10 @@ impl Inbound {
) -> Self {
Self {
network_setup: Some(network_setup),
verifier: Some(verifier),
address_book: None,
downloads: None,
outbound: None,
state,
verifier,
}
}
}
Expand All @@ -119,14 +119,18 @@ impl Service<zn::Request> for Inbound {
use oneshot::error::TryRecvError;
match rx.try_recv() {
Ok((outbound, address_book)) => {
self.outbound = Some(outbound.clone());
self.address_book = Some(address_book);
self.network_setup = None;
self.downloads = Some(Box::pin(Downloads::new(
Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT),
Timeout::new(self.verifier.clone(), BLOCK_VERIFY_TIMEOUT),
Timeout::new(
self.verifier
.take()
.expect("verifier is Some when network_setup is Some"),
BLOCK_VERIFY_TIMEOUT,
),
self.state.clone(),
)));
self.network_setup = None;
}
Err(TryRecvError::Empty) => {
self.network_setup = Some(rx);
Expand All @@ -141,23 +145,21 @@ impl Service<zn::Request> for Inbound {
};
}

// Clean up completed download tasks
// Clean up completed download tasks, ignoring their results
if let Some(downloads) = self.downloads.as_mut() {
while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {}
}

// Now report readiness based on readiness of the inner services, if they're available.
// XXX do we want to propagate backpressure from the network here?
match (
self.state.poll_ready(cx),
self.outbound
.as_mut()
.map(|svc| svc.poll_ready(cx))
.unwrap_or(Poll::Ready(Ok(()))),
) {
(Poll::Ready(Err(e)), _) | (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
(Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending,
(Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
//
// TODO: do we want to propagate backpressure from the download queue or its outbound network here?
// currently, the download queue waits for the outbound network in the download future, and
// drops new requests after it reaches a hard-coded limit. This is the "load shed directly"
// pattern from #1618.
match self.state.poll_ready(cx) {
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
}
}

Expand Down

0 comments on commit 2be077b

Please sign in to comment.