Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get rid of BeforeResumeCancelHandler #3744

Merged
merged 5 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 7 additions & 17 deletions kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ internal open class CancellableContinuationImpl<in T>(
private fun callSegmentOnCancellation(segment: Segment<*>, cause: Throwable?) {
val index = _decisionAndIndex.value.index
check(index != NO_INDEX) { "The index for Segment.onCancellation(..) is broken" }
callCancelHandlerSafely { segment.onCancellation(index, cause) }
callCancelHandlerSafely { segment.onCancellation(index, cause, context) }
}

fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
Expand Down Expand Up @@ -376,8 +376,7 @@ internal open class CancellableContinuationImpl<in T>(
* [segment] and [index] in this [CancellableContinuationImpl].
*
* The only difference is that `segment.onCancellation(..)` is never
* called if this continuation is already completed; thus,
* the semantics is similar to [BeforeResumeCancelHandler].
* called if this continuation is already completed;
*
* ```
* invokeOnCancellation { cause ->
Expand Down Expand Up @@ -436,9 +435,8 @@ internal open class CancellableContinuationImpl<in T>(
* Continuation was already completed, and might already have cancel handler.
*/
if (state.cancelHandler != null) multipleHandlersError(handler, state)
// BeforeResumeCancelHandler and Segment.invokeOnCancellation(..)
// do NOT need to be called on completed continuation.
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
// Segment.invokeOnCancellation(..) does NOT need to be called on completed continuation.
if (handler is Segment<*>) return
handler as CancelHandler
if (state.cancelled) {
// Was already cancelled while being dispatched -- invoke the handler directly
Expand All @@ -451,10 +449,10 @@ internal open class CancellableContinuationImpl<in T>(
else -> {
/*
* Continuation was already completed normally, but might get cancelled while being dispatched.
* Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which
* Change its state to CompletedContinuation, unless we have Segment which
* does not need to be called in this case.
*/
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
if (handler is Segment<*>) return
handler as CancelHandler
val update = CompletedContinuation(state, cancelHandler = handler)
if (_state.compareAndSet(state, update)) return // quit on cas success
Expand Down Expand Up @@ -489,7 +487,7 @@ internal open class CancellableContinuationImpl<in T>(
proposedUpdate
}
!resumeMode.isCancellableMode && idempotent == null -> proposedUpdate // cannot be cancelled in process, all is fine
onCancellation != null || (state is CancelHandler && state !is BeforeResumeCancelHandler) || idempotent != null ->
onCancellation != null || state is CancelHandler || idempotent != null ->
// mark as CompletedContinuation if special cases are present:
// Cancellation handlers that shall be called after resume or idempotent resume
CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent)
Expand Down Expand Up @@ -636,14 +634,6 @@ private object Active : NotCompleted {
*/
internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted

/**
* Base class for all [CancellableContinuation.invokeOnCancellation] handlers that don't need to be invoked
* if continuation is cancelled after resumption, during dispatch, because the corresponding resources
* were already released before calling `resume`. This cancel handler is called only before `resume`.
* It avoids allocation of [CompletedContinuation] instance during resume on JVM.
*/
internal abstract class BeforeResumeCancelHandler : CancelHandler()

// Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly
private class InvokeOnCancel( // Clashes with InvokeOnCancellation
private val handler: CompletionHandler
Expand Down
151 changes: 42 additions & 109 deletions kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -172,34 +172,10 @@ internal open class BufferedChannel<E>(
segment: ChannelSegment<E>,
index: Int
) {
if (onUndeliveredElement == null) {
invokeOnCancellation(segment, index)
} else {
when (this) {
is CancellableContinuation<*> -> {
invokeOnCancellation(SenderWithOnUndeliveredElementCancellationHandler(segment, index, context).asHandler)
}
is SelectInstance<*> -> {
disposeOnCompletion(SenderWithOnUndeliveredElementCancellationHandler(segment, index, context))
}
is SendBroadcast -> {
cont.invokeOnCancellation(SenderWithOnUndeliveredElementCancellationHandler(segment, index, cont.context).asHandler)
}
else -> error("unexpected sender: $this")
}
}
}

private inner class SenderWithOnUndeliveredElementCancellationHandler(
private val segment: ChannelSegment<E>,
private val index: Int,
private val context: CoroutineContext
) : BeforeResumeCancelHandler(), DisposableHandle {
override fun dispose() {
segment.onSenderCancellationWithOnUndeliveredElement(index, context)
}

override fun invoke(cause: Throwable?) = dispose()
// To distinguish cancelled senders and receivers,
// senders equip the index value with an additional marker,
// adding `SEGMENT_SIZE` to the value.
invokeOnCancellation(segment, index + SEGMENT_SIZE)
}

private fun onClosedSendOnNoWaiterSuspend(element: E, cont: CancellableContinuation<Unit>) {
Expand Down Expand Up @@ -1594,7 +1570,7 @@ internal open class BufferedChannel<E>(
* and [SelectInstance.trySelect]. When the channel becomes closed,
* [tryResumeHasNextOnClosedChannel] should be used instead.
*/
private inner class BufferedChannelIterator : ChannelIterator<E>, BeforeResumeCancelHandler(), Waiter {
private inner class BufferedChannelIterator : ChannelIterator<E>, Waiter {
/**
* Stores the element retrieved by [hasNext] or
* a special [CHANNEL_CLOSED] token if this channel is closed.
Expand All @@ -1607,20 +1583,7 @@ internal open class BufferedChannel<E>(
* continuation. The [tryResumeHasNext] and [tryResumeHasNextOnClosedChannel]
* function resume this continuation when the [hasNext] invocation should complete.
*/
private var continuation: CancellableContinuation<Boolean>? = null

// When `hasNext()` suspends, the location where the continuation
// is stored is specified via the segment and the index in it.
// We need this information in the cancellation handler below.
private var segment: Segment<*>? = null
private var index = -1

/**
* Invoked on cancellation, [BeforeResumeCancelHandler] implementation.
*/
override fun invoke(cause: Throwable?) {
segment?.onCancellation(index, null)
}
private var continuation: CancellableContinuationImpl<Boolean>? = null

// `hasNext()` is just a special receive operation.
override suspend fun hasNext(): Boolean =
Expand Down Expand Up @@ -1680,11 +1643,7 @@ internal open class BufferedChannel<E>(
}

override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
this.segment = segment
this.index = index
// It is possible that this `hasNext()` invocation is already
// resumed, and the `continuation` field is already updated to `null`.
this.continuation?.invokeOnCancellation(this.asHandler)
this.continuation?.invokeOnCancellation(segment, index)
}

private fun onClosedHasNextNoWaiterSuspend() {
Expand Down Expand Up @@ -2826,67 +2785,51 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
// # Cancellation Support #
// ########################

override fun onCancellation(index: Int, cause: Throwable?) {
onCancellation(index)
}

fun onSenderCancellationWithOnUndeliveredElement(index: Int, context: CoroutineContext) {
// Read the element first. If the operation has not been successfully resumed
// (this cancellation may be caused by prompt cancellation during dispatching),
// it is guaranteed that the element is presented.
override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
// To distinguish cancelled senders and receivers, senders equip the index value with
// an additional marker, adding `SEGMENT_SIZE` to the value.
val isSender = index >= SEGMENT_SIZE
// Unwrap the index.
@Suppress("NAME_SHADOWING") val index = if (isSender) index - SEGMENT_SIZE else index
// Read the element, which may be needed further to call `onUndeliveredElement`.
val element = getElement(index)
// Perform the cancellation; `onCancellationImpl(..)` return `true` if the
// cancelled operation had not been resumed. In this case, the `onUndeliveredElement`
// lambda should be called.
if (onCancellation(index)) {
channel.onUndeliveredElement!!.callUndeliveredElement(element, context)
}
}

/**
* Returns `true` if the request is successfully cancelled,
* and no rendezvous has happened. We need this knowledge
* to keep [BufferedChannel.onUndeliveredElement] correct.
*/
@Suppress("ConvertTwoComparisonsToRangeCheck")
fun onCancellation(index: Int): Boolean {
// Count the global index of this cell and read
// the current counters of send and receive operations.
val globalIndex = id * SEGMENT_SIZE + index
val s = channel.sendersCounter
val r = channel.receiversCounter
// Update the cell state trying to distinguish whether
// the cancelled coroutine is sender or receiver.
var isSender: Boolean
var isReceiver: Boolean
while (true) { // CAS-loop
// Update the cell state.
while (true) {
// CAS-loop
// Read the current state of the cell.
val cur = data[index * 2 + 1].value
val cur = getState(index)
when {
// The cell stores a waiter.
cur is Waiter || cur is WaiterEB -> {
// Is the cancelled request send for sure?
isSender = globalIndex < s && globalIndex >= r
// Is the cancelled request receiver for sure?
isReceiver = globalIndex < r && globalIndex >= s
// If the cancelled coroutine neither sender
// nor receiver, clean the element slot and finish.
// An opposite operation will resume this request
// and update the cell state eventually.
if (!isSender && !isReceiver) {
cleanElement(index)
return true
}
// The cancelled request is either send or receive.
// Update the cell state correspondingly.
val update = if (isSender) INTERRUPTED_SEND else INTERRUPTED_RCV
if (data[index * 2 + 1].compareAndSet(cur, update)) break
if (casState(index, cur, update)) {
// The waiter has been successfully cancelled.
// Clean the element slot and invoke `onSlotCleaned()`,
// which may cause deleting the whole segment from the linked list.
// In case the cancelled request is receiver, it is critical to ensure
// that the `expandBuffer()` attempt that processes this cell is completed,
// so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`.
cleanElement(index)
onCancelledRequest(index, !isSender)
// Call `onUndeliveredElement` if needed.
if (isSender) {
channel.onUndeliveredElement?.callUndeliveredElement(element, context)
}
return
}
}
// The cell already indicates that the operation is cancelled.
cur === INTERRUPTED_SEND || cur === INTERRUPTED_RCV -> {
// Clean the element slot to avoid memory leaks and finish.
// Clean the element slot to avoid memory leaks,
// invoke `onUndeliveredElement` if needed, and finish
cleanElement(index)
return true
// Call `onUndeliveredElement` if needed.
if (isSender) {
channel.onUndeliveredElement?.callUndeliveredElement(element, context)
}
return
}
// An opposite operation is resuming this request;
// wait until the cell state updates.
Expand All @@ -2897,23 +2840,13 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
cur === RESUMING_BY_EB || cur === RESUMING_BY_RCV -> continue
// This request was successfully resumed, so this cancellation
// is caused by the prompt cancellation feature and should be ignored.
cur === DONE_RCV || cur === BUFFERED -> return false
cur === DONE_RCV || cur === BUFFERED -> return
// The cell state indicates that the channel is closed;
// this cancellation should be ignored.
cur === CHANNEL_CLOSED -> {
return false
}
cur === CHANNEL_CLOSED -> return
else -> error("unexpected state: $cur")
}
}
// Clean the element slot and invoke `onSlotCleaned()`,
// which may cause deleting the whole segment from the linked list.
// In case the cancelled request is receiver, it is critical to ensure
// that the `expandBuffer()` attempt that processes this cell is completed,
// so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`.
cleanElement(index)
onCancelledRequest(index, isReceiver)
return true
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.internal

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
Expand Down Expand Up @@ -230,8 +231,14 @@ internal abstract class Segment<S : Segment<S>>(
* This function is invoked on continuation cancellation when this segment
* with the specified [index] are installed as cancellation handler via
* `SegmentDisposable.disposeOnCancellation(Segment, Int)`.
*
* @param index the index under which the sement registered itself in the continuation.
* Indicies are opaque and arithmetics or numeric intepretation is not allowed on them,
* as they may encode additional metadata.
* @param cause the cause of the cancellation, with the same semantics as [CancellableContinuation.invokeOnCancellation]
* @param context the context of the cancellable continuation the segment was registered in
*/
abstract fun onCancellation(index: Int, cause: Throwable?)
abstract fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext)

/**
* Invoked on each slot clean-up; should not be invoked twice for the same slot.
Expand Down
Loading