From d30af7cc954224312d8cd7f55f5065ccf6bc064c Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 21 Oct 2024 17:43:13 +0200 Subject: [PATCH] Document and fix dispatchYield codepath (#4255) * Treat dispatchYield as regular dispatch in CoroutineScheduler * Always unpark a worker to avoid potential starvation in cases when coroutine was launched via UNDISPATCHED mechanism Fixes #4248 --- .../common/src/CoroutineDispatcher.kt | 3 +++ .../jvm/src/scheduling/CoroutineScheduler.kt | 23 ++++++++----------- .../jvm/src/scheduling/Dispatcher.kt | 18 +++++++++++---- .../test/scheduling/CoroutineSchedulerTest.kt | 3 +-- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index b208c84f83..45573f30cc 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -222,6 +222,9 @@ public abstract class CoroutineDispatcher : * Though the `yield` marker may be passed as a part of [context], this * is a separate method for performance reasons. * + * Implementation note: this entry-point is used for `Dispatchers.IO` and [Dispatchers.Default] + * unerlying implementations, see overrides for this method. + * * @suppress **This an internal API and should not be used from general code.** */ @InternalCoroutinesApi diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 3c22116b66..3430ebadec 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -384,14 +384,14 @@ internal class CoroutineScheduler( * this [block] may execute blocking operations (IO, system calls, locking primitives etc.) * * [taskContext] -- concurrency context of given [block]. - * [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task. - * If `true`, then the task will be dispatched in a FIFO manner and no additional workers will be requested, - * but only if the current thread is a corresponding worker thread. + * [fair] -- whether this [dispatch] call is fair. + * If `true` then the task will be dispatched in a FIFO manner. * Note that caller cannot be ensured that it is being executed on worker thread for the following reasons: * - [CoroutineStart.UNDISPATCHED] - * - Concurrent [close] that effectively shutdowns the worker thread + * - Concurrent [close] that effectively shutdowns the worker thread. + * Used for [yield]. */ - fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { + fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) { trackTask() // this is needed for virtual time support val task = createTask(block, taskContext) val isBlockingTask = task.isBlocking @@ -400,20 +400,18 @@ internal class CoroutineScheduler( val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0 // try to submit the task to the local queue and act depending on the result val currentWorker = currentWorker() - val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) + val notAdded = currentWorker.submitToLocalQueue(task, fair) if (notAdded != null) { if (!addToGlobalQueue(notAdded)) { // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted throw RejectedExecutionException("$schedulerName was terminated") } } - val skipUnpark = tailDispatch && currentWorker != null // Checking 'task' instead of 'notAdded' is completely okay if (isBlockingTask) { // Use state snapshot to better estimate the number of running threads - signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark) + signalBlockingWork(stateSnapshot) } else { - if (skipUnpark) return signalCpuWork() } } @@ -429,8 +427,7 @@ internal class CoroutineScheduler( } // NB: should only be called from 'dispatch' method due to blocking tasks increment - private fun signalBlockingWork(stateSnapshot: Long, skipUnpark: Boolean) { - if (skipUnpark) return + private fun signalBlockingWork(stateSnapshot: Long) { if (tryUnpark()) return // Use state snapshot to avoid accidental thread overprovision if (tryCreateWorker(stateSnapshot)) return @@ -506,7 +503,7 @@ internal class CoroutineScheduler( * Returns `null` if task was successfully added or an instance of the * task that was not added or replaced (thus should be added to global queue). */ - private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? { + private fun Worker?.submitToLocalQueue(task: Task, fair: Boolean): Task? { if (this == null) return task /* * This worker could have been already terminated from this thread by close/shutdown and it should not @@ -518,7 +515,7 @@ internal class CoroutineScheduler( return task } mayHaveLocalTasks = true - return localQueue.add(task, fair = tailDispatch) + return localQueue.add(task, fair = fair) } private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 350e3e9aee..28d5537108 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -113,11 +113,21 @@ internal open class SchedulerCoroutineDispatcher( override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) - override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = - coroutineScheduler.dispatch(block, tailDispatch = true) + override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit { + /* + * 'dispatchYield' implementation is needed to address the scheduler's scheduling policy. + * By default, the scheduler dispatches tasks in a semi-LIFO order, meaning that for the + * task sequence [#1, #2, #3], the scheduling of task #4 will produce + * [#4, #1, #2, #3], allocates new worker and makes #4 stealable after some time. + * On a fast enough system, it means that `while (true) { yield() }` might obstruct the progress + * of the system and potentially starve it. + * To mitigate that, `dispatchYield` is a dedicated entry point that produces [#1, #2, #3, #4] + */ + coroutineScheduler.dispatch(block, fair = true) + } - internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) { - coroutineScheduler.dispatch(block, context, tailDispatch) + internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) { + coroutineScheduler.dispatch(block, context, fair) } override fun close() { diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt index 042ea2f7d8..fe09090362 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt @@ -1,7 +1,6 @@ package kotlinx.coroutines.scheduling import kotlinx.coroutines.testing.* -import kotlinx.coroutines.* import org.junit.Test import java.lang.Runnable import java.util.concurrent.* @@ -80,7 +79,7 @@ class CoroutineSchedulerTest : TestBase() { it.dispatch(Runnable { expect(2) finishLatch.countDown() - }, tailDispatch = true) + }, fair = true) }) startLatch.countDown()