From 9259ce828adb6aeeef84977edf5bea31f6f74326 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 24 Jan 2024 09:11:47 +0200 Subject: [PATCH] Add context propagation for rector schedulers (#10311) --- .../v3_1/ContextPropagationOperator.java | 47 +++++++++++++++++++ .../v5_0/server/SpringWebfluxTest.java | 10 +++- .../src/test/java/server/TestController.java | 5 ++ ...pringWebfluxServerInstrumentationTest.java | 19 ++++++++ .../v5_3/TestWebfluxSpringBootApp.java | 8 +++- 5 files changed, 87 insertions(+), 2 deletions(-) diff --git a/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java b/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java index faefac0f1861..41e50f4f4ebd 100644 --- a/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java +++ b/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java @@ -31,6 +31,8 @@ import java.lang.invoke.MethodHandles; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; import org.reactivestreams.Publisher; import reactor.core.CoreSubscriber; @@ -40,9 +42,11 @@ import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; +import reactor.core.scheduler.Schedulers; /** Based on Spring Sleuth's Reactor instrumentation. */ public final class ContextPropagationOperator { + private static final Logger logger = Logger.getLogger(ContextPropagationOperator.class.getName()); private static final Object VALUE = new Object(); @@ -52,6 +56,8 @@ public final class ContextPropagationOperator { @Nullable private static final MethodHandle FLUX_CONTEXT_WRITE_METHOD = getContextWriteMethod(Flux.class); + @Nullable private static final MethodHandle SCHEDULERS_HOOK_METHOD = getSchedulersHookMethod(); + @Nullable private static MethodHandle getContextWriteMethod(Class type) { MethodHandles.Lookup lookup = MethodHandles.publicLookup(); @@ -68,6 +74,18 @@ private static MethodHandle getContextWriteMethod(Class type) { return null; } + @Nullable + private static MethodHandle getSchedulersHookMethod() { + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + try { + return lookup.findStatic( + Schedulers.class, "onScheduleHook", methodType(void.class, String.class, Function.class)); + } catch (NoSuchMethodException | IllegalAccessException e) { + // ignore + } + return null; + } + public static ContextPropagationOperator create() { return builder().build(); } @@ -137,10 +155,22 @@ public void registerOnEachOperator() { Hooks.onEachOperator( TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy)); AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy); + registerScheduleHook(RunnableWrapper.class.getName(), RunnableWrapper::new); enabled = true; } } + private static void registerScheduleHook(String key, Function function) { + if (SCHEDULERS_HOOK_METHOD == null) { + return; + } + try { + SCHEDULERS_HOOK_METHOD.invoke(key, function); + } catch (Throwable throwable) { + logger.log(Level.WARNING, "Failed to install scheduler hook", throwable); + } + } + /** Unregisters the hook registered by {@link #registerOnEachOperator()}. */ public void resetOnEachOperator() { synchronized (lock) { @@ -312,4 +342,21 @@ public Object scanUnsafe(Scannable.Attr attr) { return null; } } + + private static class RunnableWrapper implements Runnable { + private final Runnable delegate; + private final Context context; + + RunnableWrapper(Runnable delegate) { + this.delegate = delegate; + context = Context.current(); + } + + @Override + public void run() { + try (Scope ignore = context.makeCurrent()) { + delegate.run(); + } + } + } } diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/SpringWebfluxTest.java b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/SpringWebfluxTest.java index 4e7536996e3b..1a9951cf8da8 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/SpringWebfluxTest.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/SpringWebfluxTest.java @@ -206,7 +206,15 @@ private static Stream provideParameters() { "/foo-delayed", "/foo-delayed", "getFooDelayed", - new FooModel(3L, "delayed").toString())))); + new FooModel(3L, "delayed").toString()))), + Arguments.of( + named( + "annotation API without parameters no mono", + new Parameter( + "/foo-no-mono", + "/foo-no-mono", + "getFooModelNoMono", + new FooModel(0L, "DEFAULT").toString())))); } @ParameterizedTest(name = "{index}: {0}") diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/TestController.java b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/TestController.java index 54cef12f3821..50476def96f4 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/TestController.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/TestController.java @@ -63,6 +63,11 @@ Mono getFooDelayedMono(@PathVariable("id") long id) { return Mono.just(id).delayElement(Duration.ofMillis(100)).map(TestController::tracedMethod); } + @GetMapping("/foo-no-mono") + FooModel getFooModelNoMono() { + return new FooModel(0L, "DEFAULT"); + } + private static FooModel tracedMethod(long id) { tracer.spanBuilder("tracedMethod").startSpan().end(); return new FooModel(id, "tracedMethod"); diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxServerInstrumentationTest.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxServerInstrumentationTest.java index 73cb904f9545..54a71116a581 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxServerInstrumentationTest.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxServerInstrumentationTest.java @@ -5,11 +5,17 @@ package io.opentelemetry.instrumentation.spring.webflux.v5_3; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; +import static org.assertj.core.api.Assertions.assertThat; + import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest; import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions; import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest; +import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.springframework.context.ConfigurableApplicationContext; @@ -47,4 +53,17 @@ protected void configure(HttpServerTestOptions options) { options.disableTestNonStandardHttpMethod(); } + + @Test + void noMono() { + ServerEndpoint endpoint = new ServerEndpoint("NO_MONO", "no-mono", 200, "success"); + String method = "GET"; + AggregatedHttpRequest request = request(endpoint, method); + AggregatedHttpResponse response = client.execute(request).aggregate().join(); + + assertThat(response.status().code()).isEqualTo(SUCCESS.getStatus()); + assertThat(response.contentUtf8()).isEqualTo(SUCCESS.getBody()); + + assertTheTraces(1, null, null, null, method, endpoint); + } } diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/TestWebfluxSpringBootApp.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/TestWebfluxSpringBootApp.java index 717bba794b1d..c0eb9a65d91e 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/TestWebfluxSpringBootApp.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/TestWebfluxSpringBootApp.java @@ -57,7 +57,7 @@ WebFilter telemetryFilter() { .setCapturedServerResponseHeaders( singletonList(AbstractHttpServerTest.TEST_RESPONSE_HEADER)) .build() - .createWebFilter(); + .createWebFilterAndRegisterReactorHook(); } @Controller @@ -69,6 +69,12 @@ Flux success() { return Flux.defer(() -> Flux.just(controller(SUCCESS, SUCCESS::getBody))); } + @RequestMapping("/no-mono") + @ResponseBody + String noMono() { + return controller(SUCCESS, SUCCESS::getBody); + } + @RequestMapping("/query") @ResponseBody Mono query_param(@RequestParam("some") String param) {