From 0ca735851f0a225b6a7582232a1c9847a27fd059 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Mon, 25 Mar 2024 14:16:16 +0100 Subject: [PATCH] Fix `Flow.timeout` swallowing the channel closure exception (#4072) Fixes #4071 --- .../common/src/flow/operators/Delay.kt | 1 + .../common/test/flow/operators/TimeoutTest.kt | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index cad34a0d55..2a701c0c12 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -394,6 +394,7 @@ private fun Flow.timeoutInternal( value.onSuccess { downStream.emit(it) }.onClosed { + it?.let { throw it } return@onReceiveCatching false } return@onReceiveCatching true diff --git a/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt index a2ca101ef0..0162a216c3 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt @@ -237,6 +237,17 @@ class TimeoutTest : TestBase() { testImmediateTimeout(-1.seconds) } + @Test + fun testClosing() = runTest { + assertFailsWith { + channelFlow { close(TestException()) } + .timeout(Duration.INFINITE) + .collect { + expectUnreached() + } + } + } + private fun testImmediateTimeout(timeout: Duration) { expect(1) val flow = emptyFlow().timeout(timeout)