diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index e99d628af44..5310126aa78 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -530,13 +530,6 @@ impl Service for StateService { type Future = Pin> + Send + 'static>>; - // ## Correctness: - // - // This function must not return Poll::Pending, unless: - // 1. We remove all instances of `call_all` on the state service, or fix the leaked - // service reservation in the `CallAll` implementation: - // https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112 - // 2. We schedule the current task for wakeup via the `Context` fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { let now = Instant::now(); diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 7041d66bc86..e40f4c73c44 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -7,7 +7,7 @@ use std::{ use futures::{ future::{FutureExt, TryFutureExt}, - stream::{Stream, TryStreamExt}, + stream::Stream, }; use tokio::sync::oneshot; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt}; @@ -52,12 +52,6 @@ pub type SetupData = (Outbound, Arc>); /// behind the current tip, while the `Inbound` service is *externally driven*, /// responding to block gossip by attempting to download and validate advertised /// blocks. -/// -/// ## Correctness -/// -/// The `state` service must not return `Poll::Pending`. If it does, a bug in the -/// `ServiceExt::call_all` implementation might cause the `state` buffer to fill -/// up, and make Zebra hang. pub struct Inbound { // invariants: // * Before setup: address_book and downloads are None, and the *_setup members are Some @@ -160,18 +154,14 @@ impl Service for Inbound { } // TODO: - // * do we want to propagate backpressure from the download queue or its outbound network here? - // currently, the download queue waits for the outbound network in the download future, and - // drops new requests after it reaches a hard-coded limit. This is the "load shed directly" - // pattern from #1618. - // * if we want to propagate backpressure, add a ReadyCache to ensure that each poll_ready - // has a matching call. See #1593 for details. - - // Ignore state readiness, to avoid reserving its buffer slots. - // We can't use a state ReadyCache, because call_all uses state directly. - // We can't call state.poll_ready, because: - // * call_all also calls poll_ready - // * some requests don't use the state + // * do we want to propagate backpressure from the download queue or its outbound network? + // currently, the download queue waits for the outbound network in the download future, + // and drops new requests after it reaches a hard-coded limit. This is the + // "load shed directly" pattern from #1618. + // * currently, the state service is always ready, unless its buffer is full. + // So we might also want to propagate backpressure from its buffer. + // * if we want to propagate backpressure, add a ReadyCache for each service, to ensure + // that each poll_ready has a matching call. See #1593 for details. Poll::Ready(Ok(())) } @@ -194,34 +184,26 @@ impl Service for Inbound { zn::Request::BlocksByHash(hashes) => { // Correctness: // - // We don't need to use ServiceExt::oneshot here, because - // call_all uses poll_ready internally. - // - // The state must not return Poll::Pending, because call_all - // leaks a buffer reservation every time that happens + // We can't use `call_all` here, because it leaks buffer slots: // https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112 - let state = self.state.clone(); - let requests = futures::stream::iter( - hashes - .into_iter() - .map(|hash| zs::Request::Block(hash.into())), - ); - - state - .call_all(requests) - .try_filter_map(|rsp| { - futures::future::ready(match rsp { - zs::Response::Block(Some(block)) => Ok(Some(block)), + let mut state = self.state.clone(); + async move { + let mut blocks = Vec::new(); + for hash in hashes { + let request = zs::Request::Block(hash.into()); + // we can't use ServiceExt::oneshot here, due to lifetime issues + match state.ready_and().await?.call(request).await? { + zs::Response::Block(Some(block)) => blocks.push(block), // `zcashd` ignores missing blocks in GetData responses, // rather than including them in a trailing `NotFound` // message - zs::Response::Block(None) => Ok(None), + zs::Response::Block(None) => {} _ => unreachable!("wrong response from state"), - }) - }) - .try_collect::>() - .map_ok(zn::Response::Blocks) - .boxed() + } + } + Ok(zn::Response::Blocks(blocks)) + } + .boxed() } zn::Request::TransactionsByHash(_transactions) => { // `zcashd` returns a list of found transactions, followed by a