-
Notifications
You must be signed in to change notification settings - Fork 879
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kotlinx-coroutines-reactor context propagation #5196
Conversation
fc9dfdb
to
d7e910a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this something that would only affect agent users and not library users
import net.bytebuddy.description.type.TypeDescription; | ||
import net.bytebuddy.matcher.ElementMatcher; | ||
|
||
public class KotlinCoroutinesFluxInstrumentation implements TypeInstrumentation { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This propagates context when the flux is created as opposed to when it is subscribed to, which I believe differs from our general pattern in reactor / rxjava. Is there anything about kotlin coroutines that would mean we should not follow that pattern?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hopefully fixed now. I moved the instrumentation so that it should now be called from reactive flow and propagate the context that is active there into coroutine.
For library users context propagation is possible using kotlin coroutine library instrumentation and reactor ContextPropagationOperator
val mono = mono(dispatcher) {
// extract context from reactor and propagate it into coroutine
val reactorContext = coroutineContext[ReactorContext.Key]?.context
val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current())
withContext(otelContext.asContextElement()) {
tracedChild("child")
}
}
ContextPropagationOperator.runWithContext(mono, Context.current()).awaitSingle()
It's always hard to context switch back into reactive land, but I think our general thought has been for patterns like this one Where a flux is returned to a framework that subscribes to it, we're supposed to instrument the framework (spring) rather than flux creation, the latter not being applicable to all cases (for example a cached flux used in several different requests). |
6e8bdd4
to
4cfedc4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current agent instrumentation was written by me without knowing kotlin coroutines well :)
Wondering if we should change the current instrumentation to instead instrument newCoroutineContext
?
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/src/Builders.kt#L49
https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-reactor/src/Mono.kt#L87
@laurit Any thoughts on #5196 (review) ? Was hoping to instrument the lowest level building block to not have to instrument every type of coroutines library they build on it |
@anuraaga seems to work nicely. I removed everything else and just kept instrumentation for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the iterations!
return addOpenTelemetryContext(coroutineContext, current); | ||
} | ||
|
||
public static CoroutineContext addOpenTelemetryContext( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be missing it but looks like we can revert this file? Don't think I see this used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, reverted
* kotlinx-coroutines-reactor context propagation * extract context from reactor * add generics * muzzle * actually use the context extracted from reactor * test context propagation operator * typo * used named instead of namedOneOf * instrument newCoroutineContext, remove reactor specific code * revert changes
Resolves #5154