diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index d682e07619e..3776c072d36 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -53,7 +53,7 @@ pub type SetupData = (Outbound, Arc>); /// responding to block gossip by attempting to download and validate advertised /// blocks. pub struct Inbound { - // invariant: address_book, outbound, downloads are Some if network_setup is None + // invariant: address_book and downloads are Some if network_setup or verifier are None // // why not use an enum for the inbound state? because it would mean // match-wrapping the body of Service::call rather than just expect()ing @@ -62,28 +62,29 @@ pub struct Inbound { // Setup /// A oneshot channel used to receive the address_book and outbound services /// after the network is set up. + /// + /// `None` after the network is set up. network_setup: Option>, + /// A service that verifies downloaded blocks. Given to `downloads` + /// after the network is set up. + /// + /// `None` after the network is set up and `downloads` is created. + verifier: Option, + // Services - /// A list of peer addresses. + /// A service that maintains a list of peer addresses. + /// + /// `None` until the network is set up. address_book: Option>>, - /// A service that downloads and verifies gossipped blocks. - downloads: Option, Timeout, State>>>>, - - /// A service that forwards requests to connected peers, and returns their - /// responses. + /// A stream that downloads and verifies gossipped blocks. /// - /// Only used for readiness checks, and via `downloads`. - outbound: Option, + /// `None` until the network is set up. + downloads: Option, Timeout, State>>>>, /// A service that manages cached blockchain state. state: State, - - /// A service that verifies downloaded blocks. - /// - /// Only used for readiness checks, and via `downloads`. - verifier: Verifier, } impl Inbound { @@ -94,11 +95,10 @@ impl Inbound { ) -> Self { Self { network_setup: Some(network_setup), + verifier: Some(verifier), address_book: None, downloads: None, - outbound: None, state, - verifier, } } } @@ -119,14 +119,18 @@ impl Service for Inbound { use oneshot::error::TryRecvError; match rx.try_recv() { Ok((outbound, address_book)) => { - self.outbound = Some(outbound.clone()); self.address_book = Some(address_book); - self.network_setup = None; self.downloads = Some(Box::pin(Downloads::new( Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT), - Timeout::new(self.verifier.clone(), BLOCK_VERIFY_TIMEOUT), + Timeout::new( + self.verifier + .take() + .expect("verifier is Some when network_setup is Some"), + BLOCK_VERIFY_TIMEOUT, + ), self.state.clone(), ))); + self.network_setup = None; } Err(TryRecvError::Empty) => { self.network_setup = Some(rx); @@ -141,23 +145,21 @@ impl Service for Inbound { }; } - // Clean up completed download tasks + // Clean up completed download tasks, ignoring their results if let Some(downloads) = self.downloads.as_mut() { while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {} } // Now report readiness based on readiness of the inner services, if they're available. - // XXX do we want to propagate backpressure from the network here? - match ( - self.state.poll_ready(cx), - self.outbound - .as_mut() - .map(|svc| svc.poll_ready(cx)) - .unwrap_or(Poll::Ready(Ok(()))), - ) { - (Poll::Ready(Err(e)), _) | (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e)), - (Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending, - (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())), + // + // TODO: do we want to propagate backpressure from the download queue or its outbound network here? + // currently, the download queue waits for the outbound network in the download future, and + // drops new requests after it reaches a hard-coded limit. This is the "load shed directly" + // pattern from #1618. + match self.state.poll_ready(cx) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), } }