Skip to content

Commit

Permalink
Make exception after cancellation test more robust
Browse files Browse the repository at this point in the history
Ensure that the RX error handler actually gets invoked
  • Loading branch information
1zaman committed Jul 29, 2020
1 parent 3cc9b94 commit 58ad0cd
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
21 changes: 12 additions & 9 deletions reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.plugins.*
import io.reactivex.exceptions.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.test.*

class ObservableTest : TestBase() {
Expand Down Expand Up @@ -140,24 +141,26 @@ class ObservableTest : TestBase() {

@Test
fun testExceptionAfterCancellation() {
// Test that no exceptions were reported to the global EH (it will fail the test if so)
val handler = { e: Throwable ->
assertFalse(e is CancellationException)
// Test that no exceptions were reported to the coroutine EH (it will fail the test if so)
val coroutineExceptionHandler = CoroutineExceptionHandler { _, _ -> expectUnreached() }
val rxExceptionHandlerInvocations = AtomicInteger()
val rxExceptionHandler: (Throwable) -> Unit = { error ->
assertTrue(error is UndeliverableException)
assertTrue(error.cause is TestException)
rxExceptionHandlerInvocations.getAndIncrement()
}
withExceptionHandler(handler) {
RxJavaPlugins.setErrorHandler {
require(it !is CancellationException)
}
withExceptionHandler(rxExceptionHandler) {
Observable
.interval(1, TimeUnit.MILLISECONDS)
.take(1000)
.switchMapSingle {
rxSingle {
rxSingle(coroutineExceptionHandler) {
timeBomb().await()
}
}
.blockingSubscribe({}, {})
}
assertTrue(rxExceptionHandlerInvocations.get() > 0)
}

private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
Expand Down
21 changes: 12 additions & 9 deletions reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.plugins.*
import io.reactivex.rxjava3.exceptions.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.test.*

class ObservableTest : TestBase() {
Expand Down Expand Up @@ -140,24 +141,26 @@ class ObservableTest : TestBase() {

@Test
fun testExceptionAfterCancellation() {
// Test that no exceptions were reported to the global EH (it will fail the test if so)
val handler = { e: Throwable ->
assertFalse(e is CancellationException)
// Test that no exceptions were reported to the coroutine EH (it will fail the test if so)
val coroutineExceptionHandler = CoroutineExceptionHandler { _, _ -> expectUnreached() }
val rxExceptionHandlerInvocations = AtomicInteger()
val rxExceptionHandler: (Throwable) -> Unit = { error ->
assertTrue(error is UndeliverableException)
assertTrue(error.cause is TestException)
rxExceptionHandlerInvocations.getAndIncrement()
}
withExceptionHandler(handler) {
RxJavaPlugins.setErrorHandler {
require(it !is CancellationException)
}
withExceptionHandler(rxExceptionHandler) {
Observable
.interval(1, TimeUnit.MILLISECONDS)
.take(1000)
.switchMapSingle {
rxSingle {
rxSingle(coroutineExceptionHandler) {
timeBomb().await()
}
}
.blockingSubscribe({}, {})
}
assertTrue(rxExceptionHandlerInvocations.get() > 0)
}

private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
Expand Down

0 comments on commit 58ad0cd

Please sign in to comment.