-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consistently handle exceptions in reactive streams #2646
Conversation
I will be adding some tests to this PR but would appreciate some feedback on the implementation in the meantime. |
f7c53ae
to
ed1cf6d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's amazing and thoughtful work from you, thanks!
ce6cc5f
to
bee8e90
Compare
* Fixed `PublisherCoroutine` and `rxObservable` ignoring cancellations. * Fatal exceptions are not treated in a special manner by us anymore. Instead, we follow the requirement in the reactive streams specification that, in case some method of `Subscriber` throws, that subscriber MUST be considered cancelled, and the exception MUST be reported in some place other than `onError`. * Fixed `trySend` sometimes throwing in `PublisherCoroutine` and `rxObservable`. * When an exception happens inside a cancellation handler, we now consistently throw the original exception passed to the handler, with the new exception added as suppressed.
bee8e90
to
8f9c70d
Compare
* Fixed `doLockedNext` not releasing the lock in `PublisherCoroutine` if `null` is emitted * Fixed `flux`, `publish`, `rxObservable` and `rxFlowable` incorrectly reporting `isClosedForSend == true` just after closing the channel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to go
subscriber.onComplete() | ||
} else { | ||
@Suppress("INVISIBLE_MEMBER") | ||
val unwrappedCause = unwrap(cause) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#2551 filed a use-case
* Fixed `PublisherCoroutine`, `rxObservable`, and `Flow.toPublisher` ignoring cancellations. * Fatal exceptions are not treated in a special manner by us anymore. Instead, we follow the requirement in the reactive streams specification that, in case some method of `Subscriber` throws, that subscriber MUST be considered canceled, and the exception MUST be reported in someplace other than `onError`. * Fixed `trySend` sometimes throwing in `PublisherCoroutine` and `rxObservable`. * When an exception happens inside a cancellation handler, we now consistently throw the original exception passed to the handler, with the new exception added as suppressed. * Fixed `PublisherCoroutine` and `rxObservable` claiming that the channel is not closed for send for some time after `close()` has finished. * Fixed publishers sometimes signalling `onComplete()` after cancellation even though their streams are not finite. Fixes Kotlin#2173
PublisherCoroutine
,rxObservable
, andFlow.toPublisher
ignoring cancellations.anymore. Instead, we follow the requirement in the reactive
streams specification that, in case some method of
Subscriber
throws, that subscriber MUST be considered canceled, and the
exception MUST be reported in someplace other than
onError
.trySend
sometimes throwing inPublisherCoroutine
andrxObservable
.consistently throw the original exception passed to the handler,
with the new exception added as suppressed.
PublisherCoroutine
andrxObservable
claiming that thechannel is not closed for send for some time after
close()
hasfinished.
onComplete()
aftercancellation even though their streams are not finite.
Fixes #2173