Skip to content

Commit

Permalink
Introduce CoroutineDispatcher.limitedParallelism and make Dispatchers…
Browse files Browse the repository at this point in the history
….IO unbounded for limited parallelism (Kotlin#2918)

* Introduce CoroutineDispatcher.limitedParallelism for granular concurrency control

* Elastic Dispatchers.IO:

    * Extract Ktor-obsolete API to a separate file for backwards compatibility
    * Make Dispatchers.IO being a slice of unlimited blocking scheduler
    * Make Dispatchers.IO.limitParallelism take slices from the same internal scheduler

Fixes Kotlin#2943
Fixes Kotlin#2919
  • Loading branch information
qwwdfsad authored and yorickhenning committed Oct 14, 2021
1 parent 9dda95a commit 5b4b612
Show file tree
Hide file tree
Showing 29 changed files with 755 additions and 347 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ abstract class ParametrizedDispatcherBase : CoroutineScope {
coroutineContext = when {
dispatcher == "fjp" -> ForkJoinPool.commonPool().asCoroutineDispatcher()
dispatcher == "scheduler" -> {
ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }
Dispatchers.Default
}
dispatcher.startsWith("ftp") -> {
newFixedThreadPoolContext(dispatcher.substring(4).toInt(), dispatcher).also { closeable = it }
Expand Down
11 changes: 4 additions & 7 deletions benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@ package benchmarks

import benchmarks.common.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.sync.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit
import java.util.concurrent.*

@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
Expand Down Expand Up @@ -84,7 +81,7 @@ open class SemaphoreBenchmark {

enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
EXPERIMENTAL({ parallelism -> Dispatchers.Default }) // TODO doesn't take parallelism into account
}

private const val WORK_INSIDE = 80
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ import kotlin.coroutines.*
@State(Scope.Benchmark)
open class PingPongWithBlockingContext {

@UseExperimental(InternalCoroutinesApi::class)
private val experimental = ExperimentalCoroutineDispatcher(8)
@UseExperimental(InternalCoroutinesApi::class)
private val blocking = experimental.blocking(8)
private val experimental = Dispatchers.Default
private val blocking = Dispatchers.IO.limitedParallelism(8)
private val threadPool = newFixedThreadPoolContext(8, "PongCtx")

@TearDown
Expand Down
8 changes: 4 additions & 4 deletions integration/kotlinx-coroutines-play-services/test/TaskTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class TaskTest : TestBase() {
}

@Test
fun testCancelledAsTask() {
val deferred = GlobalScope.async {
fun testCancelledAsTask() = runTest {
val deferred = async(Dispatchers.Default) {
delay(100)
}.apply { cancel() }

Expand All @@ -60,8 +60,8 @@ class TaskTest : TestBase() {
}

@Test
fun testThrowingAsTask() {
val deferred = GlobalScope.async<Int> {
fun testThrowingAsTask() = runTest({ e -> e is TestException }) {
val deferred = async<Int>(Dispatchers.Default) {
throw TestException("Fail")
}

Expand Down
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher;
public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V
Expand Down Expand Up @@ -447,6 +448,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher {
public fun <init> ()V
public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher;
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun toString ()Ljava/lang/String;
protected final fun toStringInternalImpl ()Ljava/lang/String;
}
Expand Down
39 changes: 39 additions & 0 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,45 @@ public abstract class CoroutineDispatcher :
*/
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

/**
* Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism].
* The resulting view uses the original dispatcher for execution, but with the guarantee that
* no more than [parallelism] coroutines are executed at the same time.
*
* This method does not impose restrictions on the number of views or the total sum of parallelism values,
* each view controls its own parallelism independently with the guarantee that the effective parallelism
* of all views cannot exceed the actual parallelism of the original dispatcher.
*
* ### Limitations
*
* The default implementation of `limitedParallelism` does not support direct dispatchers,
* such as executing the given runnable in place during [dispatch] calls.
* Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
* For direct dispatchers, it is recommended to override this method
* and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
*
* ### Example of usage
* ```
* private val backgroundDispatcher = newFixedThreadPoolContext(4, "App Background")
* // At most 2 threads will be processing images as it is really slow and CPU-intensive
* private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(2)
* // At most 3 threads will be processing JSON to avoid image processing starvation
* private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(3)
* // At most 1 thread will be doing IO
* private val fileWriterDispatcher = backgroundDispatcher.limitedParallelism(1)
* ```
* is 6. Yet at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism.
*
* Note that this example was structured in such a way that it illustrates the parallelism guarantees.
* In practice, it is usually better to use [Dispatchers.IO] or [Dispatchers.Default] instead of creating a
* `backgroundDispatcher`. It is both possible and advised to call `limitedParallelism` on them.
*/
@ExperimentalCoroutinesApi
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
return LimitedDispatcher(this, parallelism)
}

/**
* Dispatches execution of a runnable [block] onto another thread in the given [context].
* This method should guarantee that the given [block] will be eventually invoked,
Expand Down
6 changes: 5 additions & 1 deletion kotlinx-coroutines-core/common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ internal abstract class EventLoop : CoroutineDispatcher() {
}
}

final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
return this
}

open fun shutdown() {}
}

Expand Down Expand Up @@ -525,4 +530,3 @@ internal expect fun nanoTime(): Long
internal expect object DefaultExecutor {
public fun enqueue(task: Runnable)
}

8 changes: 8 additions & 0 deletions kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package kotlinx.coroutines

import kotlinx.coroutines.internal.*

/**
* Base class for special [CoroutineDispatcher] which is confined to application "Main" or "UI" thread
* and used for any UI-based activities. Instance of `MainDispatcher` can be obtained by [Dispatchers.Main].
Expand Down Expand Up @@ -51,6 +53,12 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
*/
override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress"

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
// MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it
return this
}

/**
* Internal method for more specific [toString] implementations. It returns non-null
* string if this dispatcher is set in the platform as the main one.
Expand Down
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/common/src/Unconfined.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import kotlin.jvm.*
* A coroutine dispatcher that is not confined to any specific thread.
*/
internal object Unconfined : CoroutineDispatcher() {

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined")
}

override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
105 changes: 105 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
* The result of .limitedParallelism(x) call, a dispatcher
* that wraps the given dispatcher, but limits the parallelism level, while
* trying to emulate fairness.
*/
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {

@Volatile
private var runningWorkers = 0

private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= this.parallelism) return this
return super.limitedParallelism(parallelism)
}

override fun run() {
var fairnessCounter = 0
while (true) {
val task = queue.removeFirstOrNull()
if (task != null) {
try {
task.run()
} catch (e: Throwable) {
handleCoroutineException(EmptyCoroutineContext, e)
}
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
// Do "yield" to let other views to execute their runnable as well
// Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
dispatcher.dispatch(this, this)
return
}
continue
}

@Suppress("CAST_NEVER_SUCCEEDS")
synchronized(this as SynchronizedObject) {
--runningWorkers
if (queue.size == 0) return
++runningWorkers
fairnessCounter = 0
}
}
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
dispatchInternal(block) {
dispatcher.dispatch(this, this)
}
}

@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
dispatchInternal(block) {
dispatcher.dispatchYield(this, this)
}
}

private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
// Add task to queue so running workers will be able to see that
if (addAndTryDispatching(block)) return
/*
* Protect against the race when the number of workers is enough,
* but one (because of synchronized serialization) attempts to complete,
* and we just observed the number of running workers smaller than the actual
* number (hit right between `--runningWorkers` and `++runningWorkers` in `run()`)
*/
if (!tryAllocateWorker()) return
dispatch()
}

private fun tryAllocateWorker(): Boolean {
@Suppress("CAST_NEVER_SUCCEEDS")
synchronized(this as SynchronizedObject) {
if (runningWorkers >= parallelism) return false
++runningWorkers
return true
}
}

private fun addAndTryDispatching(block: Runnable): Boolean {
queue.addLast(block)
return runningWorkers >= parallelism
}
}

// Save a few bytecode ops
internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/js/src/JSDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {

abstract fun scheduleQueueProcessing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
return this
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
messageQueue.enqueue(block)
}
Expand Down
30 changes: 23 additions & 7 deletions kotlinx-coroutines-core/jvm/src/Dispatchers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public actual object Dispatchers {
* Note that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption,
* but still want to execute it in the current call-frame until its first suspension, then you can use
* an optional [CoroutineStart] parameter in coroutine builders like
* [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to the
* [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to
* the value of [CoroutineStart.UNDISPATCHED].
*/
@JvmStatic
Expand All @@ -100,22 +100,38 @@ public actual object Dispatchers {
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
*
* Moreover, the maximum configurable number of threads is capped by the
* `kotlinx.coroutines.scheduler.max.pool.size` system property.
* If you need a higher number of parallel threads,
* you should use a custom dispatcher backed by your own thread pool.
* ### Elasticity for limited parallelism
*
* `Dispatchers.IO` has a unique property of elasticity: its views
* obtained with [CoroutineDispatcher.limitedParallelism] are
* not restricted by the `Dispatchers.IO` parallelism. Conceptually, there is
* a dispatcher backed by an unlimited pool of threads, and both `Dispatchers.IO`
* and views of `Dispatchers.IO` are actually views of that dispatcher. In practice
* this means that, despite not abiding by `Dispatchers.IO`'s parallelism
* restrictions, its views share threads and resources with it.
*
* In the following example
* ```
* // 100 threads for MySQL connection
* val myMysqlDbDispatcher = Dispatchers.IO.limitedParallelism(100)
* // 60 threads for MongoDB connection
* val myMongoDbDispatcher = Dispatchers.IO.limitedParallelism(60)
* ```
* the system may have up to `64 + 100 + 60` threads dedicated to blocking tasks during peak loads,
* but during its steady state there is only a small number of threads shared
* among `Dispatchers.IO`, `myMysqlDbDispatcher` and `myMongoDbDispatcher`.
*
* ### Implementation note
*
* This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using
* This dispatcher and its views share threads with the [Default][Dispatchers.Default] dispatcher, so using
* `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default]
* dispatcher does not lead to an actual switching to another thread &mdash; typically execution
* continues in the same thread.
* As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used)
* during operations over IO dispatcher.
*/
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
public val IO: CoroutineDispatcher = DefaultIoScheduler

/**
* Shuts down built-in dispatchers, such as [Default] and [IO],
Expand Down
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ private class MissingMainCoroutineDispatcher(
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
missing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
missing()

override suspend fun delay(time: Long) =
missing()

Expand Down
Loading

0 comments on commit 5b4b612

Please sign in to comment.