Skip to content

Commit

Permalink
Migrate to pin project lite (#595)
Browse files Browse the repository at this point in the history
* REMOVE ME updates peak_wema test to pass

* adds pin_project_lite dependency

* uses pin_project_lite for load::Constant

* uses pin_project_lite for load::PencingRequestsDiscover

* uses pin_project_lite for load::PeakEwma

* uses pin_project_lite for load::Completion

* uses pin_project_lite for tests::support::IntoStream

Turns IntoStream into a regular struct because pin_project_lite does not and will support tuple structs.

https://github.com/taiki-e/pin-project-lite/blob/416be96f7777862c68b567c92a91887f69a8c2b3/src/lib.rs#L401-L408

* refactors opaque_future into a regular struct

This enables migration to pin_project_lite, which does not and will not support tuple structs
https://github.com/taiki-e/pin-project-lite/blob/416be96f7777862c68b567c92a91887f69a8c2b3/src/lib.rs#L401-L408

* migrates opaque_future to use pin_project_lite

* removes tuple variant from load_shed::ResponseState enum

* migrates load_shed::future to pin_project_lite

* removes tuple variant from filter::future::State

* migrates filter::future to pin_project_lite

Note: the doc comment on AsyncResponseFuture::service was also reduced to a regular comment.

This is a known limitation of pin_project_lite that the they have labeled as "help wanted".
taiki-e/pin-project-lite#3 (comment)

* migrates retry::Retry to pin_project_lite

* refactors retry::future::State to enable pin_project_lite

pin_project_lite has the current limitation of nto supporting doc comments
taiki-e/pin-project-lite#3 (comment)

pin_project_lite does not and will not support tuple variants
https://github.com/taiki-e/pin-project-lite/blob/416be96f7777862c68b567c92a91887f69a8c2b3/src/lib.rs#L401-L408

* migrates retry::future to pin_project_lite

* migrates spawn_ready::make to pin_project_lite

* refactors buffer::future::ResponseState to allow pin_project_lite

* migrates buffer::future to pin_project_lite

* refactors util::AndThenFuture to allow pin_project_lite

* migrates util::AndThenFuture to pin_project_lite

* migrates hedge::Future to pin_project_lite

* migrates hedge::select::ResponseFuture to pin_project_lite

* refactors hedge::delay enum for pin_project_lite

* refactors reconnect::future enum for pin_project_lite

* refactors oneshot::State enum for pin_project_lite

* migrates util::oneshot to pin_project_lite

* migrates reconnect::future to pin_project_lite

* migrates hedge::delay to pin_project_lite

* migrates hedge::latency to pin_project_lite

* migrates discover::list to pin_project_lite

* migrates timeout::future to pin_project_lite

* migrates balance::pool to pin_project_lite

* migrates balance::p2c::make to pin_project_lite

* migrates balance::p2c::service to pin_project_lite

* migrates call_all::ordered to pin_project_lite

* migrates call_all::common to pin_project_lite

* migrates call_all::unordered to pin_project_lite

* migrates util::optional::future to pin_project_lite

* migrates limit::concurrency::future to pin_project_lite

* migrates tower-balance example to pin_project_lite

* applies cargo fmt

* migrates tower-test to pin_project_lite

* fixes cargo hack check

peak_wma and pending_requests will now properly compile without the "discover" feature enabled.

* fixes lint rename warning on nightly

broken_intra_doc_links has been renamed to rustdoc::broken_intra_doc_links

* migrates buffer::Worker to pin_project_lite

pin_project_lite does support PinnedDrop
https://github.com/taiki-e/pin-project-lite/pull/25/files

However, it does not support generic trait bounds on the PinnedDrop impl.

To workaround this, I removed the T::Error bound from the Worker struct definition,
and moved `close_semaphore` to a a new impl without that trait bound.

* fixes abort_on_drop test

This test was also failing on master.

* applies cargo fmt
  • Loading branch information
Michael-J-Ward authored Jul 28, 2021
1 parent 7776019 commit ee131aa
Show file tree
Hide file tree
Showing 49 changed files with 668 additions and 510 deletions.
2 changes: 1 addition & 1 deletion tower-layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
rust_2018_idioms,
unreachable_pub
)]
#![deny(broken_intra_doc_links)]
#![deny(rustdoc::broken_intra_doc_links)]

//! Layer traits and extensions.
//!
Expand Down
2 changes: 1 addition & 1 deletion tower-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
rust_2018_idioms,
unreachable_pub
)]
#![deny(broken_intra_doc_links)]
#![deny(rustdoc::broken_intra_doc_links)]

//! Definition of the core `Service` trait to Tower
//!
Expand Down
2 changes: 1 addition & 1 deletion tower-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tokio = { version = "1.0", features = ["sync"] }
tokio-test = "0.4"
tower-layer = { version = "0.3", path = "../tower-layer" }
tower-service = { version = "0.3" }
pin-project = "1"
pin-project-lite = "0.2"

[dev-dependencies]
tokio = { version = "1.0", features = ["macros"] }
2 changes: 1 addition & 1 deletion tower-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
unreachable_pub
)]
#![allow(elided_lifetimes_in_paths)]
#![deny(broken_intra_doc_links)]
#![deny(rustdoc::broken_intra_doc_links)]

//! Mock `Service` that can be used in tests.
Expand Down
15 changes: 8 additions & 7 deletions tower-test/src/mock/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::mock::error::{self, Error};
use futures_util::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use tokio::sync::oneshot;

use std::{
Expand All @@ -11,12 +11,13 @@ use std::{
task::{Context, Poll},
};

/// Future of the `Mock` response.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
rx: Option<Rx<T>>,
pin_project! {
/// Future of the `Mock` response.
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
rx: Option<Rx<T>>,
}
}

type Rx<T> = oneshot::Receiver<Result<T, Error>>;
Expand Down
1 change: 1 addition & 0 deletions tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ tokio = { version = "1", optional = true, features = ["sync"] }
tokio-stream = { version = "0.1.0", optional = true }
tokio-util = { version = "0.6.3", default-features = false, optional = true }
tracing = { version = "0.1.2", optional = true }
pin-project-lite = "0.2.7"

[dev-dependencies]
futures = "0.3"
Expand Down
19 changes: 14 additions & 5 deletions tower/examples/tower-balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use futures_core::{Stream, TryStream};
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
use hdrhistogram::Histogram;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use rand::{self, Rng};
use std::hash::Hash;
use std::time::Duration;
Expand Down Expand Up @@ -78,8 +78,17 @@ type Error = Box<dyn std::error::Error + Send + Sync>;

type Key = usize;

#[pin_project]
struct Disco<S>(Vec<(Key, S)>);
pin_project! {
struct Disco<S> {
services: Vec<(Key, S)>
}
}

impl<S> Disco<S> {
fn new(services: Vec<(Key, S)>) -> Self {
Self { services }
}
}

impl<S> Stream for Disco<S>
where
Expand All @@ -88,7 +97,7 @@ where
type Item = Result<Change<Key, S>, Error>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().0.pop() {
match self.project().services.pop() {
Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
None => {
// there may be more later
Expand All @@ -105,7 +114,7 @@ fn gen_disco() -> impl Discover<
impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
>,
> + Send {
Disco(
Disco::new(
MAX_ENDPOINT_LATENCIES
.iter()
.enumerate()
Expand Down
21 changes: 11 additions & 10 deletions tower/src/balance/p2c/make.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Balance;
use crate::discover::Discover;
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
Expand Down Expand Up @@ -29,15 +29,16 @@ pub struct MakeBalance<S, Req> {
_marker: PhantomData<fn(Req)>,
}

/// A [`Balance`] in the making.
///
/// [`Balance`]: crate::balance::p2c::Balance
#[pin_project]
#[derive(Debug)]
pub struct MakeFuture<F, Req> {
#[pin]
inner: F,
_marker: PhantomData<fn(Req)>,
pin_project! {
/// A [`Balance`] in the making.
///
/// [`Balance`]: crate::balance::p2c::Balance
#[derive(Debug)]
pub struct MakeFuture<F, Req> {
#[pin]
inner: F,
_marker: PhantomData<fn(Req)>,
}
}

impl<S, Req> MakeBalance<S, Req> {
Expand Down
27 changes: 14 additions & 13 deletions tower/src/balance/p2c/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::load::Load;
use crate::ready_cache::{error::Failed, ReadyCache};
use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
Expand Down Expand Up @@ -59,18 +59,19 @@ where
}
}

/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
#[pin_project]
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
#[pin]
cancel: oneshot::Receiver<()>,
service: Option<S>,

_req: PhantomData<Req>,
pin_project! {
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
#[pin]
cancel: oneshot::Receiver<()>,
service: Option<S>,

_req: PhantomData<Req>,
}
}

enum Error<E> {
Expand Down
37 changes: 19 additions & 18 deletions tower/src/balance/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::discover::Change;
use crate::load::Load;
use crate::make::MakeService;
use futures_core::{ready, Stream};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use slab::Slab;
use std::{
fmt,
Expand All @@ -42,23 +42,24 @@ enum Level {
High,
}

/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a
/// service when load is low. See [`Pool`].
#[pin_project]
pub struct PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
{
maker: MS,
#[pin]
making: Option<MS::Future>,
target: Target,
load: Level,
services: Slab<()>,
died_tx: tokio::sync::mpsc::UnboundedSender<usize>,
#[pin]
died_rx: tokio::sync::mpsc::UnboundedReceiver<usize>,
limit: Option<usize>,
pin_project! {
/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a
/// service when load is low. See [`Pool`].
pub struct PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
{
maker: MS,
#[pin]
making: Option<MS::Future>,
target: Target,
load: Level,
services: Slab<()>,
died_tx: tokio::sync::mpsc::UnboundedSender<usize>,
#[pin]
died_rx: tokio::sync::mpsc::UnboundedReceiver<usize>,
limit: Option<usize>,
}
}

impl<MS, Target, Request> fmt::Debug for PoolDiscoverer<MS, Target, Request>
Expand Down
51 changes: 31 additions & 20 deletions tower/src/buffer/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,50 @@
use super::{error::Closed, message};
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

/// Future that completes when the buffered service eventually services the submitted request.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
state: ResponseState<T>,
pin_project! {
/// Future that completes when the buffered service eventually services the submitted request.
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
state: ResponseState<T>,
}
}

#[pin_project(project = ResponseStateProj)]
#[derive(Debug)]
enum ResponseState<T> {
Failed(Option<crate::BoxError>),
Rx(#[pin] message::Rx<T>),
Poll(#[pin] T),
pin_project! {
#[project = ResponseStateProj]
#[derive(Debug)]
enum ResponseState<T> {
Failed {
error: Option<crate::BoxError>,
},
Rx {
#[pin]
rx: message::Rx<T>,
},
Poll {
#[pin]
fut: T,
},
}
}

impl<T> ResponseFuture<T> {
pub(crate) fn new(rx: message::Rx<T>) -> Self {
ResponseFuture {
state: ResponseState::Rx(rx),
state: ResponseState::Rx { rx },
}
}

pub(crate) fn failed(err: crate::BoxError) -> Self {
ResponseFuture {
state: ResponseState::Failed(Some(err)),
state: ResponseState::Failed { error: Some(err) },
}
}
}
Expand All @@ -53,15 +64,15 @@ where

loop {
match this.state.as_mut().project() {
ResponseStateProj::Failed(e) => {
return Poll::Ready(Err(e.take().expect("polled after error")));
ResponseStateProj::Failed { error } => {
return Poll::Ready(Err(error.take().expect("polled after error")));
}
ResponseStateProj::Rx(rx) => match ready!(rx.poll(cx)) {
Ok(Ok(f)) => this.state.set(ResponseState::Poll(f)),
ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
Ok(Err(e)) => return Poll::Ready(Err(e.into())),
Err(_) => return Poll::Ready(Err(Closed::new().into())),
},
ResponseStateProj::Poll(fut) => return fut.poll(cx).map_err(Into::into),
ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
}
}
}
Expand Down
Loading

0 comments on commit ee131aa

Please sign in to comment.