diff --git a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy index 47a6fd3e8c7f..c1c7187d0ad1 100644 --- a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy +++ b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy @@ -9,6 +9,7 @@ import io.opentelemetry.instrumentation.reactor.TracedWithSpan import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.publisher.ReplayProcessor import reactor.core.publisher.UnicastProcessor import reactor.test.StepVerifier @@ -70,6 +71,51 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for eventually completed Mono per subscription"() { + setup: + def source = ReplayProcessor.create() + def mono = source.singleOrEmpty() + def result = new TracedWithSpan() + .mono(mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + source.onComplete() + + verifier.expectNext("Value") + .verifyComplete() + + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + assertTraces(2) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + } + } + } + trace(1, 1) { + span(0) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + def "should capture span for already errored Mono"() { setup: def error = new IllegalArgumentException("Boom") diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedOperator.java new file mode 100644 index 000000000000..300fa77f48f4 --- /dev/null +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedOperator.java @@ -0,0 +1,113 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.reactivestreams.Publisher; +import reactor.core.CoreSubscriber; +import reactor.core.Scannable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; + +final class InstrumentedOperator + implements BiFunction, CoreSubscriber> { + + private final Instrumenter instrumenter; + private final Context context; + private final REQUEST request; + private final Class responseType; + private final boolean captureExperimentalSpanAttributes; + private final AtomicBoolean firstSubscriber = new AtomicBoolean(true); + + static Mono transformMono( + Mono mono, + Instrumenter instrumenter, + Context context, + REQUEST request, + Class responseType, + boolean captureExperimentalSpanAttributes) { + + return mono.transform( + InstrumentedOperator.tracingLift( + instrumenter, context, request, responseType, captureExperimentalSpanAttributes)); + } + + static Flux transformFlux( + Flux flux, + Instrumenter instrumenter, + Context context, + REQUEST request, + Class responseType, + boolean captureExperimentalSpanAttributes) { + + return flux.transform( + InstrumentedOperator.tracingLift( + instrumenter, context, request, responseType, captureExperimentalSpanAttributes)); + } + + private static + Function, ? extends Publisher> tracingLift( + Instrumenter instrumenter, + Context context, + REQUEST request, + Class responseType, + boolean captureExperimentalSpanAttributes) { + + return Operators.lift( + new InstrumentedOperator<>( + instrumenter, context, request, responseType, captureExperimentalSpanAttributes)); + } + + private InstrumentedOperator( + Instrumenter instrumenter, + Context context, + REQUEST request, + Class responseType, + boolean captureExperimentalSpanAttributes) { + this.instrumenter = instrumenter; + this.context = context; + this.request = request; + this.responseType = responseType; + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } + + @Override + public CoreSubscriber apply( + Scannable scannable, CoreSubscriber coreSubscriber) { + + if (isFirstSubscriber()) { + return new InstrumentedSubscriber<>( + instrumenter, + context, + request, + responseType, + captureExperimentalSpanAttributes, + coreSubscriber); + } + + Context parentContext = Context.current(); + if (instrumenter.shouldStart(parentContext, request)) { + Context context = instrumenter.start(parentContext, request); + return new InstrumentedSubscriber<>( + instrumenter, + context, + request, + responseType, + captureExperimentalSpanAttributes, + coreSubscriber); + } + return coreSubscriber; + } + + private boolean isFirstSubscriber() { + return firstSubscriber.compareAndSet(true, false); + } +} diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedSubscriber.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedSubscriber.java new file mode 100644 index 000000000000..f2984ae3b27d --- /dev/null +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedSubscriber.java @@ -0,0 +1,97 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Operators; + +final class InstrumentedSubscriber + implements CoreSubscriber, Subscription { + + private static final AttributeKey CANCELED_ATTRIBUTE_KEY = + AttributeKey.booleanKey("reactor.canceled"); + + private final Instrumenter instrumenter; + private final Context context; + private final REQUEST request; + private final Class responseType; + private final boolean captureExperimentalSpanAttributes; + private final CoreSubscriber actual; + private Subscription subscription; + private T value; + + InstrumentedSubscriber( + Instrumenter instrumenter, + Context context, + REQUEST request, + Class responseType, + boolean captureExperimentalSpanAttributes, + CoreSubscriber actual) { + + this.instrumenter = instrumenter; + this.context = context; + this.request = request; + this.responseType = responseType; + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription subscription) { + if (Operators.validate(this.subscription, subscription)) { + this.subscription = subscription; + actual.onSubscribe(this); + } + } + + @Override + public void request(long count) { + if (subscription != null) { + subscription.request(count); + } + } + + @Override + public void cancel() { + if (subscription != null) { + if (captureExperimentalSpanAttributes) { + Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true); + } + instrumenter.end(context, request, null, null); + subscription.cancel(); + } + } + + @Override + public void onNext(T value) { + this.value = value; + actual.onNext(value); + } + + @Override + public void onError(Throwable error) { + instrumenter.end(context, request, null, error); + actual.onError(error); + } + + @Override + public void onComplete() { + instrumenter.end(context, request, tryToGetResponse(responseType, value), null); + actual.onComplete(); + } + + @Override + public reactor.util.context.Context currentContext() { + return actual.currentContext(); + } +} diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java index 5e2f7aab0d53..fed948a12144 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java @@ -7,21 +7,15 @@ import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import org.reactivestreams.Publisher; +import reactor.core.Fuseable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public final class ReactorAsyncOperationEndStrategy implements AsyncOperationEndStrategy { - private static final AttributeKey CANCELED_ATTRIBUTE_KEY = - AttributeKey.booleanKey("reactor.canceled"); - public static ReactorAsyncOperationEndStrategy create() { return newBuilder().build(); } @@ -49,71 +43,38 @@ public Object end( Object asyncValue, Class responseType) { - EndOnFirstNotificationConsumer notificationConsumer = - new EndOnFirstNotificationConsumer(context) { - @Override - protected void end(Object result, Throwable error) { - instrumenter.end(context, request, tryToGetResponse(responseType, result), error); - } - }; + if (tryEndSynchronously(instrumenter, context, request, asyncValue, responseType)) { + return asyncValue; + } if (asyncValue instanceof Mono) { Mono mono = (Mono) asyncValue; - return mono.doOnError(notificationConsumer) - .doOnSuccess(notificationConsumer::onSuccess) - .doOnCancel(notificationConsumer::onCancel); + return InstrumentedOperator.transformMono( + mono, instrumenter, context, request, responseType, captureExperimentalSpanAttributes); } else { Flux flux = Flux.from((Publisher) asyncValue); - return flux.doOnError(notificationConsumer) - .doOnComplete(notificationConsumer) - .doOnCancel(notificationConsumer::onCancel); + return InstrumentedOperator.transformFlux( + flux, instrumenter, context, request, responseType, captureExperimentalSpanAttributes); } } - /** - * Helper class to ensure that the span is ended exactly once regardless of how many OnComplete or - * OnError notifications are received. Multiple notifications can happen anytime multiple - * subscribers subscribe to the same publisher. - */ - private abstract class EndOnFirstNotificationConsumer extends AtomicBoolean - implements Runnable, Consumer { - - private final Context context; - - protected EndOnFirstNotificationConsumer(Context context) { - super(false); - this.context = context; - } - - public void onSuccess(T result) { - accept(result, null); - } - - public void onCancel() { - if (compareAndSet(false, true)) { - if (captureExperimentalSpanAttributes) { - Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true); - } - end(null, null); - } - } - - @Override - public void run() { - accept(null, null); - } - - @Override - public void accept(Throwable exception) { - end(null, exception); - } + private static boolean tryEndSynchronously( + Instrumenter instrumenter, + Context context, + REQUEST request, + Object asyncValue, + Class responseType) { - private void accept(Object result, Throwable error) { - if (compareAndSet(false, true)) { - end(result, error); + if (asyncValue instanceof Fuseable.ScalarCallable) { + Fuseable.ScalarCallable scalarCallable = (Fuseable.ScalarCallable) asyncValue; + try { + Object result = scalarCallable.call(); + instrumenter.end(context, request, tryToGetResponse(responseType, result), null); + } catch (Throwable error) { + instrumenter.end(context, request, null, error); } + return true; } - - protected abstract void end(Object result, Throwable error); + return false; } } diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyTest.groovy index 043a01e9c8ae..2cb615a64907 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyTest.groovy @@ -43,35 +43,28 @@ class ReactorAsyncOperationEndStrategyTest extends Specification { underTest.supports(Mono) } - def "ends span on already completed"() { + def "ends span synchronously on already completed"() { when: - def result = (Mono) underTest.end(instrumenter, context, request, Mono.just(response), String) - StepVerifier.create(result) - .expectNext(response) - .verifyComplete() + underTest.end(instrumenter, context, request, Mono.just(response), String) then: 1 * instrumenter.end(context, request, response, null) } - def "ends span on already empty"() { + def "ends span synchronously on already empty"() { when: - def result = (Mono) underTest.end(instrumenter, context, request, Mono.empty(), String) - StepVerifier.create(result) - .verifyComplete() + underTest.end(instrumenter, context, request, Mono.empty(), String) then: 1 * instrumenter.end(context, request, null, null) } - def "ends span on already errored"() { + def "ends span synchronously on already errored"() { given: def exception = new IllegalStateException() when: - def result = (Mono) underTest.end(instrumenter, context, request, Mono.error(exception), String) - StepVerifier.create(result) - .verifyErrorMatches({ it == exception }) + underTest.end(instrumenter, context, request, Mono.error(exception), String) then: 1 * instrumenter.end(context, request, null, exception) @@ -187,10 +180,12 @@ class ReactorAsyncOperationEndStrategyTest extends Specification { 1 * span.setAttribute({ it.getKey() == "reactor.canceled" }, true) } - def "ends span once for multiple subscribers"() { + def "ends span once for each subscription"() { + given: + def mono = Mono.defer({Mono.just(response)}) when: - def result = (Mono) underTest.end(instrumenter, context, request, Mono.just(response), String) + def result = (Mono) underTest.end(instrumenter, context, request, mono, String) StepVerifier.create(result) .expectNext(response) .verifyComplete() @@ -202,7 +197,9 @@ class ReactorAsyncOperationEndStrategyTest extends Specification { .verifyComplete() then: - 1 * instrumenter.end(context, request, response, null) + 3 * instrumenter.end(context, request, response, null) + 2 * instrumenter.shouldStart(_, request) >> true + 2 * instrumenter.start(_, request) >> context } } @@ -212,35 +209,28 @@ class ReactorAsyncOperationEndStrategyTest extends Specification { underTest.supports(Flux) } - def "ends span on already completed"() { + def "ends span synchronously on already completed"() { when: - def result = (Flux) underTest.end(instrumenter, context, request, Flux.just(response), String) - StepVerifier.create(result) - .expectNext(response) - .verifyComplete() + underTest.end(instrumenter, context, request, Flux.just(response), String) then: - 1 * instrumenter.end(context, request, null, null) + 1 * instrumenter.end(context, request, response, null) } - def "ends span on already empty"() { + def "ends span synchronously on already empty"() { when: - def result = (Flux) underTest.end(instrumenter, context, request, Flux.empty(), String) - StepVerifier.create(result) - .verifyComplete() + underTest.end(instrumenter, context, request, Flux.empty(), String) then: 1 * instrumenter.end(context, request, null, null) } - def "ends span on already errored"() { + def "ends synchronously span on already errored"() { given: def exception = new IllegalStateException() when: - def result = (Flux) underTest.end(instrumenter, context, request, Flux.error(exception), String) - StepVerifier.create(result) - .verifyErrorMatches({ it == exception }) + underTest.end(instrumenter, context, request, Flux.error(exception), String) then: 1 * instrumenter.end(context, request, null, exception) @@ -265,7 +255,7 @@ class ReactorAsyncOperationEndStrategyTest extends Specification { .verifyComplete() then: - 1 * instrumenter.end(context, request, null, null) + 1 * instrumenter.end(context, request, response, null) } def "ends span when empty"() { @@ -353,9 +343,12 @@ class ReactorAsyncOperationEndStrategyTest extends Specification { 1 * span.setAttribute({ it.getKey() == "reactor.canceled" }, true) } - def "ends span once for multiple subscribers"() { + def "ends span once for each subscription"() { + given: + def flux = Flux.defer({ Flux.just(response) }) + when: - def result = (Flux) underTest.end(instrumenter, context, request, Flux.just(response), String) + def result = (Flux) underTest.end(instrumenter, context, request, flux, String) StepVerifier.create(result) .expectNext(response) .verifyComplete() @@ -367,7 +360,9 @@ class ReactorAsyncOperationEndStrategyTest extends Specification { .verifyComplete() then: - 1 * instrumenter.end(context, request, null, null) + 3 * instrumenter.end(context, request, response, null) + 2 * instrumenter.shouldStart(_, request) >> true + 2 * instrumenter.start(_, request) >> context } } }