
@Scheduled task instrumentation does not work for Kotlin suspend functions #32165

Closed
pwestlin opened this issue Jan 31, 2024 · 9 comments
Labels
in: core Issues in core modules (aop, beans, core, context, expression) status: superseded An issue that has been superseded by another theme: observability An issue related to observability and tracing


@pwestlin

pwestlin commented Jan 31, 2024

The Spring docs for Scheduled tasks instrumentation state:

An Observation is created for each execution of an @Scheduled task.

This function gets an automatic observation:

@Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
fun nonSuspendable() {
    logger.info("Not suspendable")
}

but this suspend function does not:

@Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
suspend fun suspendable() {
    logger.info("Suspendable")
}

I use Spring Boot 3.2.2 and I've also tried 3.2.3-SNAPSHOT and 3.3.0-M1.

build.gradle.kts (shortened):

plugins {
    id("org.springframework.boot") version "3.2.2"
    id("io.spring.dependency-management") version "1.1.4"
    kotlin("jvm") version "1.9.22"
    kotlin("plugin.spring") version "1.9.22"
}

java {
    sourceCompatibility = JavaVersion.VERSION_21
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")

    implementation("io.micrometer:micrometer-tracing")
    implementation("io.micrometer:micrometer-tracing-bridge-brave")

    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
}

Application code:

@SpringBootApplication
@EnableScheduling
class Application

fun main(args: Array<String>) {
    Hooks.enableAutomaticContextPropagation()
    runApplication<Application>(*args)
}

@Service
class SchedulingService {

    private val logger: Logger = LoggerFactory.getLogger(this.javaClass)

    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
    fun nonSuspendable() {
        logger.info("Not suspendable")
    }

    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
    suspend fun suspendable() {
        logger.info("Suspendable")
    }
}

When I run the application I get the following output:

2024-01-31T07:33:26.925+01:00  INFO [65b9e9b65a4ecd1702feecf2dbdd6be4,02feecf2dbdd6be4] 926648 --- [   scheduling-1] [65b9e9b65a4ecd1702feecf2dbdd6be4-02feecf2dbdd6be4] n.w.s.SchedulingService                  : Not suspendable
2024-01-31T07:33:26.946+01:00  INFO [,] 926648 --- [   scheduling-1] [                                                 ] n.w.s.SchedulingService                  : Suspendable

There we can see that the trace and span IDs "65b9e9b65a4ecd1702feecf2dbdd6be4,02feecf2dbdd6be4" show that the function "nonSuspendable" gets an observation but "suspendable" doesn't.

Regards Peter

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Jan 31, 2024
@bclozel bclozel added type: bug A general bug theme: observability An issue related to observability and tracing and removed status: waiting-for-triage An issue we've not yet triaged or decided on labels Jan 31, 2024
@bclozel bclozel transferred this issue from spring-projects/spring-boot Jan 31, 2024
@bclozel bclozel added this to the 6.0.17 milestone Jan 31, 2024
@bclozel bclozel self-assigned this Jan 31, 2024
@jhoeller jhoeller added the in: core Issues in core modules (aop, beans, core, context, expression) label Jan 31, 2024
@bclozel bclozel modified the milestones: 6.0.17, 6.1.4 Jan 31, 2024
@github-actions github-actions bot added status: backported An issue that has been backported to maintenance branches and removed for: backport-to-6.0.x labels Jan 31, 2024
@sbrannen sbrannen changed the title @Scheduled tasks instrumentation does not work for Kotlin suspend functions @Scheduled task instrumentation does not work for Kotlin suspend functions Jan 31, 2024
@bclozel bclozel removed this from the 6.1.4 milestone Jan 31, 2024
@bclozel bclozel added status: blocked An issue that's blocked on an external project change and removed status: backported An issue that has been backported to maintenance branches labels Jan 31, 2024
@bclozel bclozel changed the title @Scheduled task instrumentation does not work for Kotlin suspend functions @Scheduled task instrumentation does not work for Kotlin suspend functions Jan 31, 2024
@jhoeller jhoeller added this to the 6.x Backlog milestone Jan 31, 2024
@bclozel
Member

bclozel commented Jan 31, 2024

@sdeleuze and I investigated this issue and found that this is not a simple Spring Framework issue, but rather a broader problem with Kotlin Coroutines and Observability. This issue is explored in micrometer-metrics/tracing#174.

How this works with plain Spring MVC

With Spring MVC, when an observation is created, we can then use it to open a scope around some code execution. The opening of this scope is handled by the tracing infrastructure and sets up the relevant ThreadLocal and MDC values. The logging statements, when executed, have all the information in context and you can see traceId and spanId.
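As a rough illustration of these ThreadLocal mechanics, here is a stdlib-only Kotlin sketch. The names `currentTraceId`, `Scope`, `openScope` and `log` are hypothetical stand-ins for the tracing ThreadLocal/MDC entry and the observation scope; they are not Micrometer APIs.

```kotlin
// Hypothetical stand-in for the ThreadLocal/MDC entry that an observation scope populates.
val currentTraceId = ThreadLocal<String?>()

// Handle returned when a scope is opened; closing it restores the previous value.
class Scope(private val previous: String?) : AutoCloseable {
    override fun close() {
        currentTraceId.set(previous)
    }
}

// Opens a scope on the current thread by setting the ThreadLocal.
fun openScope(traceId: String): Scope {
    val previous = currentTraceId.get()
    currentTraceId.set(traceId)
    return Scope(previous)
}

// Mimics a log pattern that reads the trace ID from the current thread.
fun log(message: String): String = "[${currentTraceId.get() ?: ""}] $message"

fun main() {
    openScope("65b9e9b6").use {
        println(log("Not suspendable")) // [65b9e9b6] Not suspendable
    }
    println(log("Suspendable")) // [] Suspendable
}
```

Everything happens on one thread here, which is why this model works for Spring MVC and breaks down once execution hops between threads.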

How this works with Reactor Mono and Flux

With Reactor, work can be scheduled on any worker thread, so this is not as straightforward.
Reactor uses the Micrometer context-propagation library to propagate the information across contexts.
Here, our instrumentation puts the current observation in the Reactor ContextView under the ObservationThreadLocalAccessor.KEY key. If the context propagation feature is enabled (typically in Spring Boot with spring.reactor.context-propagation=auto), then Reactor uses the registered context-propagation ThreadLocalAccessor to get the information from the ContextView and temporarily restore it as a ThreadLocal in the current scope. The ObservationThreadLocalAccessor takes care of opening and closing the observation scope, which sets up the MDC as expected.
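A minimal sketch of the capture-and-restore idea, assuming a single hypothetical ThreadLocal in place of the registered ThreadLocalAccessors. `Snapshot`, `capture` and `crossThreadDemo` are illustrative names only; the real library types are ContextSnapshot and its Scope, and this sketch does not reproduce their API.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for a ThreadLocal managed by a registered ThreadLocalAccessor.
val traceId = ThreadLocal<String?>()

// Hypothetical snapshot: holds a captured value and can re-install it on another thread.
class Snapshot(private val captured: String?) {
    // Sets the captured value and returns a handle that restores the previous value on close.
    fun setThreadLocals(): AutoCloseable {
        val previous = traceId.get()
        traceId.set(captured)
        return object : AutoCloseable {
            override fun close() {
                traceId.set(previous)
            }
        }
    }
}

// Captures the current thread's value.
fun capture() = Snapshot(traceId.get())

fun crossThreadDemo(): List<String?> {
    val worker = Executors.newSingleThreadExecutor()
    traceId.set("65b9e9b6")
    val snapshot = capture() // taken on the calling thread
    traceId.remove()
    try {
        // The worker thread never saw the value, so nothing is there.
        val plain = worker.submit(Callable { traceId.get() }).get()
        // Inside the restored scope the worker sees the captured value...
        val restored = worker.submit(Callable { snapshot.setThreadLocals().use { traceId.get() } }).get()
        // ...and after the scope is closed the worker is clean again.
        val after = worker.submit(Callable { traceId.get() }).get()
        return listOf(plain, restored, after)
    } finally {
        worker.shutdown()
    }
}

fun main() {
    println(crossThreadDemo()) // [null, 65b9e9b6, null]
}
```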

With Kotlin Coroutines

In this case, the Reactor context is propagated automatically by the org.jetbrains.kotlinx:kotlinx-coroutines-reactor library, but under a specific key. Unlike Reactor's ContextView, keys are not String-based but rather must implement the CoroutineContext.Key interface. This means that you can get the entire Reactor context from the current coroutine context and get values from it.

But unlike Reactor, there is no automatic integration with context propagation, ThreadLocals or the MDC. In fact, it seems Kotlin Coroutines expect users to directly interact with the context to get values, or compose with a CoroutineContext implementation that implements ThreadContextElement to set and restore context values.
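To show what typed keys look like, here is a sketch using only kotlin-stdlib types (`CoroutineContext.Key`, `AbstractCoroutineContextElement`, `EmptyCoroutineContext`). `TraceElement` is a hypothetical element, not something kotlinx-coroutines-reactor provides; the `context[ReactorContext]` lookups used in this thread rely on the same companion-`Key` pattern.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical element keyed by its companion object rather than by a String.
class TraceElement(val traceId: String) : AbstractCoroutineContextElement(TraceElement) {
    companion object Key : CoroutineContext.Key<TraceElement>
}

fun main() {
    val context: CoroutineContext = EmptyCoroutineContext + TraceElement("65b9e9b6")
    // Lookup by key is typed: the result is TraceElement?, no cast needed.
    println(context[TraceElement]?.traceId) // 65b9e9b6
    // A context without the element simply returns null for that key.
    println(EmptyCoroutineContext[TraceElement]?.traceId) // null
}
```

Nothing in this lookup touches ThreadLocals, which is the gap described above.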

I have managed to get this working with a custom function that leverages existing Micrometer infrastructure:

import io.micrometer.core.instrument.kotlin.asContextElement
import io.micrometer.observation.Observation
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.withContext
import org.slf4j.LoggerFactory
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import reactor.util.context.ContextView
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

@Component
class SchedulingService {

    companion object {
        val logger = LoggerFactory.getLogger(SchedulingService::class.java.name)
    }

    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
    suspend fun suspendable() {
        withContext(observationContext(coroutineContext)) {
            logger.info("Suspendable")
        }
    }

    fun observationContext(context: CoroutineContext) : CoroutineContext {
        // get the Reactor context from the CoroutineContext
        val contextView = context[ReactorContext]!!.context as ContextView
        // this context contains the current observation under this well known key
        // because the @Scheduled instrumentation contributed it
        val observation = contextView.get(ObservationThreadLocalAccessor.KEY) as Observation
        // we can then use this Micrometer context to wrap the execution
        // the observation scope and MDC will be taken care of
        return observation.observationRegistry.asContextElement()
    }
}

This is by no means the solution we're advertising, and we're not sure how to tackle this problem at this point.
I'm personally not convinced that we should restore ThreadLocals and MDC transparently for developers, as the kotlinx library does not do this in the first place.

I'm leaving this issue open for now because we might want to revisit the Coroutine-to-Publisher arrangement in this case, in order to pass a custom CoroutineContext. But this issue should mainly be discussed and tackled with the Micrometer team.

@bclozel
Member

bclozel commented Feb 1, 2024

I think I've refined a solution that could be contributed to the context-propagation project.

This ThreadContextElement will help Kotlin apps lift the ReactorContext from the CoroutineContext if it's there, or otherwise capture the context from the current ThreadLocals:

package io.micrometer.context

import io.micrometer.context.ContextRegistry
import io.micrometer.context.ContextSnapshot
import io.micrometer.context.ContextSnapshotFactory
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.reactor.ReactorContext
import reactor.util.context.ContextView
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext


class PropagationContextElement(private val context: CoroutineContext) : ThreadContextElement<ContextSnapshot.Scope>,
    AbstractCoroutineContextElement(Key) {


    public companion object Key : CoroutineContext.Key<PropagationContextElement>

    val contextSnapshot: ContextSnapshot
        get() {
            val contextView: ContextView? = context[ReactorContext]?.context
            val contextSnapshotFactory =
                ContextSnapshotFactory.builder().contextRegistry(ContextRegistry.getInstance()).build()
            if (contextView != null) {
                return contextSnapshotFactory.captureFrom(contextView)
            }
            return contextSnapshotFactory.captureAll()
        }

    override fun restoreThreadContext(context: CoroutineContext, scope: ContextSnapshot.Scope) {
        scope.close()
    }

    override fun updateThreadContext(context: CoroutineContext): ContextSnapshot.Scope {
        return contextSnapshot.setThreadLocals()
    }
}

Using it in the application leverages the context-propagation ThreadLocalAccessors as Reactor does and does not need to depend on Micrometer Observation or MDC.

    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
    suspend fun suspendable() {
        withContext(PropagationContextElement(coroutineContext)) {
            logger.info("Suspendable")
        }
    }

WDYT @sdeleuze ?
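The contract PropagationContextElement relies on can be simulated without kotlinx-coroutines: the dispatcher calls `updateThreadContext` before each resumption and `restoreThreadContext` after the coroutine suspends, on whichever thread runs that segment. In this sketch, `ThreadContextLike`, `TracePropagation`, `resumeOn` and `resumptionDemo` are hypothetical names; the real `ThreadContextElement` callbacks also receive the `CoroutineContext`.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical mirror of ThreadContextElement's two callbacks (the real ones also take a CoroutineContext).
interface ThreadContextLike<S> {
    fun updateThreadContext(): S          // before a segment runs on a thread
    fun restoreThreadContext(oldState: S) // after that segment suspends or completes
}

// Hypothetical stand-in for an MDC entry.
val mdcTraceId = ThreadLocal<String?>()

// Installs a trace ID for the duration of each segment and returns the previous value as state.
class TracePropagation(private val traceId: String) : ThreadContextLike<String?> {
    override fun updateThreadContext(): String? {
        val old = mdcTraceId.get()
        mdcTraceId.set(traceId)
        return old
    }

    override fun restoreThreadContext(oldState: String?) {
        mdcTraceId.set(oldState)
    }
}

// Simulates a dispatcher resuming one segment of a coroutine on a pool thread.
fun <S> resumeOn(pool: ExecutorService, element: ThreadContextLike<S>, segment: () -> String?): String? =
    pool.submit(Callable {
        val old = element.updateThreadContext()
        try {
            segment()
        } finally {
            element.restoreThreadContext(old)
        }
    }).get()

fun resumptionDemo(): List<String?> {
    val pool = Executors.newFixedThreadPool(2)
    try {
        val element = TracePropagation("65b9e9b6")
        val first = resumeOn(pool, element) { mdcTraceId.get() }  // before a suspension point
        val second = resumeOn(pool, element) { mdcTraceId.get() } // after resuming, maybe on another thread
        val leaked = pool.submit(Callable { mdcTraceId.get() }).get() // outside any segment
        return listOf(first, second, leaked)
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println(resumptionDemo()) // [65b9e9b6, 65b9e9b6, null]
}
```

Restoring the old state after every segment is what keeps values from leaking onto pool threads that later run unrelated work.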

@sdeleuze
Contributor

sdeleuze commented Feb 2, 2024

I think I like the direction taken by this proposal, as Spring support for Coroutines is tightly linked to the Reactive support. The scope of this issue is probably wider than scheduling support, and this could potentially solve what has been discussed for months in micrometer-metrics/tracing#174.

But we should experiment and discuss where and how this potential feature should be provided and integrated. To make this feature usable, I think we should find a way to configure it automatically.

As proposed by @antechrestos in this comment, I am wondering if we could use this kind of facility to provide that support seamlessly at Spring level.

For example in org.springframework.web.reactive.result.method.InvocableHandlerMethod, should we change:

if (coroutineContext == null) {
    return CoroutinesUtils.invokeSuspendingFunction(method, target, args);
}
else {
    return CoroutinesUtils.invokeSuspendingFunction((CoroutineContext) coroutineContext, method, target, args);
}

To something like:

if (coroutineContext == null) {
    return CoroutinesUtils.invokeSuspendingFunction(new PropagationContextElement(Dispatchers.getUnconfined()), method, target, args);
}
else {
    return CoroutinesUtils.invokeSuspendingFunction(new PropagationContextElement((CoroutineContext) coroutineContext), method, target, args);
}

and do the same for ScheduledAnnotationReactiveSupport?

IMO that could provide the level of integration people expect from Spring and would be consistent with what we do on Reactive side.

If provided at the context-propagation level, we should make sure this is shipped in a dependency we can use from Spring Java code (potentially using defensive classpath checks and a nested class to ensure this remains optional).

Any thoughts?

@ilya40umov
Contributor

ilya40umov commented Mar 2, 2024

@sdeleuze btw, I guess you meant to write something like this (you need to use plus operation to "mix in" the new element):

var context = coroutineContext != null ? (CoroutineContext) coroutineContext : Dispatchers.getUnconfined();
return CoroutinesUtils.invokeSuspendingFunction(context.plus(new PropagationContextElement(context)), method, target, args);
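The difference between passing a single element and mixing one in with `plus` can be seen with two hypothetical elements, `DispatcherStandIn` and `PropagationStandIn`, standing in for `Dispatchers.Unconfined` and `PropagationContextElement`. This is a stdlib-only Kotlin sketch, not Spring code.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical stand-in for Dispatchers.Unconfined.
class DispatcherStandIn : AbstractCoroutineContextElement(DispatcherStandIn) {
    companion object Key : CoroutineContext.Key<DispatcherStandIn>
}

// Hypothetical stand-in for PropagationContextElement.
class PropagationStandIn : AbstractCoroutineContextElement(PropagationStandIn) {
    companion object Key : CoroutineContext.Key<PropagationStandIn>
}

fun main() {
    val initial: CoroutineContext = DispatcherStandIn()
    // An element is itself a one-element context, so passing it alone loses the dispatcher.
    val replaced: CoroutineContext = PropagationStandIn()
    // plus keeps the existing elements and adds the new one (an element with the same key would be replaced).
    val combined: CoroutineContext = initial + PropagationStandIn()
    println(replaced[DispatcherStandIn] != null) // false
    println(combined[DispatcherStandIn] != null && combined[PropagationStandIn] != null) // true
}
```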

Either way, I would totally love having something like PropagationContextElement at our disposal, as right now the only option to propagate the observation (and thus the trace) is via KotlinObservationContextElement from the Micrometer Observation library.

However, KotlinObservationContextElement only interacts with ObservationThreadLocalAccessor, and thus does nothing to handle ObservationAwareSpanThreadLocalAccessor from Micrometer Tracing.

If Spring then added this element to the context in all cases where suspending Kotlin code is called (i.e. WebFlux controller methods, coRouter, @Scheduled, etc.), it would hopefully take care of all ThreadLocalAccessors at once.

I am wondering if KotlinObservationContextElement would also work for cases where a child observation / span etc. is created and needs to be put into the scope. E.g. I'm currently trying to add a more Kotlin-friendly API to Observation in micrometer-metrics/micrometer#4823, and one of the main use cases for this API is to be able to create a nested observation (e.g. to have a new span demarcate a section of the code).


Edit: I realized that this context element may need to be added earlier, as something like CoWebFilter would already require it to be in the context.

And as a side note, it looks like there are multiple places in Spring Framework which all create "an initial" coroutine context, which right now happens to consist of just Dispatchers.Unconfined. I am wondering if this should also be abstracted away, so that all of these places call this abstraction to get an initial context (if none has been created so far in any of the filters etc.).

@bclozel
Member

bclozel commented Mar 4, 2024

I have discussed this with the Micrometer team and finding a proper home for PropagationContextElement is not that easy. This class depends on io.micrometer:context-propagation and org.jetbrains.kotlinx:kotlinx-coroutines-reactor (which itself depends on kotlin and io.projectreactor:reactor-core).

  • we can't add that to the context-propagation project, as this would create a dependency cycle with reactor
  • micrometer-core is not a good option either, because you can use this without Micrometer: its main goal is to support context-propagation in Kotlin coroutines and reactor
  • maybe org.jetbrains.kotlinx:kotlinx-coroutines-reactor would be the best option here, but I'm not sure this project is interested in providing context-propagation support.
  • Spring Framework is also an option, but this would significantly restrict the usage of this type: only Spring apps would be supported

@grassehh

grassehh commented May 14, 2024

Hello, I seem to have found another context propagation issue when using proxied beans.
For example suppose you have the following controller:

@GetMapping("test")
@ResponseStatus(OK)
suspend fun test() {
    CoroutineScope(SupervisorJob()).launch(observationRegistry.asContextElement()) {
        proxiedService.doSomethingSuspend()
    }
}

And proxiedService bean is a simple service that just makes a WebClient call:

class ProxiedServiceImpl(private val webClient: WebClient) : ProxiedService {
    override suspend fun doSomethingSuspend() {
        webClient.get()
            .uri("https://www.google.fr")
            .awaitExchange {
                println(MDC.getCopyOfContextMap())
            }
    }
}

and it is proxied using a BeanNameAutoProxyCreator:

@Bean
fun autoProxyCreator() = BeanNameAutoProxyCreator().apply {
    setBeanNames("proxiedService")
    setInterceptorNames("dummyInterceptor")
}

@Bean
fun dummyInterceptor(observationRegistry: ObservationRegistry) = MethodInterceptor { invocation -> invocation.proceed() }

Then the MDC context will be empty in the awaitExchange.
To make it work, I need to pass the whole parent coroutine context to the child coroutine instead of just observationRegistry.asContextElement().

If the service is not a proxy, it works fine. Also, if the service is called directly inside the controller method rather than inside the launched coroutine, it works with the proxy too.

So I wonder if there is an issue with Spring AOP and observability.

@ilya40umov
Contributor

@grassehh I'm just curious if this is because CoroutineScope(SupervisorJob()).launch() will basically launch a new coroutine detached from the one created for handling of HTTP request, which potentially would mean that handling of HTTP request can finish before the code in the launch block ever has chance to execute (and thus the associated observation is potentially stopped by that point).

Could you try grabbing the job returned by the launch call and calling join() on it?

@grassehh
Copy link

grassehh commented May 16, 2024

> @grassehh I'm just curious if this is because CoroutineScope(SupervisorJob()).launch() will basically launch a new coroutine detached from the one created for handling of HTTP request, which potentially would mean that handling of HTTP request can finish before the code in the launch block ever has chance to execute (and thus the associated observation is potentially stopped by that point).
>
> Could you try grabbing the job returned by the launch call and calling join() on it?

Doesn't seem to work. You can check out my sample here.

The thing is that if you use the nonProxiedService instead of proxiedService and call the /aop route, it works (the MDC context will be preserved).

Alternatively, as I said, if you pass the parent coroutineContext to the new coroutine, it works too. Maybe that makes more sense than re-creating a context element from the ObservationRegistry, since that is already done in the web filter.

@bclozel
Member

bclozel commented Jul 19, 2024

I'm closing this issue for now, in favor of Kotlin/kotlinx.coroutines#4187.
At this point I think we should explore an org.jetbrains.kotlinx:kotlinx-coroutines-reactor integration, as Spring Framework would only serve a fraction of the Kotlin community for this need.

We can reopen this issue if we need to reconsider.

@bclozel bclozel closed this as not planned Won't fix, can't repro, duplicate, stale Jul 19, 2024
@bclozel bclozel added status: superseded An issue that has been superseded by another and removed type: bug A general bug status: blocked An issue that's blocked on an external project change labels Jul 19, 2024
@bclozel bclozel removed this from the 6.2.x milestone Jul 19, 2024

7 participants