diff --git a/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java b/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java index ea07b522011a..608b666e87f7 100644 --- a/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java +++ b/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java @@ -27,6 +27,7 @@ import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies; import java.util.function.BiFunction; import java.util.function.Function; +import javax.annotation.Nullable; import org.reactivestreams.Publisher; import reactor.core.CoreSubscriber; import reactor.core.Fuseable; @@ -39,6 +40,8 @@ /** Based on Spring Sleuth's Reactor instrumentation. */ public final class ContextPropagationOperator { + private static final Object VALUE = new Object(); + public static ContextPropagationOperator create() { return builder().build(); } @@ -98,14 +101,20 @@ public static Context getOpenTelemetryContext( * reactive stream. This should generally be called in a static initializer block in your * application. */ - public void registerOnEachOperator() { + public synchronized void registerOnEachOperator() { + if (enabled) { + return; + } Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy)); AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy); enabled = true; } /** Unregisters the hook registered by {@link #registerOnEachOperator()}. */ - public void resetOnEachOperator() { + public synchronized void resetOnEachOperator() { + if (!enabled) { + return; + } Hooks.resetOnEachOperator(TracingSubscriber.class.getName()); AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy); enabled = false; @@ -125,8 +134,7 @@ public static Mono runWithContext(Mono publisher, Context tracingConte // this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber` // (created for this publisher) and with current() span that refers to span created here // without the hack, publisher runs in the onAssembly stage, before traceContext is made current - return ScalarPropagatingMono.INSTANCE - .flatMap(i -> publisher) + return ScalarPropagatingMono.create(publisher) .subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext)); } @@ -139,8 +147,7 @@ public static Flux runWithContext(Flux publisher, Context tracingConte // this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber` // (created for this publisher) and with current() span that refers to span created here // without the hack, publisher runs in the onAssembly stage, before traceContext is made current - return ScalarPropagatingFlux.INSTANCE - .flatMap(i -> publisher) + return ScalarPropagatingFlux.create(publisher) .subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext)); } @@ -177,29 +184,61 @@ static void subscribeInActiveSpan(CoreSubscriber actual, Object } } - static class ScalarPropagatingMono extends Mono { - public static final Mono INSTANCE = new ScalarPropagatingMono(); + static class ScalarPropagatingMono extends Mono implements Scannable { - private final Object value = new Object(); + static Mono create(Mono source) { + return new ScalarPropagatingMono(source).flatMap(unused -> source); + } - private ScalarPropagatingMono() {} + private final Mono source; + + private ScalarPropagatingMono(Mono source) { + this.source = source; + } @Override public void subscribe(CoreSubscriber actual) { - subscribeInActiveSpan(actual, value); + subscribeInActiveSpan(actual, VALUE); + } + + @Override + @Nullable + // Interface method doesn't have type parameter so we can't add it either. + @SuppressWarnings("rawtypes") + public Object scanUnsafe(Attr attr) { + if (attr == Attr.PARENT) { + return source; + } + return null; } } - static class ScalarPropagatingFlux extends Flux { - public static final Flux INSTANCE = new ScalarPropagatingFlux(); + static class ScalarPropagatingFlux extends Flux implements Scannable { - private final Object value = new Object(); + static Flux create(Flux source) { + return new ScalarPropagatingFlux(source).flatMap(unused -> source); + } - private ScalarPropagatingFlux() {} + private final Flux source; + + private ScalarPropagatingFlux(Flux source) { + this.source = source; + } @Override public void subscribe(CoreSubscriber actual) { - subscribeInActiveSpan(actual, value); + subscribeInActiveSpan(actual, VALUE); + } + + @Override + @Nullable + // Interface method doesn't have type parameter so we can't add it either. + @SuppressWarnings("rawtypes") + public Object scanUnsafe(Scannable.Attr attr) { + if (attr == Scannable.Attr.PARENT) { + return source; + } + return null; } } } diff --git a/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java b/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java index cf5da0779015..aefefb572356 100644 --- a/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java +++ b/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java @@ -18,8 +18,10 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.UnicastProcessor; class ReactorCoreTest extends AbstractReactorCoreTest { @@ -69,17 +71,16 @@ void monoInNonBlockingPublisherAssembly() { @Test void fluxInNonBlockingPublisherAssembly() { + Flux source = + Flux.defer( + () -> { + Span.current().setAttribute("inner", "foo"); + return Flux.just(5, 6); + }); testing.runWithSpan( "parent", () -> - ContextPropagationOperator.ScalarPropagatingFlux.INSTANCE - .flatMap( - unused -> - Flux.defer( - () -> { - Span.current().setAttribute("inner", "foo"); - return Flux.just(5, 6); - })) + ContextPropagationOperator.ScalarPropagatingFlux.create(source) .doOnEach( signal -> { if (signal.isOnError()) { @@ -199,9 +200,37 @@ void noTracingBeforeRegistration() { trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("after").hasNoParent())); } + @Test + void monoParentsAccessible() { + UnicastProcessor source = UnicastProcessor.create(); + Mono mono = + ContextPropagationOperator.runWithContext(source.singleOrEmpty(), Context.root()); + + source.onNext("foo"); + source.onComplete(); + + assertThat(mono.block()).isEqualTo("foo"); + + assertThat(((Scannable) mono).parents().filter(UnicastProcessor.class::isInstance).findFirst()) + .isPresent(); + } + + @Test + void fluxParentsAccessible() { + UnicastProcessor source = UnicastProcessor.create(); + Flux flux = ContextPropagationOperator.runWithContext(source, Context.root()); + + source.onNext("foo"); + source.onComplete(); + + assertThat(flux.collectList().block()).containsExactly("foo"); + + assertThat(((Scannable) flux).parents().filter(UnicastProcessor.class::isInstance).findFirst()) + .isPresent(); + } + private Mono monoSpan(Mono mono, String spanName) { - return ContextPropagationOperator.ScalarPropagatingMono.INSTANCE - .flatMap(unused -> mono) + return ContextPropagationOperator.ScalarPropagatingMono.create(mono) .doOnEach( signal -> { if (signal.isOnError()) {