Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(futures-util/stream): implement stream.unzip adapter #2256

Closed
wants to merge 3 commits into from

Conversation

binier
Copy link
Contributor

@binier binier commented Nov 5, 2020

The way this works is by putting the base stream in the Arc and sharing that Arc in UnzipLeft and UnzipRight streams.

Initially only UnzipLeft is polling the base stream, emits first tuple item as it's result and sends the other one through mpsc::channel to the UnzipRight. At the end of the stream, UnzipLeft sends Poll::Ready(None) through the channel to left the UnzipRight know stream is finished.

  • In case UnzipLeft is dropped, first it drops mpsc::Sender and then wakes up the UnzipRight task, which is a way of telling to UnzipRight to take over polling the base stream (Since try_recv called by right stream will error with TryRecvError::Disconnected if sender is dropped).
  • In case UnzipRight is dropped, right_waker.upgrade() will fail and return None since right_waker is a Weak type of Arc. If that happens, send simply won't happen.
  • If both is dropped, base stream will drop too.

There is unsafe code when converting base stream from Pin<&mut Arc<Stream>> to Pin<&mut Stream> in order to poll it, but since we are accessing base stream only from one of the streams at a time, it's safe right?

closes: #2234

@binier
Copy link
Contributor Author

binier commented Nov 5, 2020

I want to add some tests though, but not sure where to add them, since tests for example for adaptor zip is in disabled tests...

Comment on lines +9 to +12
/// SAFETY: safe because only one of two unzipped streams is guaranteed
/// to be accessing underlying stream. This is guaranteed by mpsc. Right
/// stream will access underlying stream only if Sender (or left stream)
/// is dropped in which case try_recv returns disconnected error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct. You can call UnzipLeft(Right)::poll_next and UnzipRight(Left)::is_terminated at the same time.

Copy link
Contributor Author

@binier binier Nov 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right! But I can simply maintain is_done boolean in UnzipRight and set it to true if I receive Poll::Ready(None) right? For example Fuse<S> seems to do the same.

Or is there a case when: is_terminated() returns true for underline stream but it still hasn't emitted Poll::Ready(None) and won't ever emit it?

If thats the case, would UnzipLeft be dropped if it is_terminated? So would it be sufficient if is_terminated() returned:

  • is_done if left is not dropped.
  • stream.is_terminated() if left is dropped, since it's now safe to call that method.

@taiki-e
Copy link
Member

taiki-e commented Nov 7, 2020

Given that Iterator::unzip is like .collect::<(Vec<_>, Vec<_>)>, I would prefer to separate functionality like Shared Stream from Unzip.

I want to add some tests though, but not sure where to add them, since tests for example for adaptor zip is in disabled tests...

I recommend adding to futures/tests/stream.rs or futures/tests/stream_unzip.rs (new file).

@binier
Copy link
Contributor Author

binier commented Nov 7, 2020

I recommend adding to futures/tests/stream.rs or futures/tests/stream_unzip.rs (new file).

Thanks! Will add tests there.

Given that Iterator::unzip is like .collect::<(Vec<_>, Vec<_>)>, I would prefer to separate functionality like Shared Stream from Unzip.

Hm... If I understand this right, you mean that unzip should not separate streams, but I didn't quite get the alternative. What would the api look like for the unzip that you are talking about?
EDIT: never mind... Got what you are saying. Looked again in the Iterator::unzip docs and instead of generating two iterators (like what I am doing), it simply consumes iterator and gives an output.

What do you suggest I do with the code? Should I change it into share() or fork() operator instead? If yes what should api look like?

`UnzipRight::is_terminated(&self)` was accessing base stream, which should **never** happen, unless left is dropped. As a fix, manually maintain `is_done` field.
@binier binier closed this Nov 14, 2020
@taiki-e taiki-e added the A-stream Area: futures::stream label Jan 6, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-stream Area: futures::stream
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Missing StreamExt::unzip
2 participants