update to Tokio 0.3 #476

Merged: 21 commits, Oct 27, 2020
Changes from 9 commits
6 changes: 3 additions & 3 deletions tower-test/Cargo.toml
@@ -23,11 +23,11 @@ edition = "2018"

[dependencies]
futures-util = { version = "0.3", default-features = false }
tokio = { version = "0.2", features = ["sync"]}
tokio = { version = "0.3", features = ["sync"]}
tower-layer = { version = "0.3", path = "../tower-layer" }
tokio-test = "0.2"
tokio-test = "0.3"
tower-service = { version = "0.3" }
pin-project = "0.4.17"

[dev-dependencies]
tokio = { version = "0.2", features = ["macros"] }
tokio = { version = "0.3", features = ["macros"] }
12 changes: 6 additions & 6 deletions tower/Cargo.toml
@@ -26,8 +26,8 @@ edition = "2018"
[features]
default = ["log"]
log = ["tracing/log"]
balance = ["discover", "load", "ready-cache", "make", "rand", "slab"]
buffer = ["tokio/sync", "tokio/rt-core"]
balance = ["discover", "load", "ready-cache", "make", "rand", "slab", "tokio/stream"]
buffer = ["tokio/sync", "tokio/rt", "tokio/stream"]
discover = []
filter = []
hedge = ["util", "filter", "futures-util", "hdrhistogram", "tokio/time"]
@@ -38,7 +38,7 @@ make = ["tokio/io-std"]
ready-cache = ["futures-util", "indexmap", "tokio/sync"]
reconnect = ["make", "tokio/io-std"]
retry = ["tokio/time"]
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt-core"]
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt"]
steer = ["futures-util"]
timeout = ["tokio/time"]
util = ["futures-util"]
@@ -55,14 +55,14 @@ hdrhistogram = { version = "6.0", optional = true }
indexmap = { version = "1.0.2", optional = true }
rand = { version = "0.7", features = ["small_rng"], optional = true }
slab = { version = "0.4", optional = true }
tokio = { version = "0.2", optional = true, features = ["sync"] }
tokio = { version = "0.3", optional = true, features = ["sync"] }

[dev-dependencies]
futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await"] }
hdrhistogram = "6.0"
quickcheck = { version = "0.9", default-features = false }
tokio = { version = "0.2", features = ["macros", "stream", "sync", "test-util" ] }
tokio-test = "0.2"
tokio = { version = "0.3", features = ["macros", "stream", "sync", "test-util", "rt-multi-thread"] }
tokio-test = "0.3"
tower-test = { version = "0.3", path = "../tower-test" }
tracing-subscriber = "0.1.1"
# env_logger = { version = "0.5.3", default-features = false }
2 changes: 1 addition & 1 deletion tower/examples/tower-balance.rs
@@ -118,7 +118,7 @@ fn gen_disco() -> impl Discover<
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));

async move {
time::delay_until(start + latency).await;
time::sleep_until(start + latency).await;
let latency = start.elapsed();
Ok(Rsp { latency, instance })
}
2 changes: 1 addition & 1 deletion tower/src/balance/pool/mod.rs
@@ -90,7 +90,7 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_recv(cx) {
while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_next(cx) {
this.services.remove(sid);
tracing::trace!(
pool.services = this.services.len(),
1 change: 1 addition & 0 deletions tower/src/buffer/message.rs
@@ -7,6 +7,7 @@ pub(crate) struct Message<Request, Fut> {
pub(crate) request: Request,
pub(crate) tx: Tx<Fut>,
pub(crate) span: tracing::Span,
pub(super) _permit: crate::semaphore::Permit,
}

/// Response sender
74 changes: 49 additions & 25 deletions tower/src/buffer/service.rs
@@ -4,6 +4,7 @@ use super::{
worker::{Handle, Worker},
};

use crate::semaphore::Semaphore;
use futures_core::ready;
use std::task::{Context, Poll};
use tokio::sync::{mpsc, oneshot};
@@ -17,7 +18,19 @@ pub struct Buffer<T, Request>
where
T: Service<Request>,
{
tx: mpsc::Sender<Message<Request, T::Future>>,
// Note: this actually _is_ bounded, but rather than using Tokio's bounded
// channel, we use tokio's semaphore separately to implement the bound.
tx: mpsc::UnboundedSender<Message<Request, T::Future>>,
// When the buffer's channel is full, we want to exert backpressure in
// `poll_ready`, so that callers such as load balancers could choose to call
// another service rather than waiting for buffer capacity.
//
// Unfortunately, this can't be done easily using Tokio's bounded MPSC
// channel, because it doesn't expose a polling-based interface, only an
// `async fn ready`, which borrows the sender. Therefore, we implement our
// own bounded MPSC on top of the unbounded channel, using a semaphore to
// limit how many items are in the channel.
semaphore: Semaphore,
handle: Handle,
}
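
To illustrate the idea in the comment above, here is a minimal, self-contained sketch of the same pattern. It uses tokio 1.x's public Semaphore and OwnedSemaphorePermit API for brevity, not the crate-internal crate::semaphore wrapper this PR adds, so the names are illustrative assumptions rather than the PR's actual implementation. The idea is the same: every send first acquires a permit, and the permit rides inside the message so capacity is released only once the receiver is done with it.

use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};

struct Message<T> {
    value: T,
    // Dropping the message drops the permit, freeing one slot of capacity.
    _permit: OwnedSemaphorePermit,
}

#[tokio::main]
async fn main() {
    let bound = 2;
    let semaphore = Arc::new(Semaphore::new(bound));
    let (tx, mut rx) = mpsc::unbounded_channel::<Message<u32>>();

    // Sender side: acquiring a permit is the "wait for capacity" step,
    // the analogue of what Buffer does in poll_ready before touching the channel.
    for value in 0..2u32 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        tx.send(Message { value, _permit: permit }).unwrap();
    }

    // The channel itself is unbounded, but the semaphore is now empty, so a
    // third send would have to wait for capacity.
    assert!(semaphore.clone().try_acquire_owned().is_err());

    // Receiver side: dropping a message returns its permit to the semaphore.
    drop(rx.recv().await.unwrap());
    assert!(semaphore.clone().try_acquire_owned().is_ok());
}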

@@ -50,10 +63,9 @@ where
T::Error: Send + Sync,
Request: Send + 'static,
{
let (tx, rx) = mpsc::channel(bound);
let (handle, worker) = Worker::new(service, rx);
let (service, worker) = Self::pair(service, bound);
tokio::spawn(worker);
Buffer { tx, handle }
service
}

/// Creates a new `Buffer` wrapping `service`, but returns the background worker.
@@ -67,9 +79,17 @@
T::Error: Send + Sync,
Request: Send + 'static,
{
let (tx, rx) = mpsc::channel(bound);
let (tx, rx) = mpsc::unbounded_channel();
let (handle, worker) = Worker::new(service, rx);
(Buffer { tx, handle }, worker)
let semaphore = Semaphore::new(bound);
(
Buffer {
tx,
handle,
semaphore,
},
worker,
)
}

fn get_worker_error(&self) -> crate::BoxError {
@@ -87,12 +107,18 @@ where
type Future = ResponseFuture<T::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// If the inner service has errored, then we error here.
if let Err(_) = ready!(self.tx.poll_ready(cx)) {
Poll::Ready(Err(self.get_worker_error()))
} else {
Poll::Ready(Ok(()))
tracing::info!("poll");
Review comment (Member): I had issues with this test previously, but also by the looks of these tracing statements you did too? Probably should remove these :)

Reply (Member Author):
oh whoops, didn't mean to leave those in!

if self.tx.is_closed() {
tracing::trace!("closed");
// If the inner service has errored, then we error here.
return Poll::Ready(Err(self.get_worker_error()));
}

tracing::trace!("poll sem");
ready!(self.semaphore.poll_ready(cx));

tracing::trace!("acquired");
Poll::Ready(Ok(()))
}

fn call(&mut self, request: Request) -> Self::Future {
@@ -107,20 +133,17 @@ where
// towards that span since the worker would have no way of entering it.
let span = tracing::Span::current();
tracing::trace!(parent: &span, "sending request to buffer worker");
match self.tx.try_send(Message { request, span, tx }) {
Err(mpsc::error::TrySendError::Closed(_)) => {
ResponseFuture::failed(self.get_worker_error())
}
Err(mpsc::error::TrySendError::Full(_)) => {
// When `mpsc::Sender::poll_ready` returns `Ready`, a slot
// in the channel is reserved for the handle. Other `Sender`
// handles may not send a message using that slot. This
// guarantees capacity for `request`.
//
// Given this, the only way to hit this code path is if
// `poll_ready` has not been called & `Ready` returned.
panic!("buffer full; poll_ready must be called first");
}
let _permit = self
.semaphore
.take_permit()
.expect("buffer full; poll_ready must be called first");
match self.tx.send(Message {
request,
span,
tx,
_permit,
}) {
Err(_) => ResponseFuture::failed(self.get_worker_error()),
Ok(_) => ResponseFuture::new(rx),
}
}
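
Side note: the expect() above keeps the contract the deleted comment described, namely that call may only be invoked after poll_ready has returned Ready(Ok(())). Below is a hedged sketch of how a caller typically upholds that contract for any Service; the helper name is made up for illustration and is not part of this PR.

use futures_util::future::poll_fn;
use tower_service::Service;

// Drive poll_ready to completion before calling; for Buffer, the readiness
// step is what reserves the semaphore permit consumed in call().
async fn ready_and_call<S, R>(svc: &mut S, req: R) -> Result<S::Response, S::Error>
where
    S: Service<R>,
{
    poll_fn(|cx| svc.poll_ready(cx)).await?;
    // Skipping the readiness step is what would trip the
    // "buffer full; poll_ready must be called first" panic.
    svc.call(req).await
}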
@@ -134,6 +157,7 @@ where
Self {
tx: self.tx.clone(),
handle: self.handle.clone(),
semaphore: self.semaphore.clone(),
}
}
}
20 changes: 10 additions & 10 deletions tower/src/buffer/worker.rs
@@ -10,7 +10,7 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::mpsc;
use tokio::{stream::Stream, sync::mpsc};
use tower_service::Service;

/// Task that handles processing the buffer. This type should not be used
@@ -28,7 +28,7 @@ where
T::Error: Into<crate::BoxError>,
{
current_message: Option<Message<Request, T::Future>>,
rx: mpsc::Receiver<Message<Request, T::Future>>,
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
service: T,
finish: bool,
failed: Option<ServiceError>,
@@ -48,7 +48,7 @@
{
pub(crate) fn new(
service: T,
rx: mpsc::Receiver<Message<Request, T::Future>>,
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
) -> (Handle, Worker<T, Request>) {
let handle = Handle {
inner: Arc::new(Mutex::new(None)),
@@ -80,11 +80,11 @@
}

tracing::trace!("worker polling for next message");
if let Some(mut msg) = self.current_message.take() {
// poll_closed returns Poll::Ready if the receiver is dropped.
// Returning Pending means it is still alive, so we should still
// use it.
if msg.tx.poll_closed(cx).is_pending() {
if let Some(msg) = self.current_message.take() {
// If the oneshot sender is closed, then the receiver is dropped,
// and nobody cares about the response. If this is the case, we
// should continue to the next request.
if !msg.tx.is_closed() {
Review comment (Member Author):
n.b. that tokio 0.3.2 is adding back oneshot::Sender::poll_closed, but I don't think we actually need to poll here, since we immediately consume the sender if it isn't closed. we don't need to register interest in it closing, since we're not going to yield until we're done with that sender...so is_closed is probably slightly more efficient, too.
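
For reference, a tiny standalone sketch of the non-registering check described above, using tokio's oneshot channel: is_closed only reports whether the receiver has been dropped, whereas poll_closed would also register the current task for wakeup, which the worker does not need since it consumes the sender right away.

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u32>();
    // Receiver still alive: the worker would go ahead and process the request.
    assert!(!tx.is_closed());

    drop(rx);
    // Receiver gone: nobody will read the response, so the worker can skip
    // this message instead of doing the work.
    assert!(tx.is_closed());
}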

tracing::trace!("resuming buffered request");
return Poll::Ready(Some((msg, false)));
}
@@ -93,8 +93,8 @@
}

// Get the next request
while let Some(mut msg) = ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
if msg.tx.poll_closed(cx).is_pending() {
while let Some(msg) = ready!(Pin::new(&mut self.rx).poll_next(cx)) {
if !msg.tx.is_closed() {
tracing::trace!("processing new request");
return Poll::Ready(Some((msg, true)));
}
6 changes: 3 additions & 3 deletions tower/src/hedge/delay.rs
@@ -37,7 +37,7 @@
#[pin_project(project = StateProj)]
#[derive(Debug)]
enum State<Request, F> {
Delaying(#[pin] tokio::time::Delay, Option<Request>),
Delaying(#[pin] tokio::time::Sleep, Option<Request>),
Called(#[pin] F),
}

@@ -70,10 +70,10 @@
}

fn call(&mut self, request: Request) -> Self::Future {
let deadline = tokio::time::Instant::now() + self.policy.delay(&request);
let delay = self.policy.delay(&request);
ResponseFuture {
service: Some(self.service.clone()),
state: State::Delaying(tokio::time::delay_until(deadline), Some(request)),
state: State::Delaying(tokio::time::sleep(delay), Some(request)),
}
}
}
7 changes: 1 addition & 6 deletions tower/src/hedge/mod.rs
@@ -1,12 +1,7 @@
//! Pre-emptively retry requests which have been outstanding for longer
//! than a given latency percentile.

#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]

use crate::filter::Filter;
use futures_util::future;
3 changes: 3 additions & 0 deletions tower/src/lib.rs
@@ -77,6 +77,9 @@ pub use tower_layer::Layer;
#[doc(inline)]
pub use tower_service::Service;

#[cfg(any(feature = "buffer", feature = "limit"))]
mod semaphore;

#[allow(unreachable_pub)]
mod sealed {
pub trait Sealed<T> {}
6 changes: 3 additions & 3 deletions tower/src/limit/concurrency/future.rs
@@ -1,13 +1,13 @@
//! Future types
//!
use crate::semaphore::Permit;
use futures_core::ready;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::OwnedSemaphorePermit;

/// Future for the `ConcurrencyLimit` service.
#[pin_project]
@@ -16,11 +16,11 @@ pub struct ResponseFuture<T> {
#[pin]
inner: T,
// Keep this around so that it is dropped when the future completes
_permit: OwnedSemaphorePermit,
_permit: Permit,
}

impl<T> ResponseFuture<T> {
pub(crate) fn new(inner: T, _permit: OwnedSemaphorePermit) -> ResponseFuture<T> {
pub(crate) fn new(inner: T, _permit: Permit) -> ResponseFuture<T> {
ResponseFuture { inner, _permit }
}
}