diff --git a/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy b/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy index 2cfb789cccc1..af26609719d7 100644 --- a/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy +++ b/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy @@ -9,8 +9,12 @@ import io.lettuce.core.RedisClient import io.lettuce.core.resource.ClientResources import io.opentelemetry.instrumentation.reactor.TracingOperator import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implements LibraryTestTrait { + @Shared + TracingOperator tracingOperator = TracingOperator.create() + @Override RedisClient createClient(String uri) { return RedisClient.create( @@ -21,10 +25,10 @@ class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implem } def setupSpec() { - TracingOperator.registerOnEachOperator() + tracingOperator.registerOnEachOperator() } def cleanupSpec() { - TracingOperator.resetOnEachOperator() + tracingOperator.resetOnEachOperator() } } diff --git a/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle b/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle index f4f55e428c3d..9aff3fc6d67a 100644 --- a/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle +++ b/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle @@ -9,6 +9,11 @@ muzzle { } } +tasks.withType(Test).configureEach { + // TODO run tests both with and without experimental span attributes + jvmArgs "-Dotel.instrumentation.reactor.experimental-span-attributes=true" +} + dependencies { implementation project(':instrumentation:reactor-3.1:library') diff --git a/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java b/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java index 425f6b355279..73d62f973576 100644 --- a/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java +++ b/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java @@ -8,6 +8,7 @@ import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer; import static net.bytebuddy.matcher.ElementMatchers.named; +import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.reactor.TracingOperator; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; @@ -31,7 +32,13 @@ public void transform(TypeTransformer transformer) { public static class ResetOnEachOperatorAdvice { @Advice.OnMethodExit(suppress = Throwable.class) public static void postStaticInitializer() { - TracingOperator.registerOnEachOperator(); + TracingOperator.newBuilder() + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.reactor.experimental-span-attributes", false)) + .build() + .registerOnEachOperator(); } } } diff --git a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy index 7787ff7f8b09..47a6fd3e8c7f 100644 --- a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy +++ b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy @@ -130,6 +130,35 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Mono"() { + setup: + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + def result = new TracedWithSpan() + .mono(mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + verifier.thenCancel().verify() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + "reactor.canceled" true + } + } + } + } + } + def "should capture span for already completed Flux"() { setup: def source = Flux.just("Value") @@ -242,4 +271,35 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } } + + def "should capture span for canceled Flux"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = UnicastProcessor.create() + def result = new TracedWithSpan() + .flux(source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + + verifier.thenCancel().verify() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flux" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + "reactor.canceled" true + } + } + } + } + } } diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java index 133a9f2ea672..cd3d1e1e771e 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.reactor; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy; @@ -14,8 +16,23 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public enum ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy { - INSTANCE; +public final class ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy { + private static final AttributeKey CANCELED_ATTRIBUTE_KEY = + AttributeKey.booleanKey("reactor.canceled"); + + public static ReactorAsyncSpanEndStrategy create() { + return newBuilder().build(); + } + + public static ReactorAsyncSpanEndStrategyBuilder newBuilder() { + return new ReactorAsyncSpanEndStrategyBuilder(); + } + + private final boolean captureExperimentalSpanAttributes; + + ReactorAsyncSpanEndStrategy(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } @Override public boolean supports(Class returnType) { @@ -29,10 +46,14 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) { new EndOnFirstNotificationConsumer(tracer, context); if (returnValue instanceof Mono) { Mono mono = (Mono) returnValue; - return mono.doOnError(notificationConsumer).doOnSuccess(notificationConsumer::onSuccess); + return mono.doOnError(notificationConsumer) + .doOnSuccess(notificationConsumer::onSuccess) + .doOnCancel(notificationConsumer::onCancel); } else { Flux flux = Flux.from((Publisher) returnValue); - return flux.doOnError(notificationConsumer).doOnComplete(notificationConsumer); + return flux.doOnError(notificationConsumer) + .doOnComplete(notificationConsumer) + .doOnCancel(notificationConsumer::onCancel); } } @@ -41,7 +62,7 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) { * OnError notifications are received. Multiple notifications can happen anytime multiple * subscribers subscribe to the same publisher. */ - private static final class EndOnFirstNotificationConsumer extends AtomicBoolean + private final class EndOnFirstNotificationConsumer extends AtomicBoolean implements Runnable, Consumer { private final BaseTracer tracer; @@ -57,6 +78,15 @@ public void onSuccess(T ignored) { accept(null); } + public void onCancel() { + if (compareAndSet(false, true)) { + if (captureExperimentalSpanAttributes) { + Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true); + } + tracer.end(context); + } + } + @Override public void run() { accept(null); diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyBuilder.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyBuilder.java new file mode 100644 index 000000000000..609e5b6487b3 --- /dev/null +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +public final class ReactorAsyncSpanEndStrategyBuilder { + private boolean captureExperimentalSpanAttributes; + + ReactorAsyncSpanEndStrategyBuilder() {} + + public ReactorAsyncSpanEndStrategyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public ReactorAsyncSpanEndStrategy build() { + return new ReactorAsyncSpanEndStrategy(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java index 5c56652e9876..532db89b7b93 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java @@ -36,21 +36,39 @@ /** Based on Spring Sleuth's Reactor instrumentation. */ public final class TracingOperator { + public static TracingOperator create() { + return newBuilder().build(); + } + + public static TracingOperatorBuilder newBuilder() { + return new TracingOperatorBuilder(); + } + + private final boolean captureExperimentalSpanAttributes; + + TracingOperator(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } + /** * Registers a hook that applies to every operator, propagating {@link Context} to downstream * callbacks to ensure spans in the {@link Context} are available throughout the lifetime of a * reactive stream. This should generally be called in a static initializer block in your * application. */ - public static void registerOnEachOperator() { + public void registerOnEachOperator() { Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift()); - AsyncSpanEndStrategies.getInstance().registerStrategy(ReactorAsyncSpanEndStrategy.INSTANCE); + AsyncSpanEndStrategies.getInstance() + .registerStrategy( + ReactorAsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) + .build()); } /** Unregisters the hook registered by {@link #registerOnEachOperator()}. */ - public static void resetOnEachOperator() { + public void resetOnEachOperator() { Hooks.resetOnEachOperator(TracingSubscriber.class.getName()); - AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.INSTANCE); + AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.class); } private static Function, ? extends Publisher> tracingLift() { @@ -69,6 +87,4 @@ public CoreSubscriber apply(Scannable publisher, CoreSubscriber(sub, sub.currentContext()); } } - - private TracingOperator() {} } diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java new file mode 100644 index 000000000000..99889cfa3cc2 --- /dev/null +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +public final class TracingOperatorBuilder { + private boolean captureExperimentalSpanAttributes; + + TracingOperatorBuilder() {} + + public TracingOperatorBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public TracingOperator build() { + return new TracingOperator(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy index 3cf2af8a5fbe..7760696bec54 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy @@ -14,6 +14,7 @@ class HooksTest extends LibraryInstrumentationSpecification { def "can reset out hooks"() { setup: + def underTest = TracingOperator.create() AtomicReference subscriber = new AtomicReference<>() when: "no hook registered" @@ -23,14 +24,14 @@ class HooksTest extends LibraryInstrumentationSpecification { !(subscriber.get() instanceof TracingSubscriber) when: "hook registered" - TracingOperator.registerOnEachOperator() + underTest.registerOnEachOperator() new CapturingMono(subscriber).map { it + 1 }.subscribe() then: subscriber.get() instanceof TracingSubscriber when: "hook reset" - TracingOperator.resetOnEachOperator() + underTest.resetOnEachOperator() new CapturingMono(subscriber).map { it + 1 }.subscribe() then: diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy index dadbc2abf8be..b6164fb007df 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.reactor +import io.opentelemetry.api.trace.Span import io.opentelemetry.context.Context import io.opentelemetry.instrumentation.api.tracer.BaseTracer import reactor.core.publisher.Flux @@ -18,11 +19,19 @@ class ReactorAsyncSpanEndStrategyTest extends Specification { Context context - def underTest = ReactorAsyncSpanEndStrategy.INSTANCE + Span span + + def underTest = ReactorAsyncSpanEndStrategy.create() + + def underTestWithExperimentalAttributes = ReactorAsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes(true) + .build() void setup() { tracer = Mock() context = Mock() + span = Mock() + span.storeInContext(_) >> { callRealMethod() } } static class MonoTest extends ReactorAsyncSpanEndStrategyTest { @@ -131,6 +140,50 @@ class ReactorAsyncSpanEndStrategyTest extends Specification { 1 * tracer.endExceptionally(context, exception) } + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + def context = span.storeInContext(Context.root()) + + when: + def result = (Mono) underTest.end(tracer, context, mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + verifier.thenCancel().verify() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + def context = span.storeInContext(Context.root()) + + when: + def result = (Mono) underTestWithExperimentalAttributes.end(tracer, context, mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + verifier.thenCancel().verify() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "reactor.canceled" }, true) + } + def "ends span once for multiple subscribers"() { when: @@ -253,6 +306,50 @@ class ReactorAsyncSpanEndStrategyTest extends Specification { 1 * tracer.endExceptionally(context, exception) } + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flux) underTest.end(tracer, context, source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + verifier.thenCancel() + .verify() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flux) underTestWithExperimentalAttributes.end(tracer, context, source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + verifier.thenCancel() + .verify() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "reactor.canceled" }, true) + } + def "ends span once for multiple subscribers"() { when: def result = (Flux) underTest.end(tracer, context, Flux.just("Value")) diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy index 1fb3f8243736..9d7e65acbb71 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy @@ -6,13 +6,17 @@ package io.opentelemetry.instrumentation.reactor import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrait { + @Shared + TracingOperator tracingOperator = TracingOperator.create() + def setupSpec() { - TracingOperator.registerOnEachOperator() + tracingOperator.registerOnEachOperator() } def cleanupSpec() { - TracingOperator.resetOnEachOperator() + tracingOperator.resetOnEachOperator() } } diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy index 41ad20821707..9bfc899474a6 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy @@ -6,13 +6,17 @@ package io.opentelemetry.instrumentation.reactor import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class SubscriptionTest extends AbstractSubscriptionTest implements LibraryTestTrait { + @Shared + TracingOperator tracingOperator = TracingOperator.create() + def setupSpec() { - TracingOperator.registerOnEachOperator() + tracingOperator.registerOnEachOperator() } def cleanupSpec() { - TracingOperator.resetOnEachOperator() + tracingOperator.resetOnEachOperator() } } diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/rxjava-2.0-javaagent.gradle b/instrumentation/rxjava/rxjava-2.0/javaagent/rxjava-2.0-javaagent.gradle index ec79e2452bf6..72d76dfbc13b 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/rxjava-2.0-javaagent.gradle +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/rxjava-2.0-javaagent.gradle @@ -9,6 +9,11 @@ muzzle { } } +tasks.withType(Test).configureEach { + // TODO run tests both with and without experimental span attributes + jvmArgs "-Dotel.instrumentation.rxjava.experimental-span-attributes=true" +} + dependencies { library "io.reactivex.rxjava2:rxjava:2.0.6" diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2InstrumentationModule.java b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJava2InstrumentationModule.java similarity index 91% rename from instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2InstrumentationModule.java rename to instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJava2InstrumentationModule.java index 71e5aae55fad..b36ce0e9f9e6 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2InstrumentationModule.java +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJava2InstrumentationModule.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava2; +package io.opentelemetry.javaagent.instrumentation.rxjava2; import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java similarity index 95% rename from instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java rename to instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java index 56e393ceb196..50f2a9e78382 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava2; +package io.opentelemetry.javaagent.instrumentation.rxjava2; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.named; diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyActivation.java b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/TracingAssemblyActivation.java similarity index 55% rename from instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyActivation.java rename to instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/TracingAssemblyActivation.java index c0fbbe2359a9..1246517241c2 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyActivation.java +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/TracingAssemblyActivation.java @@ -3,8 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava2; +package io.opentelemetry.javaagent.instrumentation.rxjava2; +import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.rxjava2.TracingAssembly; import java.util.concurrent.atomic.AtomicBoolean; public final class TracingAssemblyActivation { @@ -19,7 +21,13 @@ protected AtomicBoolean computeValue(Class type) { public static void activate(Class clz) { if (activated.get(clz).compareAndSet(false, true)) { - TracingAssembly.enable(); + TracingAssembly.newBuilder() + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.rxjava.experimental-span-attributes", false)) + .build() + .enable(); } } diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy b/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy index 8e33aa20131f..3f24b3309af2 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy @@ -136,6 +136,35 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Completable"() { + setup: + def source = CompletableSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Maybe"() { setup: def observer = new TestObserver() @@ -271,6 +300,35 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Maybe"() { + setup: + def source = MaybeSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Single"() { setup: def observer = new TestObserver() @@ -383,6 +441,35 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Single"() { + setup: + def source = SingleSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .single(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Observable"() { setup: def observer = new TestObserver() @@ -506,6 +593,41 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Observable"() { + setup: + def source = UnicastSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .observable(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Flowable"() { setup: def observer = new TestSubscriber() @@ -629,6 +751,41 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Flowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed ParallelFlowable"() { setup: def observer = new TestSubscriber() @@ -756,6 +913,42 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled ParallelFlowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for eventually completed Publisher"() { setup: def source = new CustomPublisher() @@ -817,6 +1010,35 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Publisher"() { + setup: + def source = new CustomPublisher() + def observer = new TestSubscriber() + new TracedWithSpan() + .publisher(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.publisher" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + static class CustomPublisher implements Publisher, Subscription { Subscriber subscriber diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java index 495b9d888814..38701b5402e1 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.rxjava2; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy; @@ -20,8 +22,23 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; -public enum RxJava2AsyncSpanEndStrategy implements AsyncSpanEndStrategy { - INSTANCE; +public final class RxJava2AsyncSpanEndStrategy implements AsyncSpanEndStrategy { + private static final AttributeKey CANCELED_ATTRIBUTE_KEY = + AttributeKey.booleanKey("rxjava.canceled"); + + public static RxJava2AsyncSpanEndStrategy create() { + return newBuilder().build(); + } + + public static RxJava2AsyncSpanEndStrategyBuilder newBuilder() { + return new RxJava2AsyncSpanEndStrategyBuilder(); + } + + private final boolean captureExperimentalSpanAttributes; + + RxJava2AsyncSpanEndStrategy(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } @Override public boolean supports(Class returnType) { @@ -55,7 +72,9 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) { private static Completable endWhenComplete( Completable completable, EndOnFirstNotificationConsumer notificationConsumer) { - return completable.doOnEvent(notificationConsumer); + return completable + .doOnEvent(notificationConsumer) + .doOnDispose(notificationConsumer::onCancelOrDispose); } private static Maybe endWhenMaybeComplete( @@ -63,7 +82,7 @@ private static Maybe endWhenMaybeComplete( @SuppressWarnings("unchecked") EndOnFirstNotificationConsumer typedConsumer = (EndOnFirstNotificationConsumer) notificationConsumer; - return maybe.doOnEvent(typedConsumer); + return maybe.doOnEvent(typedConsumer).doOnDispose(notificationConsumer::onCancelOrDispose); } private static Single endWhenSingleComplete( @@ -71,25 +90,32 @@ private static Single endWhenSingleComplete( @SuppressWarnings("unchecked") EndOnFirstNotificationConsumer typedConsumer = (EndOnFirstNotificationConsumer) notificationConsumer; - return single.doOnEvent(typedConsumer); + return single.doOnEvent(typedConsumer).doOnDispose(notificationConsumer::onCancelOrDispose); } private static Observable endWhenObservableComplete( Observable observable, EndOnFirstNotificationConsumer notificationConsumer) { - return observable.doOnComplete(notificationConsumer).doOnError(notificationConsumer); + return observable + .doOnComplete(notificationConsumer) + .doOnError(notificationConsumer) + .doOnDispose(notificationConsumer::onCancelOrDispose); } private static ParallelFlowable endWhenFirstComplete( ParallelFlowable parallelFlowable, EndOnFirstNotificationConsumer notificationConsumer) { - return parallelFlowable.doOnComplete(notificationConsumer).doOnError(notificationConsumer); + return parallelFlowable + .doOnComplete(notificationConsumer) + .doOnError(notificationConsumer) + .doOnCancel(notificationConsumer::onCancelOrDispose); } private static Flowable endWhenPublisherComplete( Publisher publisher, EndOnFirstNotificationConsumer notificationConsumer) { return Flowable.fromPublisher(publisher) .doOnComplete(notificationConsumer) - .doOnError(notificationConsumer); + .doOnError(notificationConsumer) + .doOnCancel(notificationConsumer::onCancelOrDispose); } /** @@ -97,7 +123,7 @@ private static Flowable endWhenPublisherComplete( * OnError notifications are received. Multiple notifications can happen anytime multiple * subscribers subscribe to the same publisher. */ - private static final class EndOnFirstNotificationConsumer extends AtomicBoolean + private final class EndOnFirstNotificationConsumer extends AtomicBoolean implements Action, Consumer, BiConsumer { private final BaseTracer tracer; @@ -111,7 +137,14 @@ public EndOnFirstNotificationConsumer(BaseTracer tracer, Context context) { @Override public void run() { + accept(null); + } + + public void onCancelOrDispose() { if (compareAndSet(false, true)) { + if (captureExperimentalSpanAttributes) { + Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true); + } tracer.end(context); } } diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategyBuilder.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategyBuilder.java new file mode 100644 index 000000000000..b049e6dc5ca5 --- /dev/null +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategyBuilder.java @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava2; + +public final class RxJava2AsyncSpanEndStrategyBuilder { + + private boolean captureExperimentalSpanAttributes; + + RxJava2AsyncSpanEndStrategyBuilder() {} + + public RxJava2AsyncSpanEndStrategyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public RxJava2AsyncSpanEndStrategy build() { + return new RxJava2AsyncSpanEndStrategy(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java index c7d40c5c7622..d9323bfec6ef 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java @@ -96,50 +96,66 @@ public final class TracingAssembly { @GuardedBy("TracingAssembly.class") private static boolean enabled; - private TracingAssembly() {} + public static TracingAssembly create() { + return newBuilder().build(); + } - public static synchronized void enable() { - if (enabled) { - return; - } + public static TracingAssemblyBuilder newBuilder() { + return new TracingAssemblyBuilder(); + } - enableObservable(); + private final boolean captureExperimentalSpanAttributes; - enableCompletable(); + TracingAssembly(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } - enableSingle(); + public void enable() { + synchronized (TracingAssembly.class) { + if (enabled) { + return; + } - enableMaybe(); + enableObservable(); - enableFlowable(); + enableCompletable(); - enableParallel(); + enableSingle(); - enableWithSpanStrategy(); + enableMaybe(); - enabled = true; - } + enableFlowable(); - public static synchronized void disable() { - if (!enabled) { - return; + enableParallel(); + + enableWithSpanStrategy(captureExperimentalSpanAttributes); + + enabled = true; } + } - disableObservable(); + public void disable() { + synchronized (TracingAssembly.class) { + if (!enabled) { + return; + } - disableCompletable(); + disableObservable(); - disableSingle(); + disableCompletable(); - disableMaybe(); + disableSingle(); - disableFlowable(); + disableMaybe(); - disableParallel(); + disableFlowable(); - disableWithSpanStrategy(); + disableParallel(); - enabled = false; + disableWithSpanStrategy(); + + enabled = false; + } } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -229,8 +245,12 @@ private static void enableMaybe() { })); } - private static void enableWithSpanStrategy() { - AsyncSpanEndStrategies.getInstance().registerStrategy(RxJava2AsyncSpanEndStrategy.INSTANCE); + private static void enableWithSpanStrategy(boolean captureExperimentalSpanAttributes) { + AsyncSpanEndStrategies.getInstance() + .registerStrategy( + RxJava2AsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) + .build()); } private static void disableParallel() { @@ -266,7 +286,7 @@ private static void disableMaybe() { } private static void disableWithSpanStrategy() { - AsyncSpanEndStrategies.getInstance().unregisterStrategy(RxJava2AsyncSpanEndStrategy.INSTANCE); + AsyncSpanEndStrategies.getInstance().unregisterStrategy(RxJava2AsyncSpanEndStrategy.class); } private static Function compose( diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyBuilder.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyBuilder.java new file mode 100644 index 000000000000..050d039ff29e --- /dev/null +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava2; + +public final class TracingAssemblyBuilder { + private boolean captureExperimentalSpanAttributes; + + TracingAssemblyBuilder() {} + + public TracingAssemblyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public TracingAssembly build() { + return new TracingAssembly(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy index b2d2ea711975..711d75805192 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import io.opentelemetry.api.trace.Span import io.opentelemetry.context.Context import io.opentelemetry.instrumentation.api.tracer.BaseTracer import io.opentelemetry.instrumentation.rxjava2.RxJava2AsyncSpanEndStrategy @@ -31,11 +32,19 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { Context context - def underTest = RxJava2AsyncSpanEndStrategy.INSTANCE + Span span + + def underTest = RxJava2AsyncSpanEndStrategy.create() + + def underTestWithExperimentalAttributes = RxJava2AsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes(true) + .build() void setup() { tracer = Mock() context = Mock() + span = Mock() + span.storeInContext(_) >> { callRealMethod() } } static class CompletableTest extends RxJava2AsyncSpanEndStrategyTest { @@ -112,6 +121,50 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = CompletableSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Completable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = CompletableSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Completable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } + def "ends span once for multiple subscribers"() { given: def source = CompletableSubject.create() @@ -246,6 +299,49 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = MaybeSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Maybe) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = MaybeSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Maybe) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } + def "ends span once for multiple subscribers"() { given: def source = MaybeSubject.create() @@ -350,6 +446,49 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = SingleSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Single) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = SingleSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Single) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } + def "ends span once for multiple subscribers"() { given: def source = SingleSubject.create() @@ -454,6 +593,49 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = UnicastSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Observable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Observable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } + def "ends span once for multiple subscribers"() { given: def source = ReplaySubject.create() @@ -555,6 +737,49 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } + def "ends span once for multiple subscribers"() { given: def source = ReplayProcessor.create() @@ -655,6 +880,49 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) 1 * tracer.endExceptionally(context, exception) } + + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (ParallelFlowable) underTest.end(tracer, context, source.parallel()) + result.sequential().subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (ParallelFlowable) underTestWithExperimentalAttributes.end(tracer, context, source.parallel()) + result.sequential().subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } } static class PublisherTest extends RxJava2AsyncSpanEndStrategyTest { @@ -703,6 +971,49 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { 1 * tracer.endExceptionally(context, exception) observer.assertError(exception) } + + def "ends span when cancelled"() { + given: + def source = new CustomPublisher() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = new CustomPublisher() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } } static class CustomPublisher implements Publisher, Subscription { diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2SubscriptionTest.groovy index 15d2f624c8ae..935e49de1e1e 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2SubscriptionTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2SubscriptionTest.groovy @@ -6,10 +6,13 @@ import io.opentelemetry.instrumentation.rxjava2.AbstractRxJava2SubscriptionTest import io.opentelemetry.instrumentation.rxjava2.TracingAssembly import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class RxJava2SubscriptionTest extends AbstractRxJava2SubscriptionTest implements LibraryTestTrait { + @Shared + TracingAssembly tracingAssembly = TracingAssembly.create() def setupSpec() { - TracingAssembly.enable() + tracingAssembly.enable() } } diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2Test.groovy b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2Test.groovy index d379cbce7885..8b9ca49ea48c 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2Test.groovy +++ b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2Test.groovy @@ -6,10 +6,13 @@ import io.opentelemetry.instrumentation.rxjava2.AbstractRxJava2Test import io.opentelemetry.instrumentation.rxjava2.TracingAssembly import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class RxJava2Test extends AbstractRxJava2Test implements LibraryTestTrait { + @Shared + TracingAssembly tracingAssembly = TracingAssembly.create() def setupSpec() { - TracingAssembly.enable() + tracingAssembly.enable() } } diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle b/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle index 3c681eb80c3c..f6027d3f54fb 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle @@ -9,6 +9,11 @@ muzzle { } } +tasks.withType(Test).configureEach { + // TODO run tests both with and without experimental span attributes + jvmArgs "-Dotel.instrumentation.rxjava.experimental-span-attributes=true" +} + dependencies { library "io.reactivex.rxjava3:rxjava:3.0.0" diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3InstrumentationModule.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJava3InstrumentationModule.java similarity index 91% rename from instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3InstrumentationModule.java rename to instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJava3InstrumentationModule.java index 892e5eac0d6e..8f2848abd9d7 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3InstrumentationModule.java +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJava3InstrumentationModule.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava3; +package io.opentelemetry.javaagent.instrumentation.rxjava3; import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java similarity index 95% rename from instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java rename to instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java index 7d48b39cfd50..966b75f2a755 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava3; +package io.opentelemetry.javaagent.instrumentation.rxjava3; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.named; diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/TracingAssemblyActivation.java similarity index 55% rename from instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java rename to instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/TracingAssemblyActivation.java index 92c4937e87d1..52699d9a4975 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/TracingAssemblyActivation.java @@ -3,8 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava3; +package io.opentelemetry.javaagent.instrumentation.rxjava3; +import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.rxjava3.TracingAssembly; import java.util.concurrent.atomic.AtomicBoolean; public final class TracingAssemblyActivation { @@ -19,7 +21,13 @@ protected AtomicBoolean computeValue(Class type) { public static void activate(Class clz) { if (activated.get(clz).compareAndSet(false, true)) { - TracingAssembly.enable(); + TracingAssembly.newBuilder() + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.rxjava.experimental-span-attributes", false)) + .build() + .enable(); } } diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy index c94bf8c8af60..c196cbb98f85 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy @@ -134,6 +134,34 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Completable"() { + setup: + def source = CompletableSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Maybe"() { setup: def observer = new TestObserver() @@ -267,6 +295,34 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Maybe"() { + setup: + def source = MaybeSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Single"() { setup: def observer = new TestObserver() @@ -377,6 +433,34 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Single"() { + setup: + def source = SingleSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .single(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Observable"() { setup: def observer = new TestObserver() @@ -498,6 +582,40 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Observable"() { + setup: + def source = UnicastSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .observable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Flowable"() { setup: def observer = new TestSubscriber() @@ -619,6 +737,40 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Flowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed ParallelFlowable"() { setup: def observer = new TestSubscriber() @@ -744,6 +896,41 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled ParallelFlowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for eventually completed Publisher"() { setup: def source = new CustomPublisher() @@ -803,6 +990,34 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Publisher"() { + setup: + def source = new CustomPublisher() + def observer = new TestSubscriber() + new TracedWithSpan() + .publisher(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.publisher" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + static class CustomPublisher implements Publisher, Subscription { Subscriber subscriber diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java index c2aecbc191b3..1b901dc547a1 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.rxjava3; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy; @@ -20,8 +22,23 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; -public enum RxJava3AsyncSpanEndStrategy implements AsyncSpanEndStrategy { - INSTANCE; +public final class RxJava3AsyncSpanEndStrategy implements AsyncSpanEndStrategy { + private static final AttributeKey CANCELED_ATTRIBUTE_KEY = + AttributeKey.booleanKey("rxjava.canceled"); + + public static RxJava3AsyncSpanEndStrategy create() { + return newBuilder().build(); + } + + public static RxJava3AsyncSpanEndStrategyBuilder newBuilder() { + return new RxJava3AsyncSpanEndStrategyBuilder(); + } + + private final boolean captureExperimentalSpanAttributes; + + RxJava3AsyncSpanEndStrategy(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } @Override public boolean supports(Class returnType) { @@ -55,7 +72,9 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) { private static Completable endWhenComplete( Completable completable, EndOnFirstNotificationConsumer notificationConsumer) { - return completable.doOnEvent(notificationConsumer); + return completable + .doOnEvent(notificationConsumer) + .doOnDispose(notificationConsumer::onCancelOrDispose); } private static Maybe endWhenMaybeComplete( @@ -63,7 +82,7 @@ private static Maybe endWhenMaybeComplete( @SuppressWarnings("unchecked") EndOnFirstNotificationConsumer typedConsumer = (EndOnFirstNotificationConsumer) notificationConsumer; - return maybe.doOnEvent(typedConsumer); + return maybe.doOnEvent(typedConsumer).doOnDispose(notificationConsumer::onCancelOrDispose); } private static Single endWhenSingleComplete( @@ -71,25 +90,32 @@ private static Single endWhenSingleComplete( @SuppressWarnings("unchecked") EndOnFirstNotificationConsumer typedConsumer = (EndOnFirstNotificationConsumer) notificationConsumer; - return single.doOnEvent(typedConsumer); + return single.doOnEvent(typedConsumer).doOnDispose(notificationConsumer::onCancelOrDispose); } private static Observable endWhenObservableComplete( Observable observable, EndOnFirstNotificationConsumer notificationConsumer) { - return observable.doOnComplete(notificationConsumer).doOnError(notificationConsumer); + return observable + .doOnComplete(notificationConsumer) + .doOnError(notificationConsumer) + .doOnDispose(notificationConsumer::onCancelOrDispose); } private static ParallelFlowable endWhenFirstComplete( ParallelFlowable parallelFlowable, EndOnFirstNotificationConsumer notificationConsumer) { - return parallelFlowable.doOnComplete(notificationConsumer).doOnError(notificationConsumer); + return parallelFlowable + .doOnComplete(notificationConsumer) + .doOnError(notificationConsumer) + .doOnCancel(notificationConsumer::onCancelOrDispose); } private static Flowable endWhenPublisherComplete( Publisher publisher, EndOnFirstNotificationConsumer notificationConsumer) { return Flowable.fromPublisher(publisher) .doOnComplete(notificationConsumer) - .doOnError(notificationConsumer); + .doOnError(notificationConsumer) + .doOnCancel(notificationConsumer::onCancelOrDispose); } /** @@ -97,7 +123,7 @@ private static Flowable endWhenPublisherComplete( * OnError notifications are received. Multiple notifications can happen anytime multiple * subscribers subscribe to the same publisher. */ - private static final class EndOnFirstNotificationConsumer extends AtomicBoolean + private final class EndOnFirstNotificationConsumer extends AtomicBoolean implements Action, Consumer, BiConsumer { private final BaseTracer tracer; @@ -109,13 +135,20 @@ public EndOnFirstNotificationConsumer(BaseTracer tracer, Context context) { this.context = context; } - @Override - public void run() { + public void onCancelOrDispose() { if (compareAndSet(false, true)) { + if (captureExperimentalSpanAttributes) { + Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true); + } tracer.end(context); } } + @Override + public void run() { + accept(null); + } + @Override public void accept(Throwable exception) { if (compareAndSet(false, true)) { diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategyBuilder.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategyBuilder.java new file mode 100644 index 000000000000..04ac6cf2a21b --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategyBuilder.java @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3; + +public final class RxJava3AsyncSpanEndStrategyBuilder { + + private boolean captureExperimentalSpanAttributes; + + RxJava3AsyncSpanEndStrategyBuilder() {} + + public RxJava3AsyncSpanEndStrategyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public RxJava3AsyncSpanEndStrategy build() { + return new RxJava3AsyncSpanEndStrategy(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java index eb400a629784..796ec4824914 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java @@ -96,50 +96,66 @@ public final class TracingAssembly { @GuardedBy("TracingAssembly.class") private static boolean enabled; - private TracingAssembly() {} + public static TracingAssembly create() { + return newBuilder().build(); + } - public static synchronized void enable() { - if (enabled) { - return; - } + public static TracingAssemblyBuilder newBuilder() { + return new TracingAssemblyBuilder(); + } - enableObservable(); + private final boolean captureExperimentalSpanAttributes; - enableCompletable(); + TracingAssembly(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } - enableSingle(); + public void enable() { + synchronized (TracingAssembly.class) { + if (enabled) { + return; + } - enableMaybe(); + enableObservable(); - enableFlowable(); + enableCompletable(); - enableParallel(); + enableSingle(); - enableWithSpanStrategy(); + enableMaybe(); - enabled = true; - } + enableFlowable(); - public static synchronized void disable() { - if (!enabled) { - return; + enableParallel(); + + enableWithSpanStrategy(captureExperimentalSpanAttributes); + + enabled = true; } + } - disableObservable(); + public void disable() { + synchronized (TracingAssembly.class) { + if (!enabled) { + return; + } - disableCompletable(); + disableObservable(); - disableSingle(); + disableCompletable(); - disableMaybe(); + disableSingle(); - disableFlowable(); + disableMaybe(); - disableParallel(); + disableFlowable(); - disableWithSpanStrategy(); + disableParallel(); - enabled = false; + disableWithSpanStrategy(); + + enabled = false; + } } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -227,8 +243,12 @@ private static void enableMaybe() { })); } - private static void enableWithSpanStrategy() { - AsyncSpanEndStrategies.getInstance().registerStrategy(RxJava3AsyncSpanEndStrategy.INSTANCE); + private static void enableWithSpanStrategy(boolean captureExperimentalSpanAttributes) { + AsyncSpanEndStrategies.getInstance() + .registerStrategy( + RxJava3AsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) + .build()); } private static void disableParallel() { @@ -264,7 +284,7 @@ private static void disableMaybe() { } private static void disableWithSpanStrategy() { - AsyncSpanEndStrategies.getInstance().unregisterStrategy(RxJava3AsyncSpanEndStrategy.INSTANCE); + AsyncSpanEndStrategies.getInstance().unregisterStrategy(RxJava3AsyncSpanEndStrategy.class); } private static Function compose( diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyBuilder.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyBuilder.java new file mode 100644 index 000000000000..c5321a7b5f39 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3; + +public final class TracingAssemblyBuilder { + private boolean captureExperimentalSpanAttributes; + + TracingAssemblyBuilder() {} + + public TracingAssemblyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public TracingAssembly build() { + return new TracingAssembly(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy index 3df4f6cb8a10..0e8956a3da06 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy +++ b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import io.opentelemetry.api.trace.Span import io.opentelemetry.context.Context import io.opentelemetry.instrumentation.api.tracer.BaseTracer import io.opentelemetry.instrumentation.rxjava3.RxJava3AsyncSpanEndStrategy @@ -31,11 +32,19 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { Context context - def underTest = RxJava3AsyncSpanEndStrategy.INSTANCE + Span span + + def underTest = RxJava3AsyncSpanEndStrategy.create() + + def underTestWithExperimentalAttributes = RxJava3AsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes(true) + .build() void setup() { tracer = Mock() context = Mock() + span = Mock() + span.storeInContext(_) >> { callRealMethod() } } static class CompletableTest extends RxJava3AsyncSpanEndStrategyTest { @@ -112,6 +121,50 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = CompletableSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Completable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = CompletableSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Completable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } + def "ends span once for multiple subscribers"() { given: def source = CompletableSubject.create() @@ -246,6 +299,49 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = MaybeSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Maybe) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = MaybeSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Maybe) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } + def "ends span once for multiple subscribers"() { given: def source = MaybeSubject.create() @@ -350,6 +446,49 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = SingleSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Single) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = SingleSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Single) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } + def "ends span once for multiple subscribers"() { given: def source = SingleSubject.create() @@ -454,6 +593,49 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = UnicastSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Observable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Observable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } + def "ends span once for multiple subscribers"() { given: def source = ReplaySubject.create() @@ -555,6 +737,49 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } + def "ends span once for multiple subscribers"() { given: def source = ReplayProcessor.create() @@ -655,6 +880,49 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) 1 * tracer.endExceptionally(context, exception) } + + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (ParallelFlowable) underTest.end(tracer, context, source.parallel()) + result.sequential().subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (ParallelFlowable) underTestWithExperimentalAttributes.end(tracer, context, source.parallel()) + result.sequential().subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } } static class PublisherTest extends RxJava3AsyncSpanEndStrategyTest { @@ -703,6 +971,49 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { 1 * tracer.endExceptionally(context, exception) observer.assertError(exception) } + + def "ends span when cancelled"() { + given: + def source = new CustomPublisher() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = new CustomPublisher() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) + } } static class CustomPublisher implements Publisher, Subscription { diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy index 0351dae24a8c..800f5adfe1ca 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy +++ b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy @@ -6,10 +6,13 @@ import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3SubscriptionTest import io.opentelemetry.instrumentation.rxjava3.TracingAssembly import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements LibraryTestTrait { + @Shared + TracingAssembly tracingAssembly = TracingAssembly.create() def setupSpec() { - TracingAssembly.enable() + tracingAssembly.enable() } } diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy index 4bcf431d69a4..24215f993fab 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy +++ b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy @@ -6,10 +6,13 @@ import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3Test import io.opentelemetry.instrumentation.rxjava3.TracingAssembly import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class RxJava3Test extends AbstractRxJava3Test implements LibraryTestTrait { + @Shared + TracingAssembly tracingAssembly = TracingAssembly.create() def setupSpec() { - TracingAssembly.enable() + tracingAssembly.enable() } }