Skip to content

Commit

Permalink
Multithreaded support in select expression
Browse files Browse the repository at this point in the history
Fixes #1764
  • Loading branch information
elizarov committed Mar 20, 2020
1 parent 80e3100 commit 8a796f8
Show file tree
Hide file tree
Showing 14 changed files with 261 additions and 71 deletions.
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,6 @@ public abstract class AbstractCoroutine<in T>(
*/
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
startCoroutine(start, this, receiver, block)
startCoroutine(start, receiver, this, block)
}
}
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(

internal fun <T, R> startCoroutineImpl(
start: CoroutineStart,
coroutine: AbstractCoroutine<T>,
receiver: R,
completion: Continuation<T>,
block: suspend R.() -> T
) = when (start) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, coroutine)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, coroutine)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, coroutine)
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}

Expand All @@ -189,8 +189,8 @@ internal fun <T, R> startCoroutineImpl(
// todo: impl a separate startCoroutineCancellable as a fast-path for startCoroutine(DEFAULT, ...)
internal expect fun <T, R> startCoroutine(
start: CoroutineStart,
coroutine: AbstractCoroutine<T>,
receiver: R,
completion: Continuation<T>,
block: suspend R.() -> T
)

Expand Down
97 changes: 67 additions & 30 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
select.disposeOnSelect(node)
return
}
enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
enqueueResult is Closed<*> -> {
node.dispose()
throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
}
enqueueResult === ENQUEUE_FAILED -> {} // try to offer
enqueueResult is Receive<*> -> {} // try to offer
else -> error("enqueueSend returned $enqueueResult ")
Expand Down Expand Up @@ -448,16 +451,18 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
override val pollResult: Any?,
@JvmField val channel: AbstractSendChannel<E>,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (SendChannel<E>) -> R
block: suspend (SendChannel<E>) -> R
) : Send(), DisposableHandle {
@JvmField val block: suspend (SendChannel<E>) -> R = block.asShareable()

override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
select.trySelectOther(otherOp) as Symbol? // must return symbol
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol? // must return symbol

override fun completeResumeSend() {
block.startCoroutine(receiver = channel, completion = select.completion)
}
override fun completeResumeSend() =
startCoroutine(CoroutineStart.ATOMIC, channel, select.completion, block)

override fun dispose() { // invoked on select completion
block.shareableDispose(useIt = false)
remove()
}

Expand Down Expand Up @@ -773,7 +778,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
): Boolean {
val node = ReceiveSelect(this, select, block, receiveMode)
val result = enqueueReceive(node)
if (result) select.disposeOnSelect(node)
if (result) {
select.disposeOnSelect(node)
} else {
node.dispose()
}
return result
}

Expand Down Expand Up @@ -871,41 +880,53 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

private class ReceiveElement<in E>(
@JvmField val cont: CancellableContinuation<Any?>,
cont: CancellableContinuation<Any?>,
@JvmField val receiveMode: Int
) : Receive<E>() {
private val _cont = atomic<CancellableContinuation<Any?>?>(cont)
private fun useCont() = _cont.getAndSet(null)

fun resumeValue(value: E): Any? = when (receiveMode) {
RECEIVE_RESULT -> ValueOrClosed.value(value)
else -> value
}

@Suppress("IMPLICIT_CAST_TO_ANY")
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
val token = cont.tryResume(resumeValue(value), otherOp?.desc) ?: return null
val token = _cont.value?.tryResume(resumeValue(value), otherOp?.desc) ?: run {
_cont.value = null
return null
}
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
otherOp?.finishPrepare()
return RESUME_TOKEN
}

override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
override fun completeResumeReceive(value: E) { useCont()?.completeResume(RESUME_TOKEN) }

override fun resumeReceiveClosed(closed: Closed<*>) {
when {
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
else -> cont.resumeWithException(closed.receiveException)
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> useCont()?.resume(null)
receiveMode == RECEIVE_RESULT -> useCont()?.resume(closed.toResult<Any>())
else -> useCont()?.resumeWithException(closed.receiveException)
}
}
override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
}

private class ReceiveHasNext<E>(
@JvmField val iterator: Itr<E>,
@JvmField val cont: CancellableContinuation<Boolean>
cont: CancellableContinuation<Boolean>
) : Receive<E>() {
private val _cont = atomic<CancellableContinuation<Boolean>?>(cont)
private fun useCont() = _cont.getAndSet(null)

override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
val token = cont.tryResume(true, otherOp?.desc) ?: return null
val token = _cont.value?.tryResume(true, otherOp?.desc) ?: run{
_cont.value = null
return null
}
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
otherOp?.finishPrepare()
Expand All @@ -918,51 +939,61 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
but completeResumeReceive is called once so we set iterator result here.
*/
iterator.result = value
cont.completeResume(RESUME_TOKEN)
useCont()?.completeResume(RESUME_TOKEN)
}

override fun resumeReceiveClosed(closed: Closed<*>) {
val token = if (closed.closeCause == null) {
cont.tryResume(false)
_cont.value?.tryResume(false)
} else {
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
_cont.value?.let { cont ->
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
}
}
if (token != null) {
iterator.result = closed
cont.completeResume(token)
_cont.value?.completeResume(token)
}
_cont.value = null
}
override fun toString(): String = "ReceiveHasNext@$hexAddress"
}

private class ReceiveSelect<R, E>(
@JvmField val channel: AbstractChannel<E>,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (Any?) -> R,
block: suspend (Any?) -> R,
@JvmField val receiveMode: Int
) : Receive<E>(), DisposableHandle {
@JvmField val block: suspend (Any?) -> R = block.asShareable() // captured variables in this block need screening

override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =
select.trySelectOther(otherOp) as Symbol?
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol?

@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(value: E) {
block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
startCoroutine(CoroutineStart.ATOMIC, if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion, block)
}

override fun resumeReceiveClosed(closed: Closed<*>) {
if (!select.trySelect()) return
if (!select.trySelect(onSelect = block::shareableWillBeUsed)) return
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
RECEIVE_THROWS_ON_CLOSE -> {
block.shareableDispose(useIt = true)
select.resumeSelectWithException(closed.receiveException)
}
RECEIVE_RESULT -> startCoroutine(CoroutineStart.ATOMIC, ValueOrClosed.closed<R>(closed.closeCause), select.completion, block)
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
block.startCoroutine(null, select.completion)
startCoroutine(CoroutineStart.ATOMIC, null, select.completion, block)
} else {
block.shareableDispose(useIt = true)
select.resumeSelectWithException(closed.receiveException)
}
}
}

override fun dispose() { // invoked on select completion
block.shareableDispose(useIt = false)
if (remove())
channel.onReceiveDequeued() // notify cancellation of receive
}
Expand Down Expand Up @@ -1031,17 +1062,23 @@ internal interface ReceiveOrClosed<in E> {
@Suppress("UNCHECKED_CAST")
internal class SendElement(
override val pollResult: Any?,
@JvmField val cont: CancellableContinuation<Unit>
cont: CancellableContinuation<Unit>
) : Send() {
private val _cont = atomic<CancellableContinuation<Unit>?>(cont)
private fun useCont() = _cont.getAndSet(null)

override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
val token = cont.tryResume(Unit, otherOp?.desc) ?: return null
val token = _cont.value?.tryResume(Unit, otherOp?.desc) ?: run {
_cont.value = null
return null
}
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
otherOp?.finishPrepare() // finish preparations
return RESUME_TOKEN
}
override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
override fun completeResumeSend() { useCont()?.completeResume(RESUME_TOKEN) }
override fun resumeSendClosed(closed: Closed<*>) { useCont()?.resumeWithException(closed.sendException) }
override fun toString(): String = "SendElement@$hexAddress($pollResult)"
}

Expand Down
7 changes: 7 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/Sharing.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@ internal expect fun <T> Continuation<T>.asLocalOrNull() : Continuation<T>?
internal expect fun <T> Continuation<T>.asLocalOrNullIfNotUsed() : Continuation<T>?
internal expect fun <T> Continuation<T>.useLocal() : Continuation<T>
internal expect fun <T> Continuation<T>.shareableInterceptedResumeCancellableWith(result: Result<T>)
internal expect fun <T> Continuation<T>.shareableInterceptedResumeWith(result: Result<T>)
internal expect fun <T> Continuation<T>.shareableDispose()
internal expect fun disposeContinuation(cont: () -> Continuation<*>)
internal expect fun <T> CancellableContinuationImpl<T>.shareableResume(delegate: Continuation<T>, useMode: Int)

internal expect fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R
internal expect fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean)
internal expect fun <T, R> (suspend (T) -> R).shareableWillBeUsed()

internal expect fun isReuseSupportedInPlatform(): Boolean
internal expect fun <T> ArrayList<T>.addOrUpdate(element: T, update: (ArrayList<T>) -> Unit)
internal expect fun <T> ArrayList<T>.addOrUpdate(index: Int, element: T, update: (ArrayList<T>) -> Unit)
Expand Down
60 changes: 39 additions & 21 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public interface SelectInstance<in R> {
/**
* Tries to select this instance. Returns `true` on success.
*/
public fun trySelect(): Boolean
public fun trySelect(onSelect: () -> Unit = {}): Boolean

/**
* Tries to select this instance. Returns:
Expand All @@ -130,7 +130,7 @@ public interface SelectInstance<in R> {
* member is public, but [Symbol] is internal. When [SelectInstance] becomes a `sealed interface`
* (see KT-222860) we can declare this method as internal.
*/
public fun trySelectOther(otherOp: PrepareOp?): Any?
public fun trySelectOther(otherOp: PrepareOp?, onSelect: () -> Unit): Any?

/**
* Performs action atomically with [trySelect].
Expand Down Expand Up @@ -233,10 +233,12 @@ private val selectOpSequenceNumber = SeqNumber()

@PublishedApi
internal class SelectBuilderImpl<in R>(
private val uCont: Continuation<R> // unintercepted delegate continuation
uCont: Continuation<R>
) : LockFreeLinkedListHead(), SelectBuilder<R>,
SelectInstance<R>, Continuation<R>, CoroutineStackFrame
{
private val uCont: Continuation<R> = uCont.asShareable() // unintercepted delegate continuation, shareable

override val callerFrame: CoroutineStackFrame?
get() = uCont as? CoroutineStackFrame

Expand Down Expand Up @@ -280,7 +282,10 @@ internal class SelectBuilderImpl<in R>(
when {
result === UNDECIDED -> {
val update = value()
if (_result.compareAndSet(UNDECIDED, update)) return
if (_result.compareAndSet(UNDECIDED, update)) {
uCont.shareableDispose() // will return result without calling continuation
return
}
}
result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED, RESUMED)) {
block()
Expand All @@ -305,7 +310,7 @@ internal class SelectBuilderImpl<in R>(
// Resumes in dispatched way so that it can be called from an arbitrary context
override fun resumeSelectWithException(exception: Throwable) {
doResume({ CompletedExceptionally(recoverStackTrace(exception, uCont)) }) {
uCont.intercepted().resumeWith(Result.failure(exception))
uCont.shareableInterceptedResumeWith(Result.failure(exception))
}
}

Expand Down Expand Up @@ -346,16 +351,19 @@ internal class SelectBuilderImpl<in R>(
internal fun handleBuilderException(e: Throwable) {
if (trySelect()) {
resumeWithException(e)
} else if (e !is CancellationException) {
/*
* Cannot handle this exception -- builder was already resumed with a different exception,
* so treat it as "unhandled exception". But only if it is not the completion reason
* and it's not the cancellation. Otherwise, in the face of structured concurrency
* the same exception will be reported to the global exception handler.
*/
val result = getResult()
if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) {
handleCoroutineException(context, e)
} else {
disposeLockFreeLinkedList { this }
if (e !is CancellationException) {
/*
* Cannot handle this exception -- builder was already resumed with a different exception,
* so treat it as "unhandled exception". But only if it is not the completion reason
* and it's not the cancellation. Otherwise, in the face of structured concurrency
* the same exception will be reported to the global exception handler.
*/
val result = getResult()
if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) {
handleCoroutineException(context, e)
}
}
}
}
Expand All @@ -381,14 +389,16 @@ internal class SelectBuilderImpl<in R>(
}

private fun doAfterSelect() {
val parentHandle = _parentHandle.getAndSet(null)
parentHandle?.dispose()
forEach<DisposeNode> {
it.handle.dispose()
it.dispose()
}
disposeLockFreeLinkedList { this }
}

override fun trySelect(): Boolean {
val result = trySelectOther(null)
override fun trySelect(onSelect: () -> Unit): Boolean {
val result = trySelectOther(null, onSelect)
return when {
result === RESUME_TOKEN -> true
result == null -> false
Expand Down Expand Up @@ -481,7 +491,7 @@ internal class SelectBuilderImpl<in R>(

// it is just like plain trySelect, but support idempotent start
// Returns RESUME_TOKEN | RETRY_ATOMIC | null (when already selected)
override fun trySelectOther(otherOp: PrepareOp?): Any? {
override fun trySelectOther(otherOp: PrepareOp?, onSelect: () -> Unit): Any? {
_state.loop { state -> // lock-free loop on state
when {
// Found initial state (not selected yet) -- try to make it selected
Expand All @@ -496,6 +506,7 @@ internal class SelectBuilderImpl<in R>(
val decision = pairSelectOp.perform(this)
if (decision !== null) return decision
}
onSelect()
doAfterSelect()
return RESUME_TOKEN
}
Expand Down Expand Up @@ -653,6 +664,13 @@ internal class SelectBuilderImpl<in R>(
}

private class DisposeNode(
@JvmField val handle: DisposableHandle
) : LockFreeLinkedListNode()
handle: DisposableHandle
) : LockFreeLinkedListNode() {
private val _handle = atomic<DisposableHandle?>(handle)

fun dispose() {
val handle = _handle.getAndSet(null)
handle?.dispose()
}
}
}
Loading

0 comments on commit 8a796f8

Please sign in to comment.