Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve DebugProbes performance #3534

Merged
merged 4 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmarks/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ dependencies {

implementation("com.typesafe.akka:akka-actor_2.12:2.5.0")
implementation(project(":kotlinx-coroutines-core"))
implementation(project(":kotlinx-coroutines-debug"))
implementation(project(":kotlinx-coroutines-reactive"))

// add jmh dependency on main
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.debug

import kotlinx.coroutines.*
import kotlinx.coroutines.debug.*
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.annotations.State
import java.util.concurrent.*

@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
open class DebugProbesConcurrentBenchmark {

@Setup
fun setup() {
DebugProbes.sanitizeStackTraces = false
DebugProbes.enableCreationStackTraces = false
DebugProbes.install()
}

@TearDown
fun tearDown() {
DebugProbes.uninstall()
}


@Benchmark
fun run() = runBlocking<Long> {
var sum = 0L
repeat(8) {
launch(Dispatchers.Default) {
val seq = stressSequenceBuilder((1..100).asSequence()) {
(1..it).asSequence()
}

for (i in seq) {
sum += i.toLong()
}
}
}
sum
}

private fun <Node> stressSequenceBuilder(initialSequence: Sequence<Node>, children: (Node) -> Sequence<Node>): Sequence<Node> {
return sequence {
val initialIterator = initialSequence.iterator()
if (!initialIterator.hasNext()) {
return@sequence
}
val visited = HashSet<Node>()
val sequences = ArrayDeque<Sequence<Node>>()
sequences.addLast(initialIterator.asSequence())
while (sequences.isNotEmpty()) {
val currentSequence = sequences.removeFirst()
for (node in currentSequence) {
if (visited.add(node)) {
yield(node)
sequences.addLast(children(node))
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal const val SUSPENDED = "SUSPENDED"

/**
* Internal implementation class where debugger tracks details it knows about each coroutine.
* Its mutable fields can be updated concurrently, thus marked with `@Volatile`
*/
internal class DebugCoroutineInfoImpl(
context: CoroutineContext?,
Expand All @@ -40,15 +41,18 @@ internal class DebugCoroutineInfoImpl(
* Can be CREATED, RUNNING, SUSPENDED.
*/
public val state: String get() = _state
@Volatile
private var _state: String = CREATED

@JvmField
@Volatile
internal var lastObservedThread: Thread? = null

/**
* We cannot keep a strong reference to the last observed frame of the coroutine, because this will
* prevent garbage-collection of a coroutine that was lost.
*/
@Volatile
private var _lastObservedFrame: WeakReference<CoroutineStackFrame>? = null
internal var lastObservedFrame: CoroutineStackFrame?
get() = _lastObservedFrame?.get()
Expand Down
58 changes: 24 additions & 34 deletions kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,23 @@ internal object DebugProbesImpl {
private val capturedCoroutinesMap = ConcurrentWeakMap<CoroutineOwner<*>, Boolean>()
private val capturedCoroutines: Set<CoroutineOwner<*>> get() = capturedCoroutinesMap.keys

@Volatile
private var installations = 0
private val installations = atomic(0)

/**
* This internal method is used by IDEA debugger under the JVM name of
* "isInstalled$kotlinx_coroutines_debug".
*/
internal val isInstalled: Boolean get() = installations > 0
internal val isInstalled: Boolean get() = installations.value > 0

// To sort coroutines by creation order, used as unique id
private val sequenceNumber = atomic(0L)
/*
* RW-lock that guards all debug probes state changes.
* All individual coroutine state transitions are guarded by read-lock
* and do not interfere with each other.
* All state reads are guarded by the write lock to guarantee a strongly-consistent
* snapshot of the system.
*/
private val coroutineStateLock = ReentrantReadWriteLock()

public var sanitizeStackTraces: Boolean = true
public var enableCreationStackTraces: Boolean = true

/*
* Substitute for service loader, DI between core and debug modules.
* If the agent was installed via command line -javaagent parameter, do not use byte-byddy to avoud
* If the agent was installed via command line -javaagent parameter, do not use byte-buddy to avoid dynamic attach.
*/
private val dynamicAttach = getDynamicAttach()

Expand All @@ -77,16 +68,16 @@ internal object DebugProbesImpl {
*/
private val callerInfoCache = ConcurrentWeakMap<CoroutineStackFrame, DebugCoroutineInfoImpl>(weakRefQueue = true)

public fun install(): Unit = coroutineStateLock.write {
if (++installations > 1) return
fun install() {
if (installations.incrementAndGet() > 1) return
startWeakRefCleanerThread()
if (AgentInstallationType.isInstalledStatically) return
dynamicAttach?.invoke(true) // attach
}

public fun uninstall(): Unit = coroutineStateLock.write {
fun uninstall() {
check(isInstalled) { "Agent was not installed" }
if (--installations != 0) return
if (installations.decrementAndGet() != 0) return
stopWeakRefCleanerThread()
capturedCoroutinesMap.clear()
callerInfoCache.clear()
Expand All @@ -107,7 +98,7 @@ internal object DebugProbesImpl {
thread.join()
}

public fun hierarchyToString(job: Job): String = coroutineStateLock.write {
fun hierarchyToString(job: Job): String {
check(isInstalled) { "Debug probes are not installed" }
val jobToStack = capturedCoroutines
.filter { it.delegate.context[Job] != null }
Expand Down Expand Up @@ -149,20 +140,19 @@ internal object DebugProbesImpl {
* Private method that dumps coroutines so that different public-facing method can use
* to produce different result types.
*/
private inline fun <R : Any> dumpCoroutinesInfoImpl(crossinline create: (CoroutineOwner<*>, CoroutineContext) -> R): List<R> =
coroutineStateLock.write {
check(isInstalled) { "Debug probes are not installed" }
capturedCoroutines
.asSequence()
// Stable ordering of coroutines by their sequence number
.sortedBy { it.info.sequenceNumber }
// Leave in the dump only the coroutines that were not collected while we were dumping them
.mapNotNull { owner ->
// Fuse map and filter into one operation to save an inline
if (owner.isFinished()) null
else owner.info.context?.let { context -> create(owner, context) }
}.toList()
}
private inline fun <R : Any> dumpCoroutinesInfoImpl(crossinline create: (CoroutineOwner<*>, CoroutineContext) -> R): List<R> {
check(isInstalled) { "Debug probes are not installed" }
return capturedCoroutines
.asSequence()
// Stable ordering of coroutines by their sequence number
.sortedBy { it.info.sequenceNumber }
// Leave in the dump only the coroutines that were not collected while we were dumping them
.mapNotNull { owner ->
// Fuse map and filter into one operation to save an inline
if (owner.isFinished()) null
else owner.info.context?.let { context -> create(owner, context) }
}.toList()
}

/*
* This method optimises the number of packages sent by the IDEA debugger
Expand Down Expand Up @@ -280,7 +270,7 @@ internal object DebugProbesImpl {
return true
}

private fun dumpCoroutinesSynchronized(out: PrintStream): Unit = coroutineStateLock.write {
private fun dumpCoroutinesSynchronized(out: PrintStream) {
check(isInstalled) { "Debug probes are not installed" }
out.print("Coroutines dump ${dateFormat.format(System.currentTimeMillis())}")
capturedCoroutines
Expand Down Expand Up @@ -441,7 +431,7 @@ internal object DebugProbesImpl {
}

// See comment to callerInfoCache
private fun updateRunningState(frame: CoroutineStackFrame, state: String): Unit = coroutineStateLock.read {
private fun updateRunningState(frame: CoroutineStackFrame, state: String) {
if (!isInstalled) return
// Lookup coroutine info in cache or by traversing stack frame
val info: DebugCoroutineInfoImpl
Expand All @@ -466,7 +456,7 @@ internal object DebugProbesImpl {
return if (caller.getStackTraceElement() != null) caller else caller.realCaller()
}

private fun updateState(owner: CoroutineOwner<*>, frame: Continuation<*>, state: String) = coroutineStateLock.read {
private fun updateState(owner: CoroutineOwner<*>, frame: Continuation<*>, state: String) {
if (!isInstalled) return
owner.info.updateState(state, frame)
}
Expand Down
15 changes: 11 additions & 4 deletions kotlinx-coroutines-debug/src/DebugProbes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ import kotlin.coroutines.*
* asynchronous stack-traces and coroutine dumps (similar to [ThreadMXBean.dumpAllThreads] and `jstack` via [DebugProbes.dumpCoroutines].
* All introspecting methods throw [IllegalStateException] if debug probes were not installed.
*
* Installed hooks:
* ### Consistency guarantees
*
* All snapshotting operations (e.g. [dumpCoroutines]) are *weakly-consistent*, meaning that they happen
* concurrently with coroutines progressing their own state. These operations are guaranteed to observe
* each coroutine's state exactly once, but the state is not guaranteed to be the most recent before the operation.
* In practice, it means that for snapshotting operations in progress, for each concurrent coroutine either
* the state prior to the operation or the state that was reached during the current operation is observed.
*
* ### Installed hooks
* * `probeCoroutineResumed` is invoked on every [Continuation.resume].
* * `probeCoroutineSuspended` is invoked on every continuation suspension.
* * `probeCoroutineCreated` is invoked on every coroutine creation using stdlib intrinsics.
* * `probeCoroutineCreated` is invoked on every coroutine creation.
*
* Overhead:
* ### Overhead
* * Every created coroutine is stored in a concurrent hash map and hash map is looked up and
* updated on each suspension and resumption.
* * If [DebugProbes.enableCreationStackTraces] is enabled, stack trace of the current thread is captured on
Expand Down Expand Up @@ -118,7 +125,7 @@ public object DebugProbes {
printJob(scope.coroutineContext[Job] ?: error("Job is not present in the scope"), out)

/**
* Returns all existing coroutines info.
* Returns all existing coroutines' info.
* The resulting collection represents a consistent snapshot of all existing coroutines at the moment of invocation.
*/
public fun dumpCoroutinesInfo(): List<CoroutineInfo> = DebugProbesImpl.dumpCoroutinesInfo().map { CoroutineInfo(it) }
Expand Down