From fd5ee0cb2662855aab4cd197ad628cbfb11aa5b6 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Mon, 23 Aug 2021 16:31:06 -0400 Subject: [PATCH] Add checkpoint and multiple subscriber options --- .../reactor-3.1/javaagent/build.gradle.kts | 2 +- .../reactor/HooksInstrumentation.java | 10 ++- .../reactor/InstrumentedOperator.java | 67 +++++-------------- .../reactor/InstrumentedSubscriber.java | 8 +-- .../ReactorAsyncOperationEndStrategy.java | 66 ++++++++++++++---- ...actorAsyncOperationEndStrategyBuilder.java | 18 ++++- .../reactor/ReactorAsyncOperationOptions.java | 33 +++++++++ .../reactor/TracingOperator.java | 7 +- .../reactor/TracingOperatorBuilder.java | 21 ++++-- ...eactorAsyncOperationEndStrategyTest.groovy | 7 +- 10 files changed, 155 insertions(+), 84 deletions(-) create mode 100644 instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationOptions.java diff --git a/instrumentation/reactor-3.1/javaagent/build.gradle.kts b/instrumentation/reactor-3.1/javaagent/build.gradle.kts index 980f3ee128d5..ff9d2a3bc524 100644 --- a/instrumentation/reactor-3.1/javaagent/build.gradle.kts +++ b/instrumentation/reactor-3.1/javaagent/build.gradle.kts @@ -13,7 +13,7 @@ muzzle { tasks.withType().configureEach { // TODO run tests both with and without experimental span attributes - jvmArgs("-Dotel.instrumentation.reactor.experimental-span-attributes=true") + jvmArgs("-Dotel.instrumentation.reactor.experimental-span-attributes=true", "-Dotel.instrumentation.reactor.trace-multiple-subscribers=true", "-Dotel.instrumentation.reactor.emit-checkpoints=true") } dependencies { diff --git a/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java b/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java index 624818ef7abf..71ed5c8b7401 100644 --- a/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java +++ b/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java @@ -34,10 +34,16 @@ public static class ResetOnEachOperatorAdvice { @Advice.OnMethodExit(suppress = Throwable.class) public static void postStaticInitializer() { + Config config = Config.get(); TracingOperator.newBuilder() .setCaptureExperimentalSpanAttributes( - Config.get() - .getBoolean("otel.instrumentation.reactor.experimental-span-attributes", false)) + config.getBooleanProperty( + "otel.instrumentation.reactor.experimental-span-attributes", false)) + .setEmitCheckpoints( + config.getBooleanProperty("otel.instrumentation.reactor.emit-checkpoints", false)) + .setTraceMultipleSubscribers( + config.getBooleanProperty( + "otel.instrumentation.reactor.trace-multiple-subscribers", false)) .build() .registerOnEachOperator(); } diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedOperator.java index 300fa77f48f4..d0d405aafefb 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedOperator.java @@ -13,8 +13,6 @@ import org.reactivestreams.Publisher; import reactor.core.CoreSubscriber; import reactor.core.Scannable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; final class InstrumentedOperator @@ -24,46 +22,19 @@ final class InstrumentedOperator private final Context context; private final REQUEST request; private final Class responseType; - private final boolean captureExperimentalSpanAttributes; + private final ReactorAsyncOperationOptions options; private final AtomicBoolean firstSubscriber = new AtomicBoolean(true); - static Mono transformMono( - Mono mono, - Instrumenter instrumenter, - Context context, - REQUEST request, - Class responseType, - boolean captureExperimentalSpanAttributes) { - - return mono.transform( - InstrumentedOperator.tracingLift( - instrumenter, context, request, responseType, captureExperimentalSpanAttributes)); - } - - static Flux transformFlux( - Flux flux, - Instrumenter instrumenter, - Context context, - REQUEST request, - Class responseType, - boolean captureExperimentalSpanAttributes) { - - return flux.transform( - InstrumentedOperator.tracingLift( - instrumenter, context, request, responseType, captureExperimentalSpanAttributes)); - } - - private static - Function, ? extends Publisher> tracingLift( + static + Function, ? extends Publisher> instrumentedLift( Instrumenter instrumenter, Context context, REQUEST request, Class responseType, - boolean captureExperimentalSpanAttributes) { + ReactorAsyncOperationOptions options) { return Operators.lift( - new InstrumentedOperator<>( - instrumenter, context, request, responseType, captureExperimentalSpanAttributes)); + new InstrumentedOperator<>(instrumenter, context, request, responseType, options)); } private InstrumentedOperator( @@ -71,12 +42,12 @@ private InstrumentedOperator( Context context, REQUEST request, Class responseType, - boolean captureExperimentalSpanAttributes) { + ReactorAsyncOperationOptions options) { this.instrumenter = instrumenter; this.context = context; this.request = request; this.responseType = responseType; - this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + this.options = options; } @Override @@ -85,24 +56,16 @@ public CoreSubscriber apply( if (isFirstSubscriber()) { return new InstrumentedSubscriber<>( - instrumenter, - context, - request, - responseType, - captureExperimentalSpanAttributes, - coreSubscriber); + instrumenter, context, request, responseType, options, coreSubscriber); } - Context parentContext = Context.current(); - if (instrumenter.shouldStart(parentContext, request)) { - Context context = instrumenter.start(parentContext, request); - return new InstrumentedSubscriber<>( - instrumenter, - context, - request, - responseType, - captureExperimentalSpanAttributes, - coreSubscriber); + if (options.traceMultipleSubscribers()) { + Context parentContext = Context.current(); + if (instrumenter.shouldStart(parentContext, request)) { + Context context = instrumenter.start(parentContext, request); + return new InstrumentedSubscriber<>( + instrumenter, context, request, responseType, options, coreSubscriber); + } } return coreSubscriber; } diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedSubscriber.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedSubscriber.java index f2984ae3b27d..717dd8e88b9e 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedSubscriber.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/InstrumentedSubscriber.java @@ -25,7 +25,7 @@ final class InstrumentedSubscriber private final Context context; private final REQUEST request; private final Class responseType; - private final boolean captureExperimentalSpanAttributes; + private final ReactorAsyncOperationOptions options; private final CoreSubscriber actual; private Subscription subscription; private T value; @@ -35,14 +35,14 @@ final class InstrumentedSubscriber Context context, REQUEST request, Class responseType, - boolean captureExperimentalSpanAttributes, + ReactorAsyncOperationOptions options, CoreSubscriber actual) { this.instrumenter = instrumenter; this.context = context; this.request = request; this.responseType = responseType; - this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + this.options = options; this.actual = actual; } @@ -64,7 +64,7 @@ public void request(long count) { @Override public void cancel() { if (subscription != null) { - if (captureExperimentalSpanAttributes) { + if (options.captureExperimentalSpanAttributes()) { Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true); } instrumenter.end(context, request, null, null); diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java index fed948a12144..3186d57570fc 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java @@ -7,9 +7,12 @@ import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.Objects; +import java.util.function.BiFunction; import org.reactivestreams.Publisher; import reactor.core.Fuseable; import reactor.core.publisher.Flux; @@ -24,10 +27,10 @@ public static ReactorAsyncOperationEndStrategyBuilder newBuilder() { return new ReactorAsyncOperationEndStrategyBuilder(); } - private final boolean captureExperimentalSpanAttributes; + private final ReactorAsyncOperationOptions options; - ReactorAsyncOperationEndStrategy(boolean captureExperimentalSpanAttributes) { - this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + ReactorAsyncOperationEndStrategy(ReactorAsyncOperationOptions options) { + this.options = options; } @Override @@ -43,30 +46,65 @@ public Object end( Object asyncValue, Class responseType) { - if (tryEndSynchronously(instrumenter, context, request, asyncValue, responseType)) { - return asyncValue; - } - if (asyncValue instanceof Mono) { Mono mono = (Mono) asyncValue; - return InstrumentedOperator.transformMono( - mono, instrumenter, context, request, responseType, captureExperimentalSpanAttributes); + return instrumentMono(mono, instrumenter, context, request, responseType); } else { Flux flux = Flux.from((Publisher) asyncValue); - return InstrumentedOperator.transformFlux( - flux, instrumenter, context, request, responseType, captureExperimentalSpanAttributes); + return instrumentFlux(flux, instrumenter, context, request, responseType); } } + private Mono instrumentMono( + Mono mono, + Instrumenter instrumenter, + Context context, + REQUEST request, + Class responseType) { + Mono withCheckpoint = checkpoint(mono, context, Mono::checkpoint); + if (tryEndSynchronously(mono, instrumenter, context, request, responseType)) { + return withCheckpoint; + } + return withCheckpoint.transform( + InstrumentedOperator.instrumentedLift( + instrumenter, context, request, responseType, options)); + } + + private Flux instrumentFlux( + Flux flux, + Instrumenter instrumenter, + Context context, + REQUEST request, + Class responseType) { + Flux withCheckpoint = checkpoint(flux, context, Flux::checkpoint); + if (tryEndSynchronously(flux, instrumenter, context, request, responseType)) { + return withCheckpoint; + } + return withCheckpoint.transform( + InstrumentedOperator.instrumentedLift( + instrumenter, context, request, responseType, options)); + } + + private > P checkpoint( + P publisher, Context context, BiFunction checkpoint) { + if (options.emitCheckpoints()) { + Span currentSpan = Span.fromContextOrNull(context); + if (currentSpan != null) { + return checkpoint.apply(publisher, Objects.toString(currentSpan)); + } + } + return publisher; + } + private static boolean tryEndSynchronously( + Publisher publisher, Instrumenter instrumenter, Context context, REQUEST request, - Object asyncValue, Class responseType) { - if (asyncValue instanceof Fuseable.ScalarCallable) { - Fuseable.ScalarCallable scalarCallable = (Fuseable.ScalarCallable) asyncValue; + if (publisher instanceof Fuseable.ScalarCallable) { + Fuseable.ScalarCallable scalarCallable = (Fuseable.ScalarCallable) publisher; try { Object result = scalarCallable.call(); instrumenter.end(context, request, tryToGetResponse(responseType, result), null); diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyBuilder.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyBuilder.java index 2fc91e0372e3..9bab9fc36bec 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyBuilder.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyBuilder.java @@ -7,6 +7,8 @@ public final class ReactorAsyncOperationEndStrategyBuilder { private boolean captureExperimentalSpanAttributes; + private boolean emitCheckpoints; + private boolean traceMultipleSubscribers; ReactorAsyncOperationEndStrategyBuilder() {} @@ -16,7 +18,21 @@ public ReactorAsyncOperationEndStrategyBuilder setCaptureExperimentalSpanAttribu return this; } + public ReactorAsyncOperationEndStrategyBuilder setEmitCheckpoints(boolean emitCheckpoints) { + this.emitCheckpoints = emitCheckpoints; + return this; + } + + public ReactorAsyncOperationEndStrategyBuilder setTraceMultipleSubscribers( + boolean traceMultipleSubscribers) { + this.traceMultipleSubscribers = traceMultipleSubscribers; + return this; + } + public ReactorAsyncOperationEndStrategy build() { - return new ReactorAsyncOperationEndStrategy(captureExperimentalSpanAttributes); + ReactorAsyncOperationOptions options = + new ReactorAsyncOperationOptions( + captureExperimentalSpanAttributes, emitCheckpoints, traceMultipleSubscribers); + return new ReactorAsyncOperationEndStrategy(options); } } diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationOptions.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationOptions.java new file mode 100644 index 000000000000..95fa9856c283 --- /dev/null +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationOptions.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +public final class ReactorAsyncOperationOptions { + private final boolean captureExperimentalSpanAttributes; + private final boolean emitCheckpoints; + private final boolean traceMultipleSubscribers; + + ReactorAsyncOperationOptions( + boolean captureExperimentalSpanAttributes, + boolean emitCheckpoint, + boolean traceMultipleSubscribers) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + this.emitCheckpoints = emitCheckpoint; + this.traceMultipleSubscribers = traceMultipleSubscribers; + } + + public boolean captureExperimentalSpanAttributes() { + return captureExperimentalSpanAttributes; + } + + public boolean emitCheckpoints() { + return emitCheckpoints; + } + + public boolean traceMultipleSubscribers() { + return traceMultipleSubscribers; + } +} diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java index cedbadae8173..1c50e9ab84f5 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java @@ -46,11 +46,8 @@ public static TracingOperatorBuilder newBuilder() { private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy; - TracingOperator(boolean captureExperimentalSpanAttributes) { - this.asyncOperationEndStrategy = - ReactorAsyncOperationEndStrategy.newBuilder() - .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) - .build(); + TracingOperator(ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) { + this.asyncOperationEndStrategy = asyncOperationEndStrategy; } /** diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java index 99889cfa3cc2..8f8cab153aab 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java @@ -6,17 +6,30 @@ package io.opentelemetry.instrumentation.reactor; public final class TracingOperatorBuilder { - private boolean captureExperimentalSpanAttributes; + private final ReactorAsyncOperationEndStrategyBuilder asyncOperationEndStrategyBuilder; - TracingOperatorBuilder() {} + TracingOperatorBuilder() { + asyncOperationEndStrategyBuilder = ReactorAsyncOperationEndStrategy.newBuilder(); + } public TracingOperatorBuilder setCaptureExperimentalSpanAttributes( boolean captureExperimentalSpanAttributes) { - this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + asyncOperationEndStrategyBuilder.setCaptureExperimentalSpanAttributes( + captureExperimentalSpanAttributes); + return this; + } + + public TracingOperatorBuilder setEmitCheckpoints(boolean emitCheckpoints) { + asyncOperationEndStrategyBuilder.setEmitCheckpoints(emitCheckpoints); + return this; + } + + public TracingOperatorBuilder setTraceMultipleSubscribers(boolean traceMultipleSubscribers) { + asyncOperationEndStrategyBuilder.setTraceMultipleSubscribers(traceMultipleSubscribers); return this; } public TracingOperator build() { - return new TracingOperator(captureExperimentalSpanAttributes); + return new TracingOperator(asyncOperationEndStrategyBuilder.build()); } } diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyTest.groovy index 2cb615a64907..55fae0ad307a 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategyTest.groovy @@ -24,9 +24,14 @@ class ReactorAsyncOperationEndStrategyTest extends Specification { Span span - def underTest = ReactorAsyncOperationEndStrategy.create() + def underTest = ReactorAsyncOperationEndStrategy.newBuilder() + .setEmitCheckpoints(true) + .setTraceMultipleSubscribers(true) + .build() def underTestWithExperimentalAttributes = ReactorAsyncOperationEndStrategy.newBuilder() + .setEmitCheckpoints(true) + .setTraceMultipleSubscribers(true) .setCaptureExperimentalSpanAttributes(true) .build()