Skip to content

Commit

Permalink
Deprecate SendChannel.offer and replace its usages along the code base
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad committed Apr 12, 2021
1 parent 80862a5 commit dedd448
Show file tree
Hide file tree
Showing 48 changed files with 180 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ internal abstract class AbstractSendChannel<E>(
}

override fun offer(element: E): Boolean {
// Temporary migration for offer users who rely on onUndeliveredElement
try {
return super.offer(element)
} catch (e: Throwable) {
Expand Down
60 changes: 34 additions & 26 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kotlin.jvm.*
public interface SendChannel<in E> {
/**
* Returns `true` if this channel was closed by an invocation of [close]. This means that
* calling [send] or [offer] will result in an exception.
* calling [send] will result in an exception.
*
* **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
*/
Expand All @@ -51,7 +51,7 @@ public interface SendChannel<in E> {
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onSend] clause.
* Use [offer] to try sending to this channel without waiting.
* Use [trySend] to try sending to this channel without waiting.
*/
public suspend fun send(element: E)

Expand All @@ -64,23 +64,6 @@ public interface SendChannel<in E> {
*/
public val onSend: SelectClause2<E, SendChannel<E>>

/**
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
* and returns `true`. Otherwise, just returns `false`. This is a synchronous variant of [send] which backs off
* in situations when `send` suspends.
*
* Throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
*
* When `offer` call returns `false` it guarantees that the element was not delivered to the consumer and it
* it does not call `onUndeliveredElement` that was installed for this channel. If the channel was closed,
* then it calls `onUndeliveredElement` before throwing an exception.
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*/
public fun offer(element: E): Boolean {
val result = trySend(element)
if (result.isSuccess) return true
throw recoverStackTrace(result.exceptionOrNull() ?: return false)
}

/**
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
Expand All @@ -103,7 +86,7 @@ public interface SendChannel<in E> {
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
* are received.
*
* A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] or [offer]
* A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send]
* and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive].
* A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
* receive on a failed channel throw the specified [cause] exception.
Expand All @@ -125,7 +108,7 @@ public interface SendChannel<in E> {
* ```
* val events = Channel(UNLIMITED)
* callbackBasedApi.registerCallback { event ->
* events.offer(event)
* events.trySend(event)
* }
*
* val uiUpdater = launch(Dispatchers.Main, parent = UILifecycle) {
Expand All @@ -146,6 +129,33 @@ public interface SendChannel<in E> {
*/
@ExperimentalCoroutinesApi
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)

/**
* **Deprecated** offer method.
*
* This method was deprecated in the favour of [trySend].
* It has proven itself as the most error-prone method in Channel API:
*
* * `Boolean` return type creates the false sense of security, implying that `false`
* is returned instead of throwing an exception.
* * It was used mostly from non-suspending APIs where CancellationException triggered
* internal failures in the application (the most common source of bugs).
* * Due to signature and explicit `if (ch.offer(...))` checks it was easy to
* oversee such error during code review.
* * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
*
* See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
*/
@Deprecated(
level = DeprecationLevel.WARNING,
message = "Deprecated in the favour of 'trySend' method",
replaceWith = ReplaceWith("trySend(element).isSuccess")
)
public fun offer(element: E): Boolean {
val result = trySend(element)
if (result.isSuccess) return true
throw recoverStackTrace(result.exceptionOrNull() ?: return false)
}
}

/**
Expand Down Expand Up @@ -544,14 +554,14 @@ public interface ChannelIterator<out E> {
*
* * When `capacity` is [Channel.UNLIMITED] &mdash; it creates a channel with effectively unlimited buffer.
* This channel has a linked-list buffer of unlimited capacity (limited only by available memory).
* [Sending][send] to this channel never suspends, and [offer] always returns `true`.
* [Sending][send] to this channel never suspends, and [trySend] always succeeds.
*
* * When `capacity` is [Channel.CONFLATED] &mdash; it creates a _conflated_ channel
* This channel buffers at most one element and conflates all subsequent `send` and `offer` invocations,
* This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
* so that the receiver always gets the last element sent.
* Back-to-back sent elements are conflated &mdash; only the last sent element is received,
* while previously sent elements **are lost**.
* [Sending][send] to this channel never suspends, and [offer] always returns `true`.
* [Sending][send] to this channel never suspends, and [trySend] always succeeds.
*
* * When `capacity` is positive but less than [UNLIMITED] &mdash; it creates an array-based channel with the specified capacity.
* This channel has an array buffer of a fixed `capacity`.
Expand Down Expand Up @@ -598,8 +608,6 @@ public interface ChannelIterator<out E> {
*
* * When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually
* send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
* * When [offer][SendChannel.offer] operation throws an exception when
* the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
* * When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext]
* operation throws an exception when it had retrieved the element from the
* channel but was cancelled before the code following the receive call resumed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import kotlin.jvm.*
* Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
* while previously sent elements **are lost**.
* Every subscriber immediately receives the most recently sent element.
* Sender to this broadcast channel never suspends and [offer] always returns `true`.
* Sender to this broadcast channel never suspends and [trySend] always succeeds.
*
* A secondary constructor can be used to create an instance of this class that already holds a value.
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*

/**
* Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
* Channel that buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
* so that the receiver always gets the most recently sent element.
* Back-to-send sent elements are _conflated_ -- only the most recently sent element is received,
* while previously sent elements **are lost**.
* Sender to this channel never suspends and [offer] always returns `true`.
* Sender to this channel never suspends and [trySend] always succeeds.
*
* This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import kotlinx.coroutines.selects.*

/**
* Channel with linked-list buffer of a unlimited capacity (limited only by available memory).
* Sender to this channel never suspends and [offer] always returns `true`.
* Sender to this channel never suspends and [trySend] always succeeds.
*
* This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation.
*
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ import kotlin.native.concurrent.*
*
* To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)`
* constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay
* values to new subscribers). Replace [send][BroadcastChannel.send] and [offer][BroadcastChannel.offer] calls
* values to new subscribers). Replace [send][BroadcastChannel.send] and [trySend][BroadcastChannel.trySend] calls
* with [emit][MutableStateFlow.emit] and [tryEmit][MutableStateFlow.tryEmit], and convert subscribers' code to flow operators.
*
* ### Concurrency
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/StateFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ import kotlin.native.concurrent.*
*
* To migrate [ConflatedBroadcastChannel] usage to [StateFlow], start by replacing usages of the `ConflatedBroadcastChannel()`
* constructor with `MutableStateFlow(initialValue)`, using `null` as an initial value if you don't have one.
* Replace [send][ConflatedBroadcastChannel.send] and [offer][ConflatedBroadcastChannel.offer] calls
* Replace [send][ConflatedBroadcastChannel.send] and [trySend][ConflatedBroadcastChannel.trySend] calls
* with updates to the state flow's [MutableStateFlow.value], and convert subscribers' code to flow operators.
* You can use the [filterNotNull] operator to mimic behavior of a `ConflatedBroadcastChannel` without initial value.
*
Expand Down
12 changes: 6 additions & 6 deletions kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ class ArrayChannelTest : TestBase() {
}

@Test
fun testOfferAndPoll() = runTest {
fun testTryOp() = runTest {
val q = Channel<Int>(1)
assertTrue(q.offer(1))
assertTrue(q.trySend(1).isSuccess)
expect(1)
launch {
expect(3)
Expand All @@ -106,11 +106,11 @@ class ArrayChannelTest : TestBase() {
expect(2)
yield()
expect(6)
assertTrue(q.offer(2))
assertTrue(q.trySend(2).isSuccess)
expect(7)
assertTrue(q.offer(3))
assertTrue(q.trySend(3).isSuccess)
expect(8)
assertFalse(q.offer(4))
assertFalse(q.trySend(4).isSuccess)
yield()
finish(12)
}
Expand Down Expand Up @@ -157,7 +157,7 @@ class ArrayChannelTest : TestBase() {
val capacity = 42
val channel = Channel<Int>(capacity)
repeat(4) {
channel.offer(-1)
channel.trySend(-1)
}
repeat(4) {
channel.receiveCatching().getOrNull()
Expand Down
23 changes: 11 additions & 12 deletions kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class BasicOperationsTest : TestBase() {
}

@Test
fun testOfferAfterClose() = runTest {
TestChannelKind.values().forEach { kind -> testOffer(kind) }
fun testTrySendAfterClose() = runTest {
TestChannelKind.values().forEach { kind -> testTrySend(kind) }
}

@Test
Expand All @@ -39,7 +39,7 @@ class BasicOperationsTest : TestBase() {
}
}
expect(1)
channel.offer(42)
channel.trySend(42)
expect(2)
channel.close(AssertionError())
finish(4)
Expand Down Expand Up @@ -131,22 +131,21 @@ class BasicOperationsTest : TestBase() {
finish(6)
}

private suspend fun testOffer(kind: TestChannelKind) = coroutineScope {
private suspend fun testTrySend(kind: TestChannelKind) = coroutineScope {
val channel = kind.create<Int>()
val d = async { channel.send(42) }
yield()
channel.close()

assertTrue(channel.isClosedForSend)
try {
channel.offer(2)
fail()
} catch (e: ClosedSendChannelException) {
if (!kind.isConflated) {
assertEquals(42, channel.receive())
channel.trySend(2)
.onSuccess { expectUnreached() }
.onFailure {
assertTrue { it is ClosedSendChannelException}
if (!kind.isConflated) {
assertEquals(42, channel.receive())
}
}
}

d.await()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ class ChannelBufferOverflowTest : TestBase() {
@Test
fun testDropLatest() = runTest {
val c = Channel<Int>(2, BufferOverflow.DROP_LATEST)
assertTrue(c.offer(1))
assertTrue(c.offer(2))
assertTrue(c.offer(3)) // overflows, dropped
assertTrue(c.trySend(1).isSuccess)
assertTrue(c.trySend(2).isSuccess)
assertTrue(c.trySend(3).isSuccess) // overflows, dropped
c.send(4) // overflows dropped
assertEquals(1, c.receive())
assertTrue(c.offer(5))
assertTrue(c.offer(6)) // overflows, dropped
assertTrue(c.trySend(5).isSuccess)
assertTrue(c.trySend(6).isSuccess) // overflows, dropped
assertEquals(2, c.receive())
assertEquals(5, c.receive())
assertEquals(null, c.poll())
Expand All @@ -26,15 +26,15 @@ class ChannelBufferOverflowTest : TestBase() {
@Test
fun testDropOldest() = runTest {
val c = Channel<Int>(2, BufferOverflow.DROP_OLDEST)
assertTrue(c.offer(1))
assertTrue(c.offer(2))
assertTrue(c.offer(3)) // overflows, keeps 2, 3
assertTrue(c.trySend(1).isSuccess)
assertTrue(c.trySend(2).isSuccess)
assertTrue(c.trySend(3).isSuccess) // overflows, keeps 2, 3
c.send(4) // overflows, keeps 3, 4
assertEquals(3, c.receive())
assertTrue(c.offer(5))
assertTrue(c.offer(6)) // overflows, keeps 5, 6
assertTrue(c.trySend(5).isSuccess)
assertTrue(c.trySend(6).isSuccess) // overflows, keeps 5, 6
assertEquals(5, c.receive())
assertEquals(6, c.receive())
assertEquals(null, c.poll())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ open class ConflatedChannelTest : TestBase() {
fun testBasicConflationOfferPoll() {
val q = createConflatedChannel<Int>()
assertNull(q.poll())
assertTrue(q.offer(1))
assertTrue(q.offer(2))
assertTrue(q.offer(3))
assertTrue(q.trySend(1).isSuccess)
assertTrue(q.trySend(2).isSuccess)
assertTrue(q.trySend(3).isSuccess)
assertEquals(3, q.poll())
assertNull(q.poll())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class LinkedListChannelTest : TestBase() {
fun testBasic() = runTest {
val c = Channel<Int>(Channel.UNLIMITED)
c.send(1)
check(c.offer(2))
assertTrue(c.trySend(2).isSuccess)
c.send(3)
check(c.close())
check(!c.close())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class RendezvousChannelTest : TestBase() {
@Test
fun testOfferAndPool() = runTest {
val q = Channel<Int>(Channel.RENDEZVOUS)
assertFalse(q.offer(1))
assertFalse(q.trySend(1).isSuccess)
expect(1)
launch {
expect(3)
Expand All @@ -99,7 +99,7 @@ class RendezvousChannelTest : TestBase() {
expect(2)
yield()
expect(5)
assertTrue(q.offer(2))
assertTrue(q.trySend(2).isSuccess)
expect(6)
yield()
expect(8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,15 @@ class ChannelBuildersFlowTest : TestBase() {

var expected = 0
launch {
assertTrue(channel.offer(1)) // Handed to the coroutine
assertTrue(channel.offer(2)) // Buffered
assertFalse(channel.offer(3)) // Failed to offer
assertTrue(channel.trySend(1).isSuccess) // Handed to the coroutine
assertTrue(channel.trySend(2).isSuccess) // Buffered
assertFalse(channel.trySend(3).isSuccess) // Failed to offer
channel.send(3)
yield()
assertEquals(1, expected)
assertTrue(channel.offer(4)) // Handed to the coroutine
assertTrue(channel.offer(5)) // Buffered
assertFalse(channel.offer(6)) // Failed to offer
assertTrue(channel.trySend(4).isSuccess) // Handed to the coroutine
assertTrue(channel.trySend(5).isSuccess) // Buffered
assertFalse(channel.trySend(6).isSuccess) // Failed to offer
channel.send(6)
assertEquals(2, expected)
}
Expand Down
Loading

0 comments on commit dedd448

Please sign in to comment.