Skip to content

Commit

Permalink
Document and fix dispatchYield codepath (#4255)
Browse files Browse the repository at this point in the history
* Treat dispatchYield as regular dispatch in CoroutineScheduler
* Always unpark a worker to avoid potential starvation in cases when coroutine was launched via UNDISPATCHED mechanism

Fixes #4248
  • Loading branch information
qwwdfsad authored Oct 21, 2024
1 parent 46f9ccc commit d30af7c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 19 deletions.
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ public abstract class CoroutineDispatcher :
* Though the `yield` marker may be passed as a part of [context], this
* is a separate method for performance reasons.
*
* Implementation note: this entry-point is used for `Dispatchers.IO` and [Dispatchers.Default]
* unerlying implementations, see overrides for this method.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
Expand Down
23 changes: 10 additions & 13 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -384,14 +384,14 @@ internal class CoroutineScheduler(
* this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
*
* [taskContext] -- concurrency context of given [block].
* [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
* If `true`, then the task will be dispatched in a FIFO manner and no additional workers will be requested,
* but only if the current thread is a corresponding worker thread.
* [fair] -- whether this [dispatch] call is fair.
* If `true` then the task will be dispatched in a FIFO manner.
* Note that caller cannot be ensured that it is being executed on worker thread for the following reasons:
* - [CoroutineStart.UNDISPATCHED]
* - Concurrent [close] that effectively shutdowns the worker thread
* - Concurrent [close] that effectively shutdowns the worker thread.
* Used for [yield].
*/
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
trackTask() // this is needed for virtual time support
val task = createTask(block, taskContext)
val isBlockingTask = task.isBlocking
Expand All @@ -400,20 +400,18 @@ internal class CoroutineScheduler(
val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
// try to submit the task to the local queue and act depending on the result
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
val notAdded = currentWorker.submitToLocalQueue(task, fair)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (isBlockingTask) {
// Use state snapshot to better estimate the number of running threads
signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark)
signalBlockingWork(stateSnapshot)
} else {
if (skipUnpark) return
signalCpuWork()
}
}
Expand All @@ -429,8 +427,7 @@ internal class CoroutineScheduler(
}

// NB: should only be called from 'dispatch' method due to blocking tasks increment
private fun signalBlockingWork(stateSnapshot: Long, skipUnpark: Boolean) {
if (skipUnpark) return
private fun signalBlockingWork(stateSnapshot: Long) {
if (tryUnpark()) return
// Use state snapshot to avoid accidental thread overprovision
if (tryCreateWorker(stateSnapshot)) return
Expand Down Expand Up @@ -506,7 +503,7 @@ internal class CoroutineScheduler(
* Returns `null` if task was successfully added or an instance of the
* task that was not added or replaced (thus should be added to global queue).
*/
private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
private fun Worker?.submitToLocalQueue(task: Task, fair: Boolean): Task? {
if (this == null) return task
/*
* This worker could have been already terminated from this thread by close/shutdown and it should not
Expand All @@ -518,7 +515,7 @@ internal class CoroutineScheduler(
return task
}
mayHaveLocalTasks = true
return localQueue.add(task, fair = tailDispatch)
return localQueue.add(task, fair = fair)
}

private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
Expand Down
18 changes: 14 additions & 4 deletions kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,21 @@ internal open class SchedulerCoroutineDispatcher(

override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)

override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
coroutineScheduler.dispatch(block, tailDispatch = true)
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit {
/*
* 'dispatchYield' implementation is needed to address the scheduler's scheduling policy.
* By default, the scheduler dispatches tasks in a semi-LIFO order, meaning that for the
* task sequence [#1, #2, #3], the scheduling of task #4 will produce
* [#4, #1, #2, #3], allocates new worker and makes #4 stealable after some time.
* On a fast enough system, it means that `while (true) { yield() }` might obstruct the progress
* of the system and potentially starve it.
* To mitigate that, `dispatchYield` is a dedicated entry point that produces [#1, #2, #3, #4]
*/
coroutineScheduler.dispatch(block, fair = true)
}

internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
coroutineScheduler.dispatch(block, context, tailDispatch)
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) {
coroutineScheduler.dispatch(block, context, fair)
}

override fun close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kotlinx.coroutines.scheduling

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import org.junit.Test
import java.lang.Runnable
import java.util.concurrent.*
Expand Down Expand Up @@ -80,7 +79,7 @@ class CoroutineSchedulerTest : TestBase() {
it.dispatch(Runnable {
expect(2)
finishLatch.countDown()
}, tailDispatch = true)
}, fair = true)
})

startLatch.countDown()
Expand Down

0 comments on commit d30af7c

Please sign in to comment.