Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kotlinx-coroutines-reactor context propagation #5196

Merged
merged 10 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ muzzle {
versions.set("[1.3.9,)")
}
}

dependencies {
compileOnly("io.opentelemetry:opentelemetry-extension-kotlin")
compileOnly("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))

testImplementation("io.opentelemetry:opentelemetry-extension-kotlin")
testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testImplementation(project(":instrumentation:reactor:reactor-3.1:library"))

// Use first version with flow support since we have tests for it.
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.0")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
Expand All @@ -19,40 +18,27 @@
public class KotlinCoroutinesInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("kotlinx.coroutines.BuildersKt");
return named("kotlinx.coroutines.CoroutineContextKt");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
namedOneOf("launch", "launch$default")
named("newCoroutineContext")
.and(takesArgument(1, named("kotlin.coroutines.CoroutineContext"))),
this.getClass().getName() + "$LaunchAdvice");
transformer.applyAdviceToMethod(
namedOneOf("runBlocking", "runBlocking$default")
.and(takesArgument(0, named("kotlin.coroutines.CoroutineContext"))),
this.getClass().getName() + "$RunBlockingAdvice");
this.getClass().getName() + "$ContextAdvice");
}

@SuppressWarnings("unused")
public static class LaunchAdvice {
public static class ContextAdvice {

@Advice.OnMethodEnter
public static void enter(
@Advice.Argument(value = 1, readOnly = false) CoroutineContext coroutineContext) {
coroutineContext =
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
}
}

@SuppressWarnings("unused")
public static class RunBlockingAdvice {

@Advice.OnMethodEnter
public static void enter(
@Advice.Argument(value = 0, readOnly = false) CoroutineContext coroutineContext) {
coroutineContext =
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
if (coroutineContext != null) {
coroutineContext =
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ public final class KotlinCoroutinesInstrumentationHelper {

public static CoroutineContext addOpenTelemetryContext(CoroutineContext coroutineContext) {
Context current = Context.current();
return addOpenTelemetryContext(coroutineContext, current);
}

public static CoroutineContext addOpenTelemetryContext(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be missing it but looks like we can revert this file? Don't think I see this used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, reverted

CoroutineContext coroutineContext, Context otelContext) {
Context inCoroutine = ContextExtensionsKt.getOpenTelemetryContext(coroutineContext);
if (current == inCoroutine) {
if (otelContext == inCoroutine) {
return coroutineContext;
}
return coroutineContext.plus(ContextExtensionsKt.asContextElement(current));
return coroutineContext.plus(ContextExtensionsKt.asContextElement(otelContext));
}

private KotlinCoroutinesInstrumentationHelper() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,87 @@ class KotlinCoroutineInstrumentationTest extends AgentInstrumentationSpecificati
assert seenItersA.equals(expectedIters)
assert seenItersB.equals(expectedIters)
}

def "kotlin traced mono"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

when:
kotlinTest.tracedMono()

then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
attributes {
}
}
span("child") {
childOf span(0)
attributes {
}
}
}
}

where:
dispatcher << dispatchersToTest
}

def "kotlin traced mono with context propagation operator"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

when:
kotlinTest.tracedMonoContextPropagationOperator()

then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
attributes {
}
}
span("child") {
childOf span(0)
attributes {
}
}
}
}

where:
dispatcher << dispatchersToTest
}

def "kotlin traced flux"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

when:
kotlinTest.tracedFlux()

then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
attributes {
}
}
(0..2).each {
span("child_$it") {
childOf span(0)
attributes {
}
}
}
}
}

where:
dispatcher << dispatchersToTest
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
Expand All @@ -19,6 +21,11 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.flux
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -125,6 +132,38 @@ class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) {
}
}

fun tracedMono(): Unit = runTest {
mono(dispatcher) {
tracedChild("child")
}.awaitSingle()
}

fun tracedMonoContextPropagationOperator(): Unit = runTest {
val currentContext = Context.current()
// clear current context to ensure that ContextPropagationOperator is used for context propagation
withContext(Context.root().asContextElement()) {
val mono = mono(dispatcher) {
// extract context from reactor and propagate it into coroutine
val reactorContext = coroutineContext[ReactorContext.Key]?.context
val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current())
withContext(otelContext.asContextElement()) {
tracedChild("child")
}
}
ContextPropagationOperator.runWithContext(mono, currentContext).awaitSingle()
}
}

fun tracedFlux() = runTest {
flux(dispatcher) {
repeat(3) {
tracedChild("child_$it")
send(it)
}
}.collect {
}
}

fun launchConcurrentSuspendFunctions(numIters: Int) {
runBlocking {
for (i in 0 until numIters) {
Expand Down