Skip to content

Commit

Permalink
~ a different approach that actually works on JS, simplifies code
Browse files Browse the repository at this point in the history
  • Loading branch information
elizarov committed Dec 4, 2020
1 parent af95e9f commit a5e8496
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 65 deletions.
5 changes: 2 additions & 3 deletions kotlinx-coroutines-core/common/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -75,7 +74,7 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
val nodes = Array(deferreds.size) { i ->
val deferred = deferreds[i]
deferred.start() // To properly await lazily started deferreds
AwaitAllNode(cont, deferred).apply {
AwaitAllNode(cont).apply {
handle = deferred.invokeOnCompletion(asHandler)
}
}
Expand All @@ -101,7 +100,7 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
override fun toString(): String = "DisposeHandlersOnCancel[$nodes]"
}

private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>) : JobNode() {
lateinit var handle: DisposableHandle

private val _disposer = atomic<DisposeHandlersOnCancel?>(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ internal open class CancellableContinuationImpl<in T>(
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
val handle = parent.invokeOnCompletion(
onCancelling = true,
handler = ChildContinuation(parent, this).asHandler
handler = ChildContinuation(this).asHandler
)
parentHandle = handle
// now check our state _after_ registering (could have completed while we were registering)
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Job.kt
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ public interface ChildHandle : DisposableHandle {
* ```
*/
internal fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle =
invokeOnCompletion(handler = DisposeOnCompletion(this, handle).asHandler)
invokeOnCompletion(handler = DisposeOnCompletion(handle).asHandler)

/**
* Cancels the job and suspends the invoking coroutine until the cancelled job is complete.
Expand Down
101 changes: 47 additions & 54 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// fast-path method to finalize normally completed coroutines without children
// returns true if complete, and afterCompletion(update) shall be called
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
assert { state is Empty || state is JobNode<*> } // only simple state without lists where children can concurrently add
assert { state is Empty || state is JobNode } // only simple state without lists where children can concurrently add
assert { update !is CompletedExceptionally } // only for normal completion
if (!_state.compareAndSet(state, update.boxIncomplete())) return false
onCancelling(null) // simple state is not a failure
Expand All @@ -313,7 +313,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* 2) Invoke completion handlers: .join(), callbacks etc.
* It's important to invoke them only AFTER exception handling and everything else, see #208
*/
if (state is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
try {
state.invoke(cause)
} catch (ex: Throwable) {
Expand All @@ -327,7 +327,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private fun notifyCancelling(list: NodeList, cause: Throwable) {
// first cancel our own children
onCancelling(cause)
notifyHandlers<JobCancellingNode<*>>(list, cause)
notifyHandlers<JobCancellingNode>(list, cause)
// then cancel parent
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}
Expand Down Expand Up @@ -359,9 +359,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}

private fun NodeList.notifyCompletion(cause: Throwable?) =
notifyHandlers<JobNode<*>>(this, cause)
notifyHandlers<JobNode>(this, cause)

private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
list.forEach<T> { node ->
try {
Expand Down Expand Up @@ -453,21 +453,22 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
invokeImmediately: Boolean,
handler: CompletionHandler
): DisposableHandle {
var nodeCache: JobNode<*>? = null
// Create node upfront -- for common cases it jus sets lateinit JobNode.job field,
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
val node: JobNode = makeNode(handler, onCancelling)
loopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// try move to SINGLE state
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (_state.compareAndSet(state, node)) return node
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
is Incomplete -> {
val list = state.list
if (list == null) { // SINGLE/SINGLE+
promoteSingleToNodeList(state as JobNode<*>)
promoteSingleToNodeList(state as JobNode)
} else {
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
Expand All @@ -479,7 +480,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// or we are adding a child to a coroutine that is not completing yet
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
// Note: add node the list while holding lock on state (make sure it cannot change)
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
// just return node if we don't have to invoke handler (not cancelling yet)
if (rootCause == null) return node
Expand All @@ -493,7 +493,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
if (invokeImmediately) handler.invokeIt(rootCause)
return handle
} else {
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (addLastAtomic(state, list, node)) return node
}
}
Expand All @@ -508,19 +507,20 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
}

private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> =
if (onCancelling) {
(handler as? JobCancellingNode<*>)
?.takeIf { it.job === this }
?: InvokeOnCancelling(this, handler)
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode {
val node = if (onCancelling) {
(handler as? JobCancellingNode)
?: InvokeOnCancelling(handler)
} else {
(handler as? JobNode<*>)
(handler as? JobNode)
?.also { assert { it !is JobCancellingNode } }
?.takeIf { it.job === this }
?: InvokeOnCompletion(this, handler)
?: InvokeOnCompletion(handler)
}
node.job = this
return node
}

private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>) =
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
list.addLastIf(node) { this.state === expect }

private fun promoteEmptyToNodeList(state: Empty) {
Expand All @@ -530,7 +530,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
_state.compareAndSet(state, update)
}

private fun promoteSingleToNodeList(state: JobNode<*>) {
private fun promoteSingleToNodeList(state: JobNode) {
// try to promote it to list (SINGLE+ state)
state.addOneIfEmpty(NodeList())
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
Expand All @@ -556,7 +556,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren

private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
// We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont).asHandler))
}

public final override val onJoin: SelectClause0
Expand All @@ -576,7 +576,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
if (startInternal(state) == 0) {
// slow-path -- register waiter for completion
select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(this, select, block).asHandler))
select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(select, block).asHandler))
return
}
}
Expand All @@ -585,11 +585,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
/**
* @suppress **This is unstable API and it is subject to change.**
*/
internal fun removeNode(node: JobNode<*>) {
internal fun removeNode(node: JobNode) {
// remove logic depends on the state of the job
loopOnState { state ->
when (state) {
is JobNode<*> -> { // SINGE/SINGLE+ state -- one completion handler
is JobNode -> { // SINGE/SINGLE+ state -- one completion handler
if (state !== node) return // a different job node --> we were already removed
// try remove and revert back to empty state
if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
Expand Down Expand Up @@ -773,7 +773,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list ?:
when (state) {
is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Cancelling state
is JobNode<*> -> {
is JobNode -> {
// SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
// correctly capture a reference to it
promoteSingleToNodeList(state)
Expand Down Expand Up @@ -852,7 +852,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
* which may miss unhandled exception.
*/
if ((state is Empty || state is JobNode<*>) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
if ((state is Empty || state is JobNode) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
if (tryFinalizeSimpleState(state, proposedUpdate)) {
// Completed successfully on fast path -- return updated state
return proposedUpdate
Expand Down Expand Up @@ -967,7 +967,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* If child is attached when the job is already being cancelled, such child will receive immediate notification on
* cancellation, but parent *will* wait for that child before completion and will handle its exception.
*/
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child).asHandler) as ChildHandle
}

/**
Expand Down Expand Up @@ -1150,7 +1150,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private val state: Finishing,
private val child: ChildHandleNode,
private val proposedUpdate: Any?
) : JobNode<Job>(child.childJob) {
) : JobNode() {
override fun invoke(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
}
Expand Down Expand Up @@ -1228,7 +1228,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* thrown and not a JobCancellationException.
*/
val cont = AwaitContinuation(uCont.intercepted(), this)
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler))
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
cont.getResult()
}

Expand All @@ -1255,7 +1255,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
if (startInternal(state) == 0) {
// slow-path -- register waiter for completion
select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(this, select, block).asHandler))
select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(select, block).asHandler))
return
}
}
Expand Down Expand Up @@ -1345,12 +1345,14 @@ internal interface Incomplete {
val list: NodeList? // is null only for Empty and JobNode incomplete state objects
}

internal abstract class JobNode<out J : Job>(
@JvmField val job: J
) : CompletionHandlerBase(), DisposableHandle, Incomplete {
internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Incomplete {
/**
* Initialized by [JobSupport.makeNode].
*/
lateinit var job: JobSupport
override val isActive: Boolean get() = true
override val list: NodeList? get() = null
override fun dispose() = (job as JobSupport).removeNode(this)
override fun dispose() = job.removeNode(this)
override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
}

Expand All @@ -1363,7 +1365,7 @@ internal class NodeList : LockFreeLinkedListHead(), Incomplete {
append(state)
append("}[")
var first = true
this@NodeList.forEach<JobNode<*>> { node ->
this@NodeList.forEach<JobNode> { node ->
if (first) first = false else append(", ")
append(node)
}
Expand All @@ -1382,23 +1384,20 @@ internal class InactiveNodeList(
}

private class InvokeOnCompletion(
job: Job,
private val handler: CompletionHandler
) : JobNode<Job>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) = handler.invoke(cause)
}

private class ResumeOnCompletion(
job: Job,
private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) = continuation.resume(Unit)
}

private class ResumeAwaitOnCompletion<T>(
job: JobSupport,
private val continuation: CancellableContinuationImpl<T>
) : JobNode<JobSupport>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) {
val state = job.state
assert { state !is Incomplete }
Expand All @@ -1414,28 +1413,25 @@ private class ResumeAwaitOnCompletion<T>(
}

internal class DisposeOnCompletion(
job: Job,
private val handle: DisposableHandle
) : JobNode<Job>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) = handle.dispose()
}

private class SelectJoinOnCompletion<R>(
job: JobSupport,
private val select: SelectInstance<R>,
private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) {
if (select.trySelect())
block.startCoroutineCancellable(select.completion)
}
}

private class SelectAwaitOnCompletion<T, R>(
job: JobSupport,
private val select: SelectInstance<R>,
private val block: suspend (T) -> R
) : JobNode<JobSupport>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) {
if (select.trySelect())
job.selectAwaitCompletion(select, block)
Expand All @@ -1448,12 +1444,11 @@ private class SelectAwaitOnCompletion<T, R>(
* Marker for node that shall be invoked on in _cancelling_ state.
* **Note: may be invoked multiple times.**
*/
internal abstract class JobCancellingNode<out J : Job>(job: J) : JobNode<J>(job)
internal abstract class JobCancellingNode : JobNode()

private class InvokeOnCancelling(
job: Job,
private val handler: CompletionHandler
) : JobCancellingNode<Job>(job) {
) : JobCancellingNode() {
// delegate handler shall be invoked at most once, so here is an additional flag
private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
override fun invoke(cause: Throwable?) {
Expand All @@ -1462,18 +1457,16 @@ private class InvokeOnCancelling(
}

internal class ChildHandleNode(
parent: JobSupport,
@JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
) : JobCancellingNode(), ChildHandle {
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}

// Same as ChildHandleNode, but for cancellable continuation
internal class ChildContinuation(
parent: Job,
@JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode<Job>(parent) {
) : JobCancellingNode() {
override fun invoke(cause: Throwable?) {
child.parentCancelled(child.getContinuationCancellationCause(job))
}
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,13 @@ internal class SelectBuilderImpl<in R>(
private fun initCancellability() {
val parent = context[Job] ?: return
val newRegistration = parent.invokeOnCompletion(
onCancelling = true, handler = SelectOnCancelling(parent).asHandler)
onCancelling = true, handler = SelectOnCancelling().asHandler)
parentHandle = newRegistration
// now check our state _after_ registering
if (isSelected) newRegistration.dispose()
}

private inner class SelectOnCancelling(job: Job) : JobCancellingNode<Job>(job) {
private inner class SelectOnCancelling : JobCancellingNode() {
// Note: may be invoked multiple times, but only the first trySelect succeeds anyway
override fun invoke(cause: Throwable?) {
if (trySelect())
Expand Down Expand Up @@ -552,7 +552,7 @@ internal class SelectBuilderImpl<in R>(
return decision
}

override val atomicOp: AtomicOp<*>?
override val atomicOp: AtomicOp<*>
get() = otherOp.atomicOp
}

Expand Down
Loading

0 comments on commit a5e8496

Please sign in to comment.