diff --git a/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java b/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java index a683f61af3e2..faefac0f1861 100644 --- a/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java +++ b/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java @@ -155,7 +155,8 @@ public void resetOnEachOperator() { private static Function, ? extends Publisher> tracingLift( ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) { - return Operators.lift(new Lifter<>(asyncOperationEndStrategy)); + return Operators.lift( + ContextPropagationOperator::shouldInstrument, new Lifter<>(asyncOperationEndStrategy)); } /** Forces Mono to run in traceContext scope. */ @@ -220,7 +221,12 @@ public reactor.util.context.Context apply(reactor.util.context.Context context) } } - public static class Lifter + private static boolean shouldInstrument(Scannable publisher) { + // skip if Flux/Mono #just, #empty, #error + return !(publisher instanceof Fuseable.ScalarCallable); + } + + private static class Lifter implements BiFunction, CoreSubscriber> { /** Holds reference to strategy to prevent it from being collected. */ @@ -233,10 +239,6 @@ public Lifter(ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) { @Override public CoreSubscriber apply(Scannable publisher, CoreSubscriber sub) { - // if Flux/Mono #just, #empty, #error - if (publisher instanceof Fuseable.ScalarCallable) { - return sub; - } return new TracingSubscriber<>(sub, sub.currentContext()); } } diff --git a/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/v3_1/HooksTest.java b/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/v3_1/HooksTest.java index ba73ee18df3c..43256891396d 100644 --- a/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/v3_1/HooksTest.java +++ b/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/v3_1/HooksTest.java @@ -7,10 +7,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import reactor.core.CoreSubscriber; +import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; class HooksTest { @@ -43,4 +47,28 @@ public void subscribe(CoreSubscriber actual) { subscriber.set(actual); } } + + @Test + void testInvalidBlockUsage() throws InterruptedException { + ContextPropagationOperator operator = ContextPropagationOperator.create(); + operator.registerOnEachOperator(); + + Callable callable = + () -> { + Mono.just("test1").block(); + return "call1"; + }; + + Disposable disposable = + Mono.defer( + () -> + Mono.fromCallable(callable).publishOn(Schedulers.elastic()).flatMap(Mono::just)) + .subscribeOn(Schedulers.single()) + .subscribe(); + + TimeUnit.MILLISECONDS.sleep(100); + + disposable.dispose(); + operator.resetOnEachOperator(); + } }