-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: avoid race condition between pending frames and closing stream #156
Conversation
fdec4f3
to
6c03282
Compare
Would the following alternative approach as well be an option?
Given that the above would not require a new command variant and would not require sending @thomaseizinger what do you think? Am I missing something? |
I like that idea! Much simpler. Will implement! :) |
There is one caveat. |
As far as I can tell, this should be fine. We could even not call |
I am not sure what you are trying to say? The point of this PR is to have FWIW: I attempted to fix this upstream: rust-lang/futures-rs#2746 |
poll_flush
properlyStream::poll_flush
properly
Ah I forgot about this. This is a bit trickier to implement because I currently use |
Stream::poll_flush
properly
@mxinden This is now available for review again. I implemented your suggestion. It is the much better design :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me overall, though one suggestion around using a SelectAll
.
yamux/src/connection.rs
Outdated
for i in (0..self.stream_receivers.len()).rev() { | ||
let (id, mut receiver) = self.stream_receivers.swap_remove(i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Polling each stream Receiver
even though only one might be Poll::Ready
seems wasteful. Would SelectAll
not be an option? One would wrap each Receiver
such that it returns the StreamId
when the Receiver
returns Poll::Ready(None)
. With that StreamId
we can then call self.on_drop_stream(id)
.
(On consecutive polls the wrapper can return Poll::Ready(None)
and thus it would be cleaned up by the SelectAll
.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a SelectAll
based design before but dropped it because I couldn't detect None
from the outside.
Detecting that requires another data structure for translating between the stream commands and the actual frames in connection (the wrapping you mentioned).
This also needs to work for the Cleanup
and Close
case.
I tried hiding it in a custom collection object but we need access to the stream IntMap
upon drop so that needs even more refactoring but would be a clean design.
I can invest the time if you want but I don't think it is a quick 30min refactoring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wrong, it was actually a super quick refactoring, now that I've seen how tokio's StreamMap
works. They recommend an additional wrapper around Stream
. The trick is to wrap the output of the Stream
again such that you get a tuple of (key, Option<Item>)
which allows you to detect closing of the stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the follow-up. Also great to be aware of StreamMap
.
@@ -16,6 +16,7 @@ nohash-hasher = "0.2" | |||
parking_lot = "0.12" | |||
rand = "0.8.3" | |||
static_assertions = "1" | |||
pin-project = "1.1.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of using pin-project
, we could as well implement Unpin
for Stream
.
impl Unpin for Stream {}
That said, I doubt there is any async Rust code-base without pin-project
in its dependency tree already. Thus fine to include it here as well.
Currently, we have a
garbage_collect
function that checks whether any of our streams have been dropped. This can cause a race condition where the channel between aStream
and theConnection
still has pending frames for a stream but dropping a stream causes us to already send aFIN
flag for the stream.We fix this by maintaining a single channel for each stream. When a stream gets dropped, the
Receiver
becomes disconnected. We use this information to queue the correct frame (FIN
vsRST
) into the buffer. At this point, all previous frames have already been processed and the race condition is thus not present.Additionally, this also allows us to implement
Stream::poll_flush
by forwarding to the underlyingSender
. Note that at present day, this only checks whether there is space in the channel, not whether the items have been emitted by theReceiver
.We have a PR upstream that might fix this: rust-lang/futures-rs#2746
Fixes: #117.