diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 52e47227ad..bcf1921594 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -137,6 +137,7 @@ internal abstract class AbstractSendChannel( } override fun offer(element: E): Boolean { + // Temporary migration for offer users who rely on onUndeliveredElement try { return super.offer(element) } catch (e: Throwable) { diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 7b6cc0ad9d..f006efc720 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -24,7 +24,7 @@ import kotlin.jvm.* public interface SendChannel { /** * Returns `true` if this channel was closed by an invocation of [close]. This means that - * calling [send] or [offer] will result in an exception. + * calling [send] will result in an exception. * * **Note: This is an experimental api.** This property may change its semantics and/or name in the future. */ @@ -51,7 +51,7 @@ public interface SendChannel { * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * * This function can be used in [select] invocations with the [onSend] clause. - * Use [offer] to try sending to this channel without waiting. + * Use [trySend] to try sending to this channel without waiting. */ public suspend fun send(element: E) @@ -64,23 +64,6 @@ public interface SendChannel { */ public val onSend: SelectClause2> - /** - * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions, - * and returns `true`. Otherwise, just returns `false`. This is a synchronous variant of [send] which backs off - * in situations when `send` suspends. - * - * Throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details). - * - * When `offer` call returns `false` it guarantees that the element was not delivered to the consumer and it - * it does not call `onUndeliveredElement` that was installed for this channel. If the channel was closed, - * then it calls `onUndeliveredElement` before throwing an exception. - * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. - */ - public fun offer(element: E): Boolean { - val result = trySend(element) - if (result.isSuccess) return true - throw recoverStackTrace(result.exceptionOrNull() ?: return false) - } /** * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions, @@ -103,7 +86,7 @@ public interface SendChannel { * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements * are received. * - * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] or [offer] + * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] * and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive]. * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or * receive on a failed channel throw the specified [cause] exception. @@ -122,10 +105,11 @@ public interface SendChannel { * * the cause of `close` or `cancel` otherwise. * * Example of usage (exception handling is omitted): + * * ``` * val events = Channel(UNLIMITED) * callbackBasedApi.registerCallback { event -> - * events.offer(event) + * events.trySend(event) * } * * val uiUpdater = launch(Dispatchers.Main, parent = UILifecycle) { @@ -134,7 +118,6 @@ public interface SendChannel { * } * * events.invokeOnClose { callbackBasedApi.stop() } - * * ``` * * **Note: This is an experimental api.** This function may change its semantics, parameters or return type in the future. @@ -146,6 +129,33 @@ public interface SendChannel { */ @ExperimentalCoroutinesApi public fun invokeOnClose(handler: (cause: Throwable?) -> Unit) + + /** + * **Deprecated** offer method. + * + * This method was deprecated in the favour of [trySend]. + * It has proven itself as the most error-prone method in Channel API: + * + * * `Boolean` return type creates the false sense of security, implying that `false` + * is returned instead of throwing an exception. + * * It was used mostly from non-suspending APIs where CancellationException triggered + * internal failures in the application (the most common source of bugs). + * * Due to signature and explicit `if (ch.offer(...))` checks it was easy to + * oversee such error during code review. + * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead. + * + * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context. + */ + @Deprecated( + level = DeprecationLevel.WARNING, + message = "Deprecated in the favour of 'trySend' method", + replaceWith = ReplaceWith("trySend(element).isSuccess") + ) // Since 1.5.0 + public fun offer(element: E): Boolean { + val result = trySend(element) + if (result.isSuccess) return true + throw recoverStackTrace(result.exceptionOrNull() ?: return false) + } } /** @@ -188,7 +198,7 @@ public interface ReceiveChannel { * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * * This function can be used in [select] invocations with the [onReceive] clause. - * Use [poll] to try receiving from this channel without waiting. + * Use [tryReceive] to try receiving from this channel without waiting. */ public suspend fun receive(): E @@ -200,51 +210,6 @@ public interface ReceiveChannel { */ public val onReceive: SelectClause1 - /** - * This function was deprecated since 1.3.0 and is no longer recommended to use - * or to implement in subclasses. - * - * It had the following pitfalls: - * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value" - * - Was throwing if the channel has failed even though its signature may suggest it returns 'null' - * - It didn't really belong to core channel API and can be exposed as an extension instead. - * - * @suppress doc - */ - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - @LowPriorityInOverloadResolution - @Deprecated( - message = "Deprecated in favor of receiveCatching", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("receiveCatching().getOrNull()") - ) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0 - public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull() - - /** - * This function was deprecated since 1.3.0 and is no longer recommended to use - * or to implement in subclasses. - * See [receiveOrNull] documentation. - * - * @suppress **Deprecated**: in favor of onReceiveCatching extension. - */ - @Deprecated( - message = "Deprecated in favor of onReceiveCatching extension", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("onReceiveCatching") - ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.6.0 - public val onReceiveOrNull: SelectClause1 - get() { - return object : SelectClause1 { - @InternalCoroutinesApi - override fun registerSelectClause1(select: SelectInstance, block: suspend (E?) -> R) { - onReceiveCatching.registerSelectClause1(select) { - it.exceptionOrNull()?.let { throw it } - block(it.getOrNull()) - } - } - } - } - /** * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty. * This method returns [ChannelResult] with the value of an element successfully retrieved from the channel @@ -262,7 +227,7 @@ public interface ReceiveChannel { * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * * This function can be used in [select] invocations with the [onReceiveCatching] clause. - * Use [poll] to try receiving from this channel without waiting. + * Use [tryReceive] to try receiving from this channel without waiting. */ public suspend fun receiveCatching(): ChannelResult @@ -273,17 +238,6 @@ public interface ReceiveChannel { */ public val onReceiveCatching: SelectClause1> - /** - * Retrieves and removes an element from this channel if it's not empty or returns `null` if the channel is empty - * or is [is closed for `receive`][isClosedForReceive] without a cause. - * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_. - */ - public fun poll(): E? { - val result = tryReceive() - if (result.isSuccess) return result.getOrThrow() - throw recoverStackTrace(result.exceptionOrNull() ?: return null) - } - /** * Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success] * result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed] @@ -325,6 +279,75 @@ public interface ReceiveChannel { */ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") public fun cancel(cause: Throwable? = null): Boolean + + /** + * **Deprecated** poll method. + * + * This method was deprecated in the favour of [tryReceive]. + * It has proven itself as error-prone method in Channel API: + * + * * Nullable return type creates the false sense of security, implying that `null` + * is returned instead of throwing an exception. + * * It was used mostly from non-suspending APIs where CancellationException triggered + * internal failures in the application (the most common source of bugs). + * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead. + * + * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context. + */ + @Deprecated(level = DeprecationLevel.WARNING, + message = "Deprecated in the favour of 'tryReceive'", + replaceWith = ReplaceWith("tryReceive().getOrNull()") + ) // Since 1.5.0 + public fun poll(): E? { + val result = tryReceive() + if (result.isSuccess) return result.getOrThrow() + throw recoverStackTrace(result.exceptionOrNull() ?: return null) + } + + /** + * This function was deprecated since 1.3.0 and is no longer recommended to use + * or to implement in subclasses. + * + * It had the following pitfalls: + * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value" + * - Was throwing if the channel has failed even though its signature may suggest it returns 'null' + * - It didn't really belong to core channel API and can be exposed as an extension instead. + * + * @suppress doc + */ + @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + @LowPriorityInOverloadResolution + @Deprecated( + message = "Deprecated in favor of receiveCatching", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("receiveCatching().getOrNull()") + ) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0 + public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull() + + /** + * This function was deprecated since 1.3.0 and is no longer recommended to use + * or to implement in subclasses. + * See [receiveOrNull] documentation. + * + * @suppress **Deprecated**: in favor of onReceiveCatching extension. + */ + @Deprecated( + message = "Deprecated in favor of onReceiveCatching extension", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("onReceiveCatching") + ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.6.0 + public val onReceiveOrNull: SelectClause1 + get() { + return object : SelectClause1 { + @InternalCoroutinesApi + override fun registerSelectClause1(select: SelectInstance, block: suspend (E?) -> R) { + onReceiveCatching.registerSelectClause1(select) { + it.exceptionOrNull()?.let { throw it } + block(it.getOrNull()) + } + } + } + } } /** @@ -544,14 +567,14 @@ public interface ChannelIterator { * * * When `capacity` is [Channel.UNLIMITED] — it creates a channel with effectively unlimited buffer. * This channel has a linked-list buffer of unlimited capacity (limited only by available memory). - * [Sending][send] to this channel never suspends, and [offer] always returns `true`. + * [Sending][send] to this channel never suspends, and [trySend] always succeeds. * * * When `capacity` is [Channel.CONFLATED] — it creates a _conflated_ channel - * This channel buffers at most one element and conflates all subsequent `send` and `offer` invocations, + * This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations, * so that the receiver always gets the last element sent. * Back-to-back sent elements are conflated — only the last sent element is received, * while previously sent elements **are lost**. - * [Sending][send] to this channel never suspends, and [offer] always returns `true`. + * [Sending][send] to this channel never suspends, and [trySend] always succeeds. * * * When `capacity` is positive but less than [UNLIMITED] — it creates an array-based channel with the specified capacity. * This channel has an array buffer of a fixed `capacity`. @@ -598,8 +621,6 @@ public interface ChannelIterator { * * * When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually * send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]. - * * When [offer][SendChannel.offer] operation throws an exception when - * the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]. * * When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext] * operation throws an exception when it had retrieved the element from the * channel but was cancelled before the code following the receive call resumed. diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt index c84afb2fe0..8283bcb166 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt @@ -17,7 +17,7 @@ import kotlin.jvm.* * Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received, * while previously sent elements **are lost**. * Every subscriber immediately receives the most recently sent element. - * Sender to this broadcast channel never suspends and [offer] always returns `true`. + * Sender to this broadcast channel never suspends and [trySend] always succeeds. * * A secondary constructor can be used to create an instance of this class that already holds a value. * This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation. diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt index 0e686447c1..e1e2b140f5 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt @@ -9,11 +9,11 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* /** - * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations, + * Channel that buffers at most one element and conflates all subsequent `send` and `trySend` invocations, * so that the receiver always gets the most recently sent element. * Back-to-send sent elements are _conflated_ -- only the most recently sent element is received, * while previously sent elements **are lost**. - * Sender to this channel never suspends and [offer] always returns `true`. + * Sender to this channel never suspends and [trySend] always succeeds. * * This channel is created by `Channel(Channel.CONFLATED)` factory function invocation. */ diff --git a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt index 831752739c..b5f607b230 100644 --- a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt @@ -9,7 +9,7 @@ import kotlinx.coroutines.selects.* /** * Channel with linked-list buffer of a unlimited capacity (limited only by available memory). - * Sender to this channel never suspends and [offer] always returns `true`. + * Sender to this channel never suspends and [trySend] always succeeds. * * This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation. * diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index ac9e66b0d5..490b221b58 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -92,7 +92,7 @@ import kotlin.native.concurrent.* * * To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)` * constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay - * values to new subscribers). Replace [send][BroadcastChannel.send] and [offer][BroadcastChannel.offer] calls + * values to new subscribers). Replace [send][BroadcastChannel.send] and [trySend][BroadcastChannel.trySend] calls * with [emit][MutableStateFlow.emit] and [tryEmit][MutableStateFlow.tryEmit], and convert subscribers' code to flow operators. * * ### Concurrency diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt index fc8aa02f20..74d3314075 100644 --- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -107,7 +107,7 @@ import kotlin.native.concurrent.* * * To migrate [ConflatedBroadcastChannel] usage to [StateFlow], start by replacing usages of the `ConflatedBroadcastChannel()` * constructor with `MutableStateFlow(initialValue)`, using `null` as an initial value if you don't have one. - * Replace [send][ConflatedBroadcastChannel.send] and [offer][ConflatedBroadcastChannel.offer] calls + * Replace [send][ConflatedBroadcastChannel.send] and [trySend][ConflatedBroadcastChannel.trySend] calls * with updates to the state flow's [MutableStateFlow.value], and convert subscribers' code to flow operators. * You can use the [filterNotNull] operator to mimic behavior of a `ConflatedBroadcastChannel` without initial value. * diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt index 6e5f3f11aa..c924c09025 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt @@ -65,7 +65,7 @@ internal suspend fun FlowCollector.combineInternal( // Received the second value from the same flow in the same epoch -- bail out if (lastReceivedEpoch[index] == currentEpoch) break lastReceivedEpoch[index] = currentEpoch - element = resultChannel.poll() ?: break + element = resultChannel.tryReceive().getOrNull() ?: break } // Process batch result if there is enough data diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt index 3900c2db90..632fd2928b 100644 --- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt @@ -86,31 +86,31 @@ class ArrayChannelTest : TestBase() { } @Test - fun testOfferAndPoll() = runTest { + fun testTryOp() = runTest { val q = Channel(1) - assertTrue(q.offer(1)) + assertTrue(q.trySend(1).isSuccess) expect(1) launch { expect(3) - assertEquals(1, q.poll()) + assertEquals(1, q.tryReceive().getOrNull()) expect(4) - assertNull(q.poll()) + assertNull(q.tryReceive().getOrNull()) expect(5) assertEquals(2, q.receive()) // suspends expect(9) - assertEquals(3, q.poll()) + assertEquals(3, q.tryReceive().getOrNull()) expect(10) - assertNull(q.poll()) + assertNull(q.tryReceive().getOrNull()) expect(11) } expect(2) yield() expect(6) - assertTrue(q.offer(2)) + assertTrue(q.trySend(2).isSuccess) expect(7) - assertTrue(q.offer(3)) + assertTrue(q.trySend(3).isSuccess) expect(8) - assertFalse(q.offer(4)) + assertFalse(q.trySend(4).isSuccess) yield() finish(12) } @@ -157,7 +157,7 @@ class ArrayChannelTest : TestBase() { val capacity = 42 val channel = Channel(capacity) repeat(4) { - channel.offer(-1) + channel.trySend(-1) } repeat(4) { channel.receiveCatching().getOrNull() diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt index a64284aec1..8962acc3dc 100644 --- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt @@ -15,8 +15,8 @@ class BasicOperationsTest : TestBase() { } @Test - fun testOfferAfterClose() = runTest { - TestChannelKind.values().forEach { kind -> testOffer(kind) } + fun testTrySendAfterClose() = runTest { + TestChannelKind.values().forEach { kind -> testTrySend(kind) } } @Test @@ -39,7 +39,7 @@ class BasicOperationsTest : TestBase() { } } expect(1) - channel.offer(42) + channel.trySend(42) expect(2) channel.close(AssertionError()) finish(4) @@ -80,28 +80,6 @@ class BasicOperationsTest : TestBase() { } } - private suspend fun testReceiveCatchingException(kind: TestChannelKind) = coroutineScope { - val channel = kind.create() - val d = async(NonCancellable) { - channel.receive() - } - - yield() - channel.close(TestException()) - assertTrue(channel.isClosedForReceive) - - assertFailsWith { channel.poll() } - try { - channel.receiveCatching().getOrThrow() - expectUnreached() - } catch (e: TestException) { - // Expected - } - - d.join() - assertTrue(d.getCancellationException().cause is TestException) - } - @Suppress("ReplaceAssertBooleanWithAssertEquality") private suspend fun testReceiveCatching(kind: TestChannelKind) = coroutineScope { reset() @@ -131,22 +109,21 @@ class BasicOperationsTest : TestBase() { finish(6) } - private suspend fun testOffer(kind: TestChannelKind) = coroutineScope { + private suspend fun testTrySend(kind: TestChannelKind) = coroutineScope { val channel = kind.create() val d = async { channel.send(42) } yield() channel.close() assertTrue(channel.isClosedForSend) - try { - channel.offer(2) - fail() - } catch (e: ClosedSendChannelException) { - if (!kind.isConflated) { - assertEquals(42, channel.receive()) + channel.trySend(2) + .onSuccess { expectUnreached() } + .onFailure { + assertTrue { it is ClosedSendChannelException} + if (!kind.isConflated) { + assertEquals(42, channel.receive()) + } } - } - d.await() } diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt index 41f60479f2..0b9a0fdbe4 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt @@ -11,30 +11,30 @@ class ChannelBufferOverflowTest : TestBase() { @Test fun testDropLatest() = runTest { val c = Channel(2, BufferOverflow.DROP_LATEST) - assertTrue(c.offer(1)) - assertTrue(c.offer(2)) - assertTrue(c.offer(3)) // overflows, dropped + assertTrue(c.trySend(1).isSuccess) + assertTrue(c.trySend(2).isSuccess) + assertTrue(c.trySend(3).isSuccess) // overflows, dropped c.send(4) // overflows dropped assertEquals(1, c.receive()) - assertTrue(c.offer(5)) - assertTrue(c.offer(6)) // overflows, dropped + assertTrue(c.trySend(5).isSuccess) + assertTrue(c.trySend(6).isSuccess) // overflows, dropped assertEquals(2, c.receive()) assertEquals(5, c.receive()) - assertEquals(null, c.poll()) + assertEquals(null, c.tryReceive().getOrNull()) } @Test fun testDropOldest() = runTest { val c = Channel(2, BufferOverflow.DROP_OLDEST) - assertTrue(c.offer(1)) - assertTrue(c.offer(2)) - assertTrue(c.offer(3)) // overflows, keeps 2, 3 + assertTrue(c.trySend(1).isSuccess) + assertTrue(c.trySend(2).isSuccess) + assertTrue(c.trySend(3).isSuccess) // overflows, keeps 2, 3 c.send(4) // overflows, keeps 3, 4 assertEquals(3, c.receive()) - assertTrue(c.offer(5)) - assertTrue(c.offer(6)) // overflows, keeps 5, 6 + assertTrue(c.trySend(5).isSuccess) + assertTrue(c.trySend(6).isSuccess) // overflows, keeps 5, 6 assertEquals(5, c.receive()) assertEquals(6, c.receive()) - assertEquals(null, c.poll()) + assertEquals(null, c.tryReceive().getOrNull()) } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt index 856a66fbd2..a8c2a29c1b 100644 --- a/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt @@ -42,7 +42,7 @@ class ConflatedBroadcastChannelTest : TestBase() { launch(start = CoroutineStart.UNDISPATCHED) { expect(2) val sub = broadcast.openSubscription() - assertNull(sub.poll()) + assertNull(sub.tryReceive().getOrNull()) expect(3) assertEquals("one", sub.receive()) // suspends expect(6) diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt index 87194f72f9..370fd5b9a2 100644 --- a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt @@ -12,14 +12,14 @@ open class ConflatedChannelTest : TestBase() { Channel(Channel.CONFLATED) @Test - fun testBasicConflationOfferPoll() { + fun testBasicConflationOfferTryReceive() { val q = createConflatedChannel() - assertNull(q.poll()) - assertTrue(q.offer(1)) - assertTrue(q.offer(2)) - assertTrue(q.offer(3)) - assertEquals(3, q.poll()) - assertNull(q.poll()) + assertNull(q.tryReceive().getOrNull()) + assertTrue(q.trySend(1).isSuccess) + assertTrue(q.trySend(2).isSuccess) + assertTrue(q.trySend(3).isSuccess) + assertEquals(3, q.tryReceive().getOrNull()) + assertNull(q.tryReceive().getOrNull()) } @Test diff --git a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt index cdec8a776e..501affb4d9 100644 --- a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt @@ -12,12 +12,12 @@ class LinkedListChannelTest : TestBase() { fun testBasic() = runTest { val c = Channel(Channel.UNLIMITED) c.send(1) - check(c.offer(2)) + assertTrue(c.trySend(2).isSuccess) c.send(3) check(c.close()) check(!c.close()) assertEquals(1, c.receive()) - assertEquals(2, c.poll()) + assertEquals(2, c.tryReceive().getOrNull()) assertEquals(3, c.receiveCatching().getOrNull()) assertNull(c.receiveCatching().getOrNull()) } diff --git a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt index ab0292a831..c83813e40e 100644 --- a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt @@ -80,26 +80,26 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testOfferAndPool() = runTest { + fun testTrySendTryReceive() = runTest { val q = Channel(Channel.RENDEZVOUS) - assertFalse(q.offer(1)) + assertFalse(q.trySend(1).isSuccess) expect(1) launch { expect(3) - assertNull(q.poll()) + assertNull(q.tryReceive().getOrNull()) expect(4) assertEquals(2, q.receive()) expect(7) - assertNull(q.poll()) + assertNull(q.tryReceive().getOrNull()) yield() expect(9) - assertEquals(3, q.poll()) + assertEquals(3, q.tryReceive().getOrNull()) expect(10) } expect(2) yield() expect(5) - assertTrue(q.offer(2)) + assertTrue(q.trySend(2).isSuccess) expect(6) yield() expect(8) diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt index f93d039933..4763d13b05 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt @@ -166,15 +166,15 @@ class ChannelBuildersFlowTest : TestBase() { var expected = 0 launch { - assertTrue(channel.offer(1)) // Handed to the coroutine - assertTrue(channel.offer(2)) // Buffered - assertFalse(channel.offer(3)) // Failed to offer + assertTrue(channel.trySend(1).isSuccess) // Handed to the coroutine + assertTrue(channel.trySend(2).isSuccess) // Buffered + assertFalse(channel.trySend(3).isSuccess) // Failed to offer channel.send(3) yield() assertEquals(1, expected) - assertTrue(channel.offer(4)) // Handed to the coroutine - assertTrue(channel.offer(5)) // Buffered - assertFalse(channel.offer(6)) // Failed to offer + assertTrue(channel.trySend(4).isSuccess) // Handed to the coroutine + assertTrue(channel.trySend(5).isSuccess) // Buffered + assertFalse(channel.trySend(6).isSuccess) // Failed to offer channel.send(6) assertEquals(2, expected) } diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt index 31a929b2d8..f197a214f5 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt @@ -12,9 +12,9 @@ class ChannelFlowTest : TestBase() { @Test fun testRegular() = runTest { val flow = channelFlow { - assertTrue(offer(1)) - assertTrue(offer(2)) - assertTrue(offer(3)) + assertTrue(trySend(1).isSuccess) + assertTrue(trySend(2).isSuccess) + assertTrue(trySend(3).isSuccess) } assertEquals(listOf(1, 2, 3), flow.toList()) } @@ -22,9 +22,9 @@ class ChannelFlowTest : TestBase() { @Test fun testBuffer() = runTest { val flow = channelFlow { - assertTrue(offer(1)) - assertTrue(offer(2)) - assertFalse(offer(3)) + assertTrue(trySend(1).isSuccess) + assertTrue(trySend(2).isSuccess) + assertFalse(trySend(3).isSuccess) }.buffer(1) assertEquals(listOf(1, 2), flow.toList()) } @@ -32,10 +32,10 @@ class ChannelFlowTest : TestBase() { @Test fun testConflated() = runTest { val flow = channelFlow { - assertTrue(offer(1)) - assertTrue(offer(2)) - assertTrue(offer(3)) - assertTrue(offer(4)) + assertTrue(trySend(1).isSuccess) + assertTrue(trySend(2).isSuccess) + assertTrue(trySend(3).isSuccess) + assertTrue(trySend(4).isSuccess) }.buffer(Channel.CONFLATED) assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated } @@ -43,7 +43,7 @@ class ChannelFlowTest : TestBase() { @Test fun testFailureCancelsChannel() = runTest { val flow = channelFlow { - offer(1) + trySend(1) invokeOnClose { expect(2) } diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt index 371d01472e..85a17ba0fe 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt @@ -42,7 +42,7 @@ class ShareInFusionTest : TestBase() { val flow = channelFlow { // send a batch of 10 elements using [offer] for (i in 1..10) { - assertTrue(offer(i)) // offer must succeed, because buffer + assertTrue(trySend(i).isSuccess) // offer must succeed, because buffer } send(0) // done }.buffer(10) // request a buffer of 10 @@ -53,4 +53,4 @@ class ShareInFusionTest : TestBase() { .collect { i -> expect(i + 1) } finish(12) } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt index 42cdb1e19f..db69e2bc06 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt @@ -167,11 +167,11 @@ class ShareInTest : TestBase() { subs += shared .onEach { value -> // only the first threshold subscribers get the value when (i) { - in 1..threshold -> log.offer("sub$i: $value") + in 1..threshold -> log.trySend("sub$i: $value") else -> expectUnreached() } } - .onCompletion { log.offer("sub$i: completion") } + .onCompletion { log.trySend("sub$i: completion") } .launchIn(this) checkStartTransition(i) } @@ -210,4 +210,4 @@ class ShareInTest : TestBase() { stop() } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt index ea0b639e90..0df8278b77 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt @@ -50,7 +50,7 @@ import kotlinx.coroutines.* ) public fun SendChannel.sendBlocking(element: E) { // fast path - if (offer(element)) + if (trySend(element).isSuccess) return // slow path runBlocking { diff --git a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt index 099e70b3b9..6c23982ee7 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt @@ -21,13 +21,13 @@ public enum class TickerMode { * ``` * val channel = ticker(delay = 100) * delay(350) // 250 ms late - * println(channel.poll()) // prints Unit - * println(channel.poll()) // prints null + * println(channel.tryReceive().getOrNull()) // prints Unit + * println(channel.tryReceive().getOrNull()) // prints null * * delay(50) - * println(channel.poll()) // prints Unit, delay was adjusted + * println(channel.tryReceive().getOrNull()) // prints Unit, delay was adjusted * delay(50) - * println(channel.poll()) // prints null, we'are not late relatively to previous element + * println(channel.tryReceive().getOrNull()) // prints null, we're not late relatively to previous element * ``` */ FIXED_PERIOD, diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt deleted file mode 100644 index 010dab389d..0000000000 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt +++ /dev/null @@ -1,10 +0,0 @@ -kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@2a06d350 - at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt) - at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:170) - at kotlinx.coroutines.channels.ChannelCoroutine.offer(ChannelCoroutine.kt) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testCancelledOffer$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:153) -Caused by: kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@2a06d350 - at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:599) - at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:164) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testCancelledOffer$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:151) - at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt index 56f1e28313..06839f4a04 100644 --- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt @@ -39,7 +39,7 @@ class ReusableCancellableContinuationTest : TestBase() { repeat(iterations) { suspender { - assertTrue(channel.offer(it)) + assertTrue(channel.trySend(it).isSuccess) } } channel.close() diff --git a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt index e755b17d91..49c93c7fe2 100644 --- a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt @@ -41,7 +41,7 @@ class RunInterruptibleTest : TestBase() { val job = launch { runInterruptible(Dispatchers.IO) { expect(2) - latch.offer(Unit) + latch.trySend(Unit) try { Thread.sleep(10_000L) expectUnreached() diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt index a6a5340389..86adfee049 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt @@ -15,7 +15,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { // total counters private var sendCnt = 0 - private var offerFailedCnt = 0 + private var trySendFailedCnt = 0 private var receivedCnt = 0 private var undeliveredCnt = 0 @@ -23,7 +23,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { private var lastReceived = 0 private var dSendCnt = 0 private var dSendExceptionCnt = 0 - private var dOfferFailedCnt = 0 + private var dTrySendFailedCnt = 0 private var dReceivedCnt = 0 private val dUndeliveredCnt = AtomicInteger() @@ -43,30 +43,30 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { joinAll(j1, j2) // All elements must be either received or undelivered (IN every run) - if (dSendCnt - dOfferFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) { + if (dSendCnt - dTrySendFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) { println(" Send: $dSendCnt") - println("Send Exception: $dSendExceptionCnt") - println(" Offer failed: $dOfferFailedCnt") + println("Send exception: $dSendExceptionCnt") + println("trySend failed: $dTrySendFailedCnt") println(" Received: $dReceivedCnt") println(" Undelivered: ${dUndeliveredCnt.get()}") error("Failed") } - offerFailedCnt += dOfferFailedCnt + trySendFailedCnt += dTrySendFailedCnt receivedCnt += dReceivedCnt undeliveredCnt += dUndeliveredCnt.get() // clear for next run dSendCnt = 0 dSendExceptionCnt = 0 - dOfferFailedCnt = 0 + dTrySendFailedCnt = 0 dReceivedCnt = 0 dUndeliveredCnt.set(0) } // Stats - println(" Send: $sendCnt") - println(" Offer failed: $offerFailedCnt") - println(" Received: $receivedCnt") - println(" Undelivered: $undeliveredCnt") - assertEquals(sendCnt - offerFailedCnt, receivedCnt + undeliveredCnt) + println(" Send: $sendCnt") + println("trySend failed: $trySendFailedCnt") + println(" Received: $receivedCnt") + println(" Undelivered: $undeliveredCnt") + assertEquals(sendCnt - trySendFailedCnt, receivedCnt + undeliveredCnt) } private suspend fun sendOne(channel: Channel) { @@ -75,11 +75,11 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { try { when (Random.nextInt(2)) { 0 -> channel.send(i) - 1 -> if (!channel.offer(i)) { - dOfferFailedCnt++ + 1 -> if (!channel.trySend(i).isSuccess) { + dTrySendFailedCnt++ } } - } catch(e: Throwable) { + } catch (e: Throwable) { assertTrue(e is CancellationException) // the only exception possible in this test dSendExceptionCnt++ throw e diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt index 8f5224db79..1233432615 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt @@ -68,7 +68,7 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T try { block() } finally { - if (!done.offer(true)) + if (!done.trySend(true).isSuccess) error(IllegalStateException("failed to offer to done channel")) } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt index eb7be57500..2b3c05bcc6 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt @@ -29,7 +29,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { launch(Dispatchers.Default + CoroutineName("Sender$senderId")) { repeat(nEvents) { i -> if (i % nSenders == senderId) { - broadcast.offer(i) + broadcast.trySend(i) sentTotal.incrementAndGet() yield() } @@ -63,7 +63,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { try { withTimeout(timeLimit) { senders.forEach { it.join() } - broadcast.offer(nEvents) // last event to signal receivers termination + broadcast.trySend(nEvents) // last event to signal receivers termination receivers.forEach { it.join() } } } catch (e: CancellationException) { @@ -86,4 +86,4 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { cancel() value } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt index fd26144faf..793d7e44f5 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt @@ -5,11 +5,8 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* -import org.junit.After -import org.junit.Test -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicReference -import kotlin.coroutines.* +import org.junit.* +import java.util.concurrent.atomic.* class ConflatedChannelCloseStressTest : TestBase() { @@ -37,12 +34,9 @@ class ConflatedChannelCloseStressTest : TestBase() { var x = senderId try { while (isActive) { - try { - curChannel.get().offer(x) + curChannel.get().trySend(x).onSuccess { x += nSenders sent.incrementAndGet() - } catch (e: ClosedSendChannelException) { - // ignore } } } finally { diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 6ce2f20d5a..fbc28a1855 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -48,7 +48,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() delayChannel.cancel() delay(5100) - assertFailsWith { delayChannel.poll() } + assertFailsWith { delayChannel.tryReceive().getOrThrow() } } } @@ -159,9 +159,9 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() } } -fun ReceiveChannel.checkEmpty() = assertNull(poll()) +fun ReceiveChannel.checkEmpty() = assertNull(tryReceive().getOrNull()) fun ReceiveChannel.checkNotEmpty() { - assertNotNull(poll()) - assertNull(poll()) + assertNotNull(tryReceive().getOrNull()) + assertNull(tryReceive().getOrNull()) } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt index ae2554ad4a..2d8c0ebc75 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt @@ -137,25 +137,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { deferred.await() } - // See https://github.com/Kotlin/kotlinx.coroutines/issues/950 - @Test - fun testCancelledOffer() = runTest { - expect(1) - val job = Job() - val actor = actor(job, Channel.UNLIMITED) { - consumeEach { - expectUnreached() // is cancelled before offer - } - } - job.cancel() - try { - actor.offer(1) - } catch (e: Exception) { - verifyStackTrace("channels/${name.methodName}", e) - finish(2) - } - } - private suspend fun Channel.sendWithContext(ctx: CoroutineContext) = withContext(ctx) { sendInChannel() yield() // TCE diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt index 1a381547ba..9afcab537d 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt @@ -261,7 +261,7 @@ class StackTraceRecoveryTest : TestBase() { private suspend fun awaitCallback(channel: Channel) { suspendCancellableCoroutine { cont -> - channel.offer(Callback(cont)) + channel.trySend(Callback(cont)) } yield() // nop to make sure it is not a tail call } diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt index e3db2626ce..f1be284cae 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt @@ -36,7 +36,7 @@ class CallbackFlowTest : TestBase() { fun testThrowingConsumer() = runTest { var i = 0 val api = CallbackApi { - runCatching { it.offer(++i) } + it.trySend(++i) } val flow = callbackFlow { @@ -77,13 +77,13 @@ class CallbackFlowTest : TestBase() { var i = 0 val api = CallbackApi { if (i < 5) { - it.offer(++i) + it.trySend(++i) } else { it.close(RuntimeException()) } } - val flow = callbackFlow() { + val flow = callbackFlow { api.start(channel) awaitClose { api.stop() diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt index fbd5c0d8f3..74cc17836b 100644 --- a/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt @@ -63,11 +63,12 @@ abstract class ChannelLincheckTestBase( } @Operation - fun offer(@Param(name = "value") value: Int): Any = try { - c.offer(value) - } catch (e: NumberedCancellationException) { - e.testResult - } + fun trySend(@Param(name = "value") value: Int): Any = c.trySend(value) + .onSuccess { return true } + .onFailure { + return if (it is NumberedCancellationException) it.testResult + else false + } // TODO: this operation should be (and can be!) linearizable, but is not // @Operation @@ -85,11 +86,10 @@ abstract class ChannelLincheckTestBase( } @Operation - fun poll(): Any? = try { - c.poll() - } catch (e: NumberedCancellationException) { - e.testResult - } + fun tryReceive(): Any? = + c.tryReceive() + .onSuccess { return it } + .onFailure { return if (it is NumberedCancellationException) it.testResult else null } // TODO: this operation should be (and can be!) linearizable, but is not // @Operation @@ -131,7 +131,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta private val buffer = ArrayList() private var closedMessage: String? = null - suspend fun send(x: Int): Any = when (val offerRes = offer(x)) { + suspend fun send(x: Int): Any = when (val offerRes = trySend(x)) { true -> Unit false -> suspendCancellableCoroutine { cont -> senders.add(cont to x) @@ -139,7 +139,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta else -> offerRes } - fun offer(element: Int): Any { + fun trySend(element: Int): Any { if (closedMessage !== null) return closedMessage!! if (capacity == CONFLATED) { if (resumeFirstReceiver(element)) return true @@ -163,11 +163,11 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta return false } - suspend fun receive(): Any = poll() ?: suspendCancellableCoroutine { cont -> + suspend fun receive(): Any = tryReceive() ?: suspendCancellableCoroutine { cont -> receivers.add(cont) } - fun poll(): Any? { + fun tryReceive(): Any? { if (buffer.isNotEmpty()) { val el = buffer.removeAt(0) resumeFirstSender().also { @@ -221,4 +221,4 @@ private fun CancellableContinuation.resume(res: T): Boolean { val token = tryResume(res) ?: return false completeResume(token) return true -} \ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt index 97f106b3c5..b5b2a0a2b8 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt @@ -70,7 +70,7 @@ class PublisherAsFlowTest : TestBase() { send(it + 1) expect(it + 1) } - assertFalse { offer(-1) } + assertFalse { trySend(-1).isSuccess } } publisher.asFlow().collect { diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt index 854a8829fc..d9c9b91a2c 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt @@ -109,7 +109,7 @@ private class SubscriptionChannel( override fun onNext(t: T) { _requested.decrementAndGet() - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onComplete() { diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index d2cdbab881..383a17d836 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -89,7 +89,7 @@ public class PublisherCoroutine( public override suspend fun send(element: T) { // fast-path -- try send without suspension - if (offer(element)) return + if (trySend(element).isSuccess) return // slow-path does suspend return sendSuspend(element) } diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index cb8de7a636..f0245388bf 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -132,7 +132,7 @@ private class ReactiveSubscriber( override fun onNext(value: T) { // Controlled by requestSize - require(channel.offer(value)) { "Element $value was not added to channel because it was full, $channel" } + require(channel.trySend(value).isSuccess) { "Element $value was not added to channel because it was full, $channel" } } override fun onComplete() { diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt index e2c86c97ff..7a0e0fac38 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt @@ -71,7 +71,7 @@ class PublisherAsFlowTest : TestBase() { send(it + 1) expect(it + 1) } - assertFalse { offer(-1) } + assertFalse { trySend(-1).isSuccess } } publisher.asFlow().collect { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index 35f3391eb1..3bc50c8d57 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -84,11 +84,11 @@ private class SubscriptionChannel : } override fun onSuccess(t: T) { - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onNext(t: T) { - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onComplete() { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 56ce30c370..09c5dc1d9f 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -84,7 +84,7 @@ private class RxObservableCoroutine( public override suspend fun send(element: T) { // fast-path -- try send without suspension - if (offer(element)) return + if (trySend(element).isSuccess) return // slow-path does suspend return sendSuspend(element) } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt index 159f3729c8..0253fcedd8 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt @@ -26,7 +26,7 @@ class ObservableSourceAsFlowStressTest : TestBase() { val latch = Channel(1) var i = 0 val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) - .doOnNext { if (++i > 100) latch.offer(Unit) } + .doOnNext { if (++i > 100) latch.trySend(Unit) } val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) latch.receive() job.cancelAndJoin() diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt index 76333f2eb8..ad780f7504 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt @@ -69,11 +69,11 @@ private class SubscriptionChannel : } override fun onSuccess(t: T) { - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onNext(t: T) { - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onComplete() { diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index a17e32dd31..55794f9adf 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -78,7 +78,7 @@ private class RxObservableCoroutine( public override suspend fun send(element: T) { // fast-path -- try send without suspension - if (offer(element)) return + if (trySend(element).isSuccess) return // slow-path does suspend return sendSuspend(element) } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt index 431a7a789e..01d6a20e36 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt @@ -27,7 +27,7 @@ class ObservableSourceAsFlowStressTest : TestBase() { val latch = Channel(1) var i = 0 val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) - .doOnNext { if (++i > 100) latch.offer(Unit) } + .doOnNext { if (++i > 100) latch.trySend(Unit) } val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) latch.receive() job.cancelAndJoin() diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md index 7673c8f2b9..d5bd2320b7 100644 --- a/ui/coroutines-guide-ui.md +++ b/ui/coroutines-guide-ui.md @@ -268,7 +268,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) { } // install a listener to offer events to this actor onMouseClicked = EventHandler { event -> - eventActor.offer(event) + eventActor.trySend(event) } } ``` @@ -276,12 +276,12 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) { > You can get the full code [here](kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt). The key idea that underlies an integration of an actor coroutine and a regular event handler is that -there is an [offer][SendChannel.offer] function on [SendChannel] that does not wait. It sends an element to the actor immediately, -if it is possible, or discards an element otherwise. An `offer` actually returns a `Boolean` result which we ignore here. +there is an [trySend][SendChannel.trySend] function on [SendChannel] that does not wait. It sends an element to the actor immediately, +if it is possible, or discards an element otherwise. A `trySend` actually returns a `ChanneResult` instance which we ignore here. Try clicking repeatedly on a circle in this version of the code. The clicks are just ignored while the countdown animation is running. This happens because the actor is busy with an animation and does not receive from its channel. -By default, an actor's mailbox is backed by `RendezvousChannel`, whose `offer` operation succeeds only when +By default, an actor's mailbox is backed by `RendezvousChannel`, whose `trySend` operation succeeds only when the `receive` is active. > On Android, there is `View` sent in OnClickListener, so we send the `View` to the actor as a signal. @@ -295,7 +295,7 @@ fun View.onClick(action: suspend (View) -> Unit) { } // install a listener to activate this actor setOnClickListener { - eventActor.offer(it) + eventActor.trySend(it) } } ``` @@ -319,9 +319,9 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) { val eventActor = GlobalScope.actor(Dispatchers.Main, capacity = Channel.CONFLATED) { // <--- Changed here for (event in channel) action(event) // pass event to action } - // install a listener to offer events to this actor + // install a listener to send events to this actor onMouseClicked = EventHandler { event -> - eventActor.offer(event) + eventActor.trySend(event) } } ``` @@ -359,7 +359,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) { for (event in channel) action(event) // pass event to action } onMouseClicked = EventHandler { event -> - eventActor.offer(event) + eventActor.trySend(event) } } --> @@ -623,7 +623,7 @@ After delay [actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html -[SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html +[SendChannel.trySend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/try-send.html [SendChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/index.html [Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html [Channel.CONFLATED]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/-c-o-n-f-l-a-t-e-d.html diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt index 1cbf9b6fe9..ebeaa3b84b 100644 --- a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt @@ -4,10 +4,9 @@ package kotlinx.coroutines.javafx -import javafx.beans.value.ChangeListener -import javafx.beans.value.ObservableValue +import javafx.beans.value.* import kotlinx.coroutines.* -import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* /** @@ -27,11 +26,11 @@ import kotlinx.coroutines.flow.* @ExperimentalCoroutinesApi // Since 1.3.x public fun ObservableValue.asFlow(): Flow = callbackFlow { val listener = ChangeListener { _, _, newValue -> - try { - offer(newValue) - } catch (e: CancellationException) { - // In case the event fires after the channel is closed - } + /* + * Do not propagate the exception to the ObservableValue, it + * already should've been handled by the downstream + */ + trySend(newValue) } addListener(listener) send(value) diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt index 6964050102..bc40b0fdfb 100644 --- a/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt +++ b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt @@ -83,4 +83,20 @@ class JavaFxObservableAsFlowTest : TestBase() { } } + @Test + fun testIntermediateCrash() = runTest { + if (!initPlatform()) { + println("Skipping JavaFxTest in headless environment") + return@runTest // ignore test in headless environments + } + + val property = SimpleIntegerProperty(0) + + assertFailsWith { + property.asFlow().onEach { + yield() + throw TestException() + }.collect() + } + } } diff --git a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt index 51e94779e7..ec8a09f9bd 100644 --- a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt +++ b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt @@ -67,6 +67,6 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) { } // install a listener to offer events to this actor onMouseClicked = EventHandler { event -> - eventActor.offer(event) + eventActor.trySend(event) } } diff --git a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-03.kt b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-03.kt index 81371678a9..aa152b7937 100644 --- a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-03.kt +++ b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-03.kt @@ -65,8 +65,8 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) { val eventActor = GlobalScope.actor(Dispatchers.Main, capacity = Channel.CONFLATED) { // <--- Changed here for (event in channel) action(event) // pass event to action } - // install a listener to offer events to this actor + // install a listener to send events to this actor onMouseClicked = EventHandler { event -> - eventActor.offer(event) + eventActor.trySend(event) } } diff --git a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-01.kt b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-01.kt index ea5ac90a4b..0c89ea70f7 100644 --- a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-01.kt +++ b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-01.kt @@ -55,7 +55,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) { for (event in channel) action(event) // pass event to action } onMouseClicked = EventHandler { event -> - eventActor.offer(event) + eventActor.trySend(event) } } diff --git a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-02.kt b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-02.kt index 504f2ee6c7..6e8b984a20 100644 --- a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-02.kt +++ b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-02.kt @@ -55,7 +55,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) { for (event in channel) action(event) // pass event to action } onMouseClicked = EventHandler { event -> - eventActor.offer(event) + eventActor.trySend(event) } } diff --git a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-03.kt b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-03.kt index 0e1153673a..3ff5d7d5d0 100644 --- a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-03.kt +++ b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-03.kt @@ -55,7 +55,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) { for (event in channel) action(event) // pass event to action } onMouseClicked = EventHandler { event -> - eventActor.offer(event) + eventActor.trySend(event) } }