-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add SubscriptionSink::pipe_from_try_stream
to support streams that returns Result
#720
Conversation
The rationale for this is that it is more flexible for use cases when `Stream<Item = Result<T, Error>>`. Take for example `tokio_stream::Broadcast` then one would have to something like: ```rust let stream = BroadcastStream::new(rx).take_while(|r| future::ready(r.is_ok())).filter_map(|r| future::ready(r.ok())); ``` Of course it's a bit awkward to return `Result` when the underlying stream can't fail but I think that's fair trade-off here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, lgtm 👍 .
Only nit is the failing gitlab test:
tokio::spawn(sink.pipe_from_stream(stream));
| ---------------- ^^^^^^ the trait `StdError` is not implemented for `anyhow::Error`
Co-authored-by: Tarik Gul <[email protected]>
…nto na-resultify-pipe-from-stream
assert_eq!(sub.next().await.unwrap().unwrap(), 1); | ||
let exp = SubscriptionClosed::new(SubscriptionClosedReason::Server(err.to_string())); | ||
// The server closed down the subscription with the underlying error from the stream. | ||
assert!(matches!(sub.next().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worth just calling sub.next().await
once more after this to confirm that it's None
and that the thing definitely won't send any more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, just found a bug let's tackle it another PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! A couple of thoughts but nothing blocking; do as you wish with them :)
pipe_from_stream
take stream of resultpipe_from_try_stream
to support streams that returns Result
pipe_from_try_stream
to support streams that returns Result
SubscriptionSink::pipe_from_try_stream
to support streams that returns Result
The rationale for this is that it is more flexible for use cases when
Stream<Item = Result<T, Error>>
.Take for example
tokio_stream::Broadcast
then one would have to something like:Note, I realized that
pipe_from_stream
will work if Error: Serialize bound is satisfied but that's unlikely for the error types but I added a comment about that