From 6ed1bd580db91d1f793d0767ebe9b8c78008f18b Mon Sep 17 00:00:00 2001 From: David Khol Date: Sat, 24 Jun 2023 14:02:55 +0900 Subject: [PATCH] Throw unconsumed events if the scope is cancelled --- .../kotlin/app/cash/turbine/Turbine.kt | 1 + .../kotlin/app/cash/turbine/flow.kt | 3 ++- .../app/cash/turbine/FlowInScopeTest.kt | 27 +++++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/commonMain/kotlin/app/cash/turbine/Turbine.kt b/src/commonMain/kotlin/app/cash/turbine/Turbine.kt index a191be8e..138f450a 100644 --- a/src/commonMain/kotlin/app/cash/turbine/Turbine.kt +++ b/src/commonMain/kotlin/app/cash/turbine/Turbine.kt @@ -211,6 +211,7 @@ internal class ChannelTurbine( var cause: Throwable? = null while (true) { val event = channel.takeEventUnsafe() ?: break + if (event is Event.Error && event.throwable is CancellationException) break if (!(ignoreTerminalEvents && event.isTerminal)) unconsumed += event if (event is Event.Error) { cause = event.throwable diff --git a/src/commonMain/kotlin/app/cash/turbine/flow.kt b/src/commonMain/kotlin/app/cash/turbine/flow.kt index 104fccda..1f948647 100644 --- a/src/commonMain/kotlin/app/cash/turbine/flow.kt +++ b/src/commonMain/kotlin/app/cash/turbine/flow.kt @@ -195,7 +195,8 @@ private fun testInInternal(flow: Flow, timeout: Duration?, scope: Corouti if (debug) println("Scope ending ${exception ?: ""}") // Only validate events were consumed if the scope is exiting normally. - if (exception == null) { + // CancellationException also indicates _normal_ cancellation of a coroutine. + if (exception == null || exception is CancellationException) { turbine.ensureAllEventsConsumed() } } diff --git a/src/commonTest/kotlin/app/cash/turbine/FlowInScopeTest.kt b/src/commonTest/kotlin/app/cash/turbine/FlowInScopeTest.kt index b9e65230..9c6b11e1 100644 --- a/src/commonTest/kotlin/app/cash/turbine/FlowInScopeTest.kt +++ b/src/commonTest/kotlin/app/cash/turbine/FlowInScopeTest.kt @@ -12,7 +12,9 @@ import kotlin.time.Duration.Companion.seconds import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletionHandlerException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Dispatchers.Default +import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay @@ -96,6 +98,31 @@ class FlowInScopeTest { assertFalse(collecting) } + @Test fun unconsumedItemThrowsWhenCancelledExternally() = runTestTurbine { + // We have to use an exception handler rather than assertFailsWith because runTest also uses + // one which defers throwing until its block completes. + val exceptionHandler = RecordingExceptionHandler() + launch(start = CoroutineStart.UNDISPATCHED) { + withContext(exceptionHandler) { + flow { + emit("item!") + emitAll(neverFlow()) // Avoid emitting complete + }.testIn(this) + } + }.cancel() + val exception = exceptionHandler.exceptions.removeFirst() + assertTrue(exception is CompletionHandlerException) + val cause = exception.cause + assertTrue(cause is AssertionError) + assertEquals( + """ + |Unconsumed events found: + | - Item(item!) + """.trimMargin(), + cause.message, + ) + } + @Test fun unconsumedItemThrows() = runTestTurbine { // We have to use an exception handler rather than assertFailsWith because runTest also uses // one which defers throwing until its block completes.