Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inconsistent handling of undeliverable errors in RX builders #2173

Closed
1zaman opened this issue Jul 30, 2020 · 2 comments
Closed

Inconsistent handling of undeliverable errors in RX builders #2173

1zaman opened this issue Jul 30, 2020 · 2 comments
Assignees
Labels

Comments

@1zaman
Copy link
Contributor

1zaman commented Jul 30, 2020

While looking at the implementations of the RX coroutines builders in the kotlinx-coroutines-rx* modules, I noticed some inconsistencies in the handling of undeliverable errors between the rxObservable implementation on the one hand, and the rxSingle, rxMaybe, and rxCompletable implementations on the other.

In the rxSingle, rxMaybe, and rxCompletable implementations, when the coroutine is cancelled with an exception, it first tries to deliver it to the observer, and if that's not possible then it is reported to the RX global exception handler. This behaviour is the same regardless of whether the error is fatal or not.

Whereas in the rxObservable implementation, it drops undeliverable non-fatal errors, but always reports fatal errors to the RX global exception handler after attempting to deliver it to the observer, regardless of whether the observer could handle it or not.

The rxFlowable/publish implementation seems to be a combination of both, as it reports all undeliverable or fatal exceptions to the global exception handler. This seems to be the best and safest approach.

@1zaman
Copy link
Contributor Author

1zaman commented Jul 30, 2020

So the different handling of fatal exceptions in rxObservable seems to have been due to a potential issue with rxFlowable that was reported in #1297, where the subscriber throws an exception from the onNext callback, which leaves the operators that were processing the emission queue in an inconsistent state, and thus it does not handle the exception when the coroutine tries to emit it after being cancelled due to it. The standard subscriber that's used when subscribing with lambda callbacks catches any non-fatal exceptions that are emitted by the onNext callback and forwards them to onError, but does not catch fatal exceptions, thus causing this issue. The standard flowable builders don't catch any exceptions that are emitted by the subscriber, and just let fatal exceptions propagate and crash the app, so they don't encounter this problem.

This problem seems to be specific to the rxFlowable/publish implementation due to it's usage of a locked emission queue, and not present in rxObservable, but the fix was applied consistently to both. However, it does not apply in the rxSingle, rxMaybe, and rxCompletable implementations, since they don't have any non-terminal emissions that can throw an exception while the coroutine is still running, and any exception from a terminal emission can be caught and handled right there by passing it to the global exception handler.

This explains the special handing of fatal exceptions in rxObservable and rxFlowable. However, it still results in inconsistency with the other RX builders in the case where the coroutine itself throws a fatal exception. We can consider changing the other implementations to have the same behaviour as rxObservable for consistency.

This would also be in line with the standard RX builders, which also let fatal exceptions thrown by the emitter function propagate to the uncaught exception handler. However, it seems that coroutines don't have the concept of fatal exceptions that should not be caught, so all exceptions are caught and emitted by them, and therefore the current behaviour of rxObservable in handling them when the coroutine completes by reporting them to the RX global exception handler makes sense I suppose

1zaman added a commit to 1zaman/kotlinx.coroutines that referenced this issue Jul 31, 2020
If an error can't be delivered to the observer due to the observer
already having disposed the subscription, then it should be reported
to the RX global exception handler. This was already being done in
the implementations for rxSingle, rxMaybe, and rxCompletable, but
not in rxObservable.

Copied the test over from the other implementations as well.

Also removed the previous test in ObservableTest, which actually was
testing rxSingle instead of rxObservable, and was too complicated
and dependent on race conditions (both rxSingle and rxObservable now
have a simple and deterministic test for undeliverable error
handling). It was also not well implemented, and I originally tried
to fix it to be more robust in commit 58ad0cd.

Fixes Kotlin#2173.
@qwwdfsad qwwdfsad self-assigned this Aug 10, 2020
@dkhalanskyjb
Copy link
Collaborator

Thank you for raising this!

Yes, your analysis is spot-on, the root cause for the original problem was that onNext rethrows fatal exceptions but not the other ones and that we consistently send the errors flying around to onError, which landed in the emission queue.

Looking at the way RxJava treats fatal exceptions, it doesn't seem though like we could behave consistently with them in spirit: they attempt to let fatal exceptions bubble up to the surface as fast as possible, except in scenarios when the fatal exception is being passed to onError, in which case RxJava treats these exceptions like any others. Especially with the recent introduction of trySend, a non-throwing attempt at sending an element through a channel, we won't be able to arbitrarily throw fatal exceptions without violating the existing contracts.

However, it seems like there is a simple solution imposed by the Reactive Streams specification itself: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13 It says that in case any methods of Subscriber throw, the publisher must report the errors not to onError but somewhere else. Following this, we can neatly solve the original problem: the error doesn't become lost in the emission queue, as it gets passed to the handler for undeliverable exceptions.

pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this issue Sep 14, 2022
* Fixed `PublisherCoroutine`, `rxObservable`, and
  `Flow.toPublisher` ignoring cancellations.
* Fatal exceptions are not treated in a special manner by us
  anymore. Instead, we follow the requirement in the reactive
  streams specification that, in case some method of `Subscriber`
  throws, that subscriber MUST be considered canceled, and the
  exception MUST be reported in someplace other than `onError`.
* Fixed `trySend` sometimes throwing in `PublisherCoroutine` and
  `rxObservable`.
* When an exception happens inside a cancellation handler, we now
  consistently throw the original exception passed to the handler,
  with the new exception added as suppressed.
* Fixed `PublisherCoroutine` and `rxObservable` claiming that the
  channel is not closed for send for some time after `close()` has
  finished.
* Fixed publishers sometimes signalling `onComplete()` after
  cancellation even though their streams are not finite.

Fixes Kotlin#2173
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants