Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle undeliverable errors in rxObservable #2178

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,10 @@ private class RxObservableCoroutine<T: Any>(
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
* thrown by subscriber or upstream).
* To make behaviour consistent and least surprising, we always handle fatal exceptions
* by coroutines machinery, anyway, they should not be present in regular program flow,
* thus our goal here is just to expose it as soon as possible.
* To make behaviour consistent and least surprising, we always deliver fatal exceptions to the
* RX global exception handler.
Comment on lines -167 to +168
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was changed in #1638, but seems like the comment was not updated.

*/
subscriber.tryOnError(cause)
if (!handled && cause.isFatal()) {
if (!subscriber.tryOnError(cause) || (!handled && cause.isFatal())) {
Comment on lines -171 to +170
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the handled flag is redundant here after the earlier check for CancellationException, since it could only be handled in the current implementation if it was a CancellationException. There doesn't seem to be any special handling of fatal exceptions by the coroutine. However, I have left that logic as it was here.

handleUndeliverableException(cause, context)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.*
import io.reactivex.exceptions.*
import kotlinx.coroutines.*
import org.junit.*
Expand Down Expand Up @@ -131,4 +133,46 @@ class ObservableExceptionHandlingTest : TestBase() {
}, { expect(3) })
finish(5)
}

@Test
fun testUnhandledException() = runTest {
expect(1)
var disposable: Disposable? = null
val handler = { e: Throwable ->
assertTrue(e is UndeliverableException && e.cause is TestException)
expect(5)
}
val observable = rxObservable<Nothing>(currentDispatcher()) {
expect(4)
disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
try {
delay(Long.MAX_VALUE)
} finally {
throw TestException() // would not be able to handle it since mono is disposed
}
}
withExceptionHandler(handler) {
observable.subscribe(object : Observer<Nothing> {
override fun onSubscribe(d: Disposable) {
expect(2)
disposable = d
}

override fun onNext(t: Nothing) {
expectUnreached()
}

override fun onError(t: Throwable) {
expectUnreached()
}

override fun onComplete() {
expectUnreached()
}
})
expect(3)
yield() // run coroutine
finish(6)
}
}
}
27 changes: 0 additions & 27 deletions reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@

package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.plugins.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

class ObservableTest : TestBase() {
Expand Down Expand Up @@ -137,28 +134,4 @@ class ObservableTest : TestBase() {
expect(4)
}
}

@Test
fun testExceptionAfterCancellation() {
// Test that no exceptions were reported to the global EH (it will fail the test if so)
val handler = { e: Throwable ->
assertFalse(e is CancellationException)
}
withExceptionHandler(handler) {
RxJavaPlugins.setErrorHandler {
require(it !is CancellationException)
}
Observable
.interval(1, TimeUnit.MILLISECONDS)
.take(1000)
.switchMapSingle {
rxSingle {
timeBomb().await()
}
}
.blockingSubscribe({}, {})
}
}
Comment on lines -141 to -161
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test seems to have been taken from the issue reported at #252. However it's complicated and not well implemented (I originally tried to improve it in commit 58ad0cd). Also, it was actually testing rxSingle instead of rxObservable, which is why it didn't catch the issue in the first place that this pull request is trying to solve. rxSingle (and the other RX builders) already have a simpler and deterministic test for undeliverable error handling (testUnhandledException()), which I have copied over in the ObservableExceptionHandlingTest test suite, so I deleted this test since should be no further need for it.


private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
}
8 changes: 3 additions & 5 deletions reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,10 @@ private class RxObservableCoroutine<T: Any>(
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
* thrown by subscriber or upstream).
* To make behaviour consistent and least surprising, we always handle fatal exceptions
* by coroutines machinery, anyway, they should not be present in regular program flow,
* thus our goal here is just to expose it as soon as possible.
* To make behaviour consistent and least surprising, we always deliver fatal exceptions to the
* RX global exception handler.
*/
subscriber.tryOnError(cause)
if (!handled && cause.isFatal()) {
if (!subscriber.tryOnError(cause) || (!handled && cause.isFatal())) {
handleUndeliverableException(cause, context)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.exceptions.*
import kotlinx.coroutines.*
import org.junit.*
Expand Down Expand Up @@ -131,4 +133,46 @@ class ObservableExceptionHandlingTest : TestBase() {
}, { expect(3) })
finish(5)
}

@Test
fun testUnhandledException() = runTest {
expect(1)
var disposable: Disposable? = null
val handler = { e: Throwable ->
assertTrue(e is UndeliverableException && e.cause is TestException)
expect(5)
}
val observable = rxObservable<Nothing>(currentDispatcher()) {
expect(4)
disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
try {
delay(Long.MAX_VALUE)
} finally {
throw TestException() // would not be able to handle it since mono is disposed
}
}
withExceptionHandler(handler) {
observable.subscribe(object : Observer<Nothing> {
override fun onSubscribe(d: Disposable) {
expect(2)
disposable = d
}

override fun onNext(t: Nothing) {
expectUnreached()
}

override fun onError(t: Throwable) {
expectUnreached()
}

override fun onComplete() {
expectUnreached()
}
})
expect(3)
yield() // run coroutine
finish(6)
}
}
}
27 changes: 0 additions & 27 deletions reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.plugins.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

class ObservableTest : TestBase() {
Expand Down Expand Up @@ -137,28 +134,4 @@ class ObservableTest : TestBase() {
expect(4)
}
}

@Test
fun testExceptionAfterCancellation() {
// Test that no exceptions were reported to the global EH (it will fail the test if so)
val handler = { e: Throwable ->
assertFalse(e is CancellationException)
}
withExceptionHandler(handler) {
RxJavaPlugins.setErrorHandler {
require(it !is CancellationException)
}
Observable
.interval(1, TimeUnit.MILLISECONDS)
.take(1000)
.switchMapSingle {
rxSingle {
timeBomb().await()
}
}
.blockingSubscribe({}, {})
}
}

private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
}