
update to Tokio 0.3 #476

Merged: hawkw merged 21 commits into master from eliza/tokio-0.3 on Oct 27, 2020
Conversation

hawkw (Member) commented Oct 26, 2020

This branch updates Tower to Tokio 0.3.

Unlike #474, this branch uses Tokio 0.3's synchronization primitives,
rather than continuing to depend on Tokio 0.2. I think that we ought to
try to use Tokio 0.3's channels whenever feasible, because the 0.2
channels have pathological memory usage patterns in some cases (see
tokio-rs/tokio#2637). @LucioFranco let me know what you think of the
approach used here and we can compare notes!

For the most part, this was a pretty mechanical change: updating
versions in Cargo.toml, tracking feature flag changes, renaming
tokio::time::delay_for to sleep, and so on. Tokio's channel receivers
also lost their poll_recv methods, but we can easily replicate them by
enabling the "stream" feature and using poll_next instead.
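
As a hedged illustration of that replacement (not code from this branch), a receiver that implements futures_core::Stream, as tokio 0.3's mpsc::Receiver does behind the "stream" feature, can be polled like this; the helper name `poll_item` is illustrative:

```rust
// A minimal sketch, assuming a receiver type that implements
// `futures_core::Stream` (the PR gets this by enabling tokio's
// "stream" feature). `poll_item` is a hypothetical helper name.
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::Stream;

fn poll_item<S>(rx: &mut S, cx: &mut Context<'_>) -> Poll<Option<S::Item>>
where
    S: Stream + Unpin,
{
    // The receiver is `Unpin`, so `Pin::new` suffices to call `poll_next`,
    // which plays the same role `poll_recv` did on the tokio 0.2 receiver.
    Pin::new(rx).poll_next(cx)
}
```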

The one actually significant change is that tokio::sync::mpsc::Sender
lost its poll_ready method, which impacts the way tower::buffer is
implemented. When the buffer's channel is full, we want to exert
backpressure in poll_ready, so that callers such as load balancers
can choose to call another service rather than waiting for buffer
capacity. Previously, we did this by calling poll_ready on the
underlying channel sender.

Unfortunately, this can't be done easily using Tokio 0.3's bounded MPSC
channel, because it no longer exposes a polling-based interface, only an
async fn ready, which borrows the sender. Therefore, we implement our
own bounded MPSC on top of the unbounded channel, using a semaphore to
limit how many items are in the channel.

I factored out the code for polling a semaphore acquire future from
limit::concurrency into its own module, and reused it in Buffer.
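
A rough sketch of that approach follows. It is not the code in this branch: it is written against the current tokio 1.x semaphore API for clarity, it inlines the acquire-future polling that the PR factors into a shared module, and the `BoundedSender` name and the channel's message shape are illustrative.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use tokio::sync::{mpsc, AcquireError, OwnedSemaphorePermit, Semaphore};

// Boxed `acquire_owned` future, so it can be stored and polled across calls.
type Acquire = Pin<Box<dyn Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send>>;

struct BoundedSender<T> {
    tx: mpsc::UnboundedSender<(OwnedSemaphorePermit, T)>,
    semaphore: Arc<Semaphore>,
    // In-flight acquire future, kept across `Pending` polls.
    acquiring: Option<Acquire>,
    // Permit acquired by `poll_ready`, consumed by `send`.
    permit: Option<OwnedSemaphorePermit>,
}

impl<T> BoundedSender<T> {
    fn new(capacity: usize) -> (Self, mpsc::UnboundedReceiver<(OwnedSemaphorePermit, T)>) {
        let (tx, rx) = mpsc::unbounded_channel();
        let sender = Self {
            tx,
            semaphore: Arc::new(Semaphore::new(capacity)),
            acquiring: None,
            permit: None,
        };
        (sender, rx)
    }

    /// Exert backpressure: ready only once a semaphore permit is held.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), AcquireError>> {
        if self.permit.is_some() {
            return Poll::Ready(Ok(()));
        }
        if self.acquiring.is_none() {
            self.acquiring = Some(Box::pin(self.semaphore.clone().acquire_owned()));
        }
        let fut = self.acquiring.as_mut().expect("just set above");
        match fut.as_mut().poll(cx) {
            Poll::Ready(Ok(permit)) => {
                self.acquiring = None;
                self.permit = Some(permit);
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
            Poll::Pending => Poll::Pending,
        }
    }

    /// Send a value, transferring the permit with it; dropping the permit
    /// on the receive side frees one slot of capacity.
    fn send(&mut self, value: T) {
        let permit = self.permit.take().expect("poll_ready must succeed before send");
        let _ = self.tx.send((permit, value));
    }
}
```

In the PR itself, the logic for polling the semaphore acquire future lives in the shared module mentioned above (reused by limit::concurrency and Buffer); it is inlined here only to keep the sketch short.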

Additionally, the buffer tests needed to be updated, because they
currently don't actually poll the buffer service before calling it. This
violates the Service contract, and the new code actually fails as a
result.
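
For reference, a minimal sketch of the contract the updated tests now respect (drive poll_ready to Ready before call); the helper name `call_when_ready` is illustrative, not test code from this branch:

```rust
// A minimal sketch, assuming any `tower::Service`: drive `poll_ready`
// to `Ready` before invoking `call`, as the `Service` contract requires.
use futures::future::poll_fn;
use tower::Service;

async fn call_when_ready<S, R>(svc: &mut S, req: R) -> Result<S::Response, S::Error>
where
    S: Service<R>,
{
    // For `Buffer`, this is where backpressure is observed: the future
    // stays pending until the buffer has capacity (or the worker failed).
    poll_fn(|cx| svc.poll_ready(cx)).await?;
    svc.call(req).await
}
```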

Closes #473
Closes #474

Co-authored-by: Lucio Franco [email protected]

hawkw added 9 commits October 26, 2020 13:49

@LucioFranco this is the basic idea; there are a couple other things
that don't compile yet.
hawkw self-assigned this Oct 26, 2020
LucioFranco (Member) left a comment

LGTM, just a few tracing things left over; I'll close my PR :)

    Poll::Ready(Err(self.get_worker_error()))
} else {
    Poll::Ready(Ok(()))
    tracing::info!("poll");
LucioFranco (Member): I had issues with this test previously, but also, by the looks of these tracing statements, you did too? Probably should remove these :)

hawkw (Member, Author): oh whoops, didn't mean to leave those in!

Comment on lines 52 to 56

if let Poll::Ready(_) = {
    let closed = this.tx.as_mut().expect("illegal state").closed();
    tokio::pin!(closed);
    closed.poll(cx)
} {
LucioFranco (Member): nit: this is a bit hard to read, can we move some of this logic out of this block?

LucioFranco mentioned this pull request Oct 27, 2020
hawkw (Member, Author) commented Oct 27, 2020

> I'll close my PR :)

@LucioFranco if there's anything you've got that I missed, we could also merge one branch into the other?

LucioFranco (Member) commented:

@hawkw the only remaining thing is that my PR did some work on feature flags; could you make sure you copy that over? Beyond the Tokio 0.2 sync stuff, it fixed a few things and added a `full` flag.

hawkw requested a review from LucioFranco October 27, 2020 17:01
hawkw added 3 commits October 27, 2020 10:03
// If the oneshot sender is closed, then the receiver is dropped,
// and nobody cares about the response. If this is the case, we
// should continue to the next request.
if !msg.tx.is_closed() {
hawkw (Member, Author): n.b. that tokio 0.3.2 is adding back oneshot::Sender::poll_closed, but I don't think we actually need to poll here, since we immediately consume the sender if it isn't closed. We don't need to register interest in it closing, since we're not going to yield until we're done with that sender... so is_closed is probably slightly more efficient, too.

LucioFranco (Member) left a comment

Awesome stuff @hawkw, thanks for getting this done!

// rather than just calling `is_closed` on it, since we want to be
// notified if the receiver is dropped.
let closed = {
    // TODO(eliza): once `tokio` 0.3.2 is released, we can change this back
LucioFranco (Member): open an issue for this?

hawkw merged commit ddc64e8 into master Oct 27, 2020
hawkw deleted the eliza/tokio-0.3 branch October 27, 2020 18:21
hawkw added a commit to linkerd/linkerd2-proxy that referenced this pull request Feb 17, 2021
…922)

The proxy currently has its own implementation of a `tower` `Service`
that makes an inner service `Clone`able by driving it in a spawned task
and buffering requests on a channel. This also exists upstream, as
`tower::buffer`.

We implemented our own version for a couple of reasons: to avoid an
upstream issue where memory was leaked when a buffered request was
cancelled, and to implement an idle timeout when the buffered service
has been unready for too long. However, it's no longer necessary to
reimplement our own buffer service for these reasons: the upstream bug
was fixed in `tower` 0.4 (see tower-rs/tower#476, tower-rs/tower#480,
and tower-rs/tower#556); and we no longer actually use the buffer idle
timeout (instead, we idle out unresponsive services with the separate
`Failfast` middleware; note that `push_spawn_buffer_with_idle_timeout`
is never actually used).

Therefore, we can remove our _sui generis_ implementation in favour of
`tower::buffer` from upstream. This eliminates dead code for the idle
timeout, which we never actually use, and reduces duplication (since
`tonic` uses `tower::buffer` internally, its code is already compiled
into the proxy). It also reduces the amount of code I'm personally
responsible for maintaining in two separate places ;)

Since the `linkerd-buffer` crate erases the type of the buffered
service, while `tower::buffer` does not, I've changed the
`push_spawn_buffer`/`spawn_buffer` helpers to also include a
`BoxService` layer. This required adding a `BoxServiceLayer` type, since
`BoxService::layer` returns a `LayerFn` with an unnameable type.
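
A hedged sketch of what such a nameable layer might look like (not the proxy's actual `BoxServiceLayer`; the `PhantomData` shape and bounds are assumptions):

```rust
use std::marker::PhantomData;

use tower::util::BoxService;
use tower::{Layer, Service};

/// A nameable layer that boxes the wrapped service. `BoxService::layer()`
/// exists upstream but returns a `LayerFn` whose type can't be written out,
/// which is inconvenient when the layer type must appear in a signature.
pub struct BoxServiceLayer<R> {
    _marker: PhantomData<fn(R)>,
}

impl<R> BoxServiceLayer<R> {
    pub fn new() -> Self {
        Self { _marker: PhantomData }
    }
}

impl<R, S> Layer<S> for BoxServiceLayer<R>
where
    S: Service<R> + Send + 'static,
    S::Future: Send + 'static,
{
    type Service = BoxService<R, S::Response, S::Error>;

    fn layer(&self, inner: S) -> Self::Service {
        // Erase the concrete service type behind a boxed trait object.
        BoxService::new(inner)
    }
}
```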

Also, this change ran into issues due to a compiler bug where generators
(async blocks) sometimes forget concrete lifetimes,
rust-lang/rust#64552. In order to resolve this, I had to remove the
outermost `async` blocks from the OpenCensus and identity daemon tasks.
These async blocks were used only for emitting a tracing event when the
task is started, so it wasn't a big deal to remove them; I moved the
trace events into the actual daemon task functions, and used a `tracing`
span to propagate the remote addresses which aren't known inside the
daemon task functions.

Signed-off-by: Eliza Weisman <[email protected]>