Skip to content

Commit

Permalink
Add checkpoint and multiple subscriber options
Browse files Browse the repository at this point in the history
  • Loading branch information
HaloFour committed Sep 1, 2021
1 parent c2dbdb5 commit 6624968
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 84 deletions.
2 changes: 1 addition & 1 deletion instrumentation/reactor-3.1/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ muzzle {

tasks.withType<Test>().configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.reactor.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.reactor.experimental-span-attributes=true", "-Dotel.instrumentation.reactor.trace-multiple-subscribers=true", "-Dotel.instrumentation.reactor.emit-checkpoints=true")
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,16 @@ public static class ResetOnEachOperatorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void postStaticInitializer() {
Config config = Config.get();
TracingOperator.newBuilder()
.setCaptureExperimentalSpanAttributes(
Config.get()
.getBoolean("otel.instrumentation.reactor.experimental-span-attributes", false))
config.getBoolean(
"otel.instrumentation.reactor.experimental-span-attributes", false))
.setEmitCheckpoints(
config.getBoolean("otel.instrumentation.reactor.emit-checkpoints", false))
.setTraceMultipleSubscribers(
config.getBoolean(
"otel.instrumentation.reactor.trace-multiple-subscribers", false))
.build()
.registerOnEachOperator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

final class InstrumentedOperator<REQUEST, RESPONSE, T>
Expand All @@ -24,59 +22,32 @@ final class InstrumentedOperator<REQUEST, RESPONSE, T>
private final Context context;
private final REQUEST request;
private final Class<RESPONSE> responseType;
private final boolean captureExperimentalSpanAttributes;
private final ReactorAsyncOperationOptions options;
private final AtomicBoolean firstSubscriber = new AtomicBoolean(true);

static <REQUEST, RESPONSE, T> Mono<T> transformMono(
Mono<T> mono,
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType,
boolean captureExperimentalSpanAttributes) {

return mono.transform(
InstrumentedOperator.<REQUEST, RESPONSE, T>tracingLift(
instrumenter, context, request, responseType, captureExperimentalSpanAttributes));
}

static <REQUEST, RESPONSE, T> Flux<T> transformFlux(
Flux<T> flux,
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType,
boolean captureExperimentalSpanAttributes) {

return flux.transform(
InstrumentedOperator.<REQUEST, RESPONSE, T>tracingLift(
instrumenter, context, request, responseType, captureExperimentalSpanAttributes));
}

private static <REQUEST, RESPONSE, T>
Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
static <REQUEST, RESPONSE, T>
Function<? super Publisher<T>, ? extends Publisher<T>> instrumentedLift(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType,
boolean captureExperimentalSpanAttributes) {
ReactorAsyncOperationOptions options) {

return Operators.lift(
new InstrumentedOperator<>(
instrumenter, context, request, responseType, captureExperimentalSpanAttributes));
new InstrumentedOperator<>(instrumenter, context, request, responseType, options));
}

private InstrumentedOperator(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType,
boolean captureExperimentalSpanAttributes) {
ReactorAsyncOperationOptions options) {
this.instrumenter = instrumenter;
this.context = context;
this.request = request;
this.responseType = responseType;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.options = options;
}

@Override
Expand All @@ -85,24 +56,16 @@ public CoreSubscriber<? super T> apply(

if (isFirstSubscriber()) {
return new InstrumentedSubscriber<>(
instrumenter,
context,
request,
responseType,
captureExperimentalSpanAttributes,
coreSubscriber);
instrumenter, context, request, responseType, options, coreSubscriber);
}

Context parentContext = Context.current();
if (instrumenter.shouldStart(parentContext, request)) {
Context context = instrumenter.start(parentContext, request);
return new InstrumentedSubscriber<>(
instrumenter,
context,
request,
responseType,
captureExperimentalSpanAttributes,
coreSubscriber);
if (options.traceMultipleSubscribers()) {
Context parentContext = Context.current();
if (instrumenter.shouldStart(parentContext, request)) {
Context context = instrumenter.start(parentContext, request);
return new InstrumentedSubscriber<>(
instrumenter, context, request, responseType, options, coreSubscriber);
}
}
return coreSubscriber;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ final class InstrumentedSubscriber<REQUEST, RESPONSE, T>
private final Context context;
private final REQUEST request;
private final Class<RESPONSE> responseType;
private final boolean captureExperimentalSpanAttributes;
private final ReactorAsyncOperationOptions options;
private final CoreSubscriber<T> actual;
private Subscription subscription;
private T value;
Expand All @@ -35,14 +35,14 @@ final class InstrumentedSubscriber<REQUEST, RESPONSE, T>
Context context,
REQUEST request,
Class<RESPONSE> responseType,
boolean captureExperimentalSpanAttributes,
ReactorAsyncOperationOptions options,
CoreSubscriber<T> actual) {

this.instrumenter = instrumenter;
this.context = context;
this.request = request;
this.responseType = responseType;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.options = options;
this.actual = actual;
}

Expand All @@ -64,7 +64,7 @@ public void request(long count) {
@Override
public void cancel() {
if (subscription != null) {
if (captureExperimentalSpanAttributes) {
if (options.captureExperimentalSpanAttributes()) {
Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
instrumenter.end(context, request, null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@

import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.Objects;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
Expand All @@ -24,10 +27,10 @@ public static ReactorAsyncOperationEndStrategyBuilder newBuilder() {
return new ReactorAsyncOperationEndStrategyBuilder();
}

private final boolean captureExperimentalSpanAttributes;
private final ReactorAsyncOperationOptions options;

ReactorAsyncOperationEndStrategy(boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
ReactorAsyncOperationEndStrategy(ReactorAsyncOperationOptions options) {
this.options = options;
}

@Override
Expand All @@ -43,30 +46,65 @@ public <REQUEST, RESPONSE> Object end(
Object asyncValue,
Class<RESPONSE> responseType) {

if (tryEndSynchronously(instrumenter, context, request, asyncValue, responseType)) {
return asyncValue;
}

if (asyncValue instanceof Mono) {
Mono<?> mono = (Mono<?>) asyncValue;
return InstrumentedOperator.transformMono(
mono, instrumenter, context, request, responseType, captureExperimentalSpanAttributes);
return instrumentMono(mono, instrumenter, context, request, responseType);
} else {
Flux<?> flux = Flux.from((Publisher<?>) asyncValue);
return InstrumentedOperator.transformFlux(
flux, instrumenter, context, request, responseType, captureExperimentalSpanAttributes);
return instrumentFlux(flux, instrumenter, context, request, responseType);
}
}

private <RESPONSE, REQUEST, T> Mono<T> instrumentMono(
Mono<T> mono,
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType) {
Mono<T> withCheckpoint = checkpoint(mono, context, Mono::checkpoint);
if (tryEndSynchronously(mono, instrumenter, context, request, responseType)) {
return withCheckpoint;
}
return withCheckpoint.transform(
InstrumentedOperator.<REQUEST, RESPONSE, T>instrumentedLift(
instrumenter, context, request, responseType, options));
}

private <RESPONSE, REQUEST, T> Flux<T> instrumentFlux(
Flux<T> flux,
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType) {
Flux<T> withCheckpoint = checkpoint(flux, context, Flux::checkpoint);
if (tryEndSynchronously(flux, instrumenter, context, request, responseType)) {
return withCheckpoint;
}
return withCheckpoint.transform(
InstrumentedOperator.<REQUEST, RESPONSE, T>instrumentedLift(
instrumenter, context, request, responseType, options));
}

private <T, P extends Publisher<T>> P checkpoint(
P publisher, Context context, BiFunction<P, String, P> checkpoint) {
if (options.emitCheckpoints()) {
Span currentSpan = Span.fromContextOrNull(context);
if (currentSpan != null) {
return checkpoint.apply(publisher, Objects.toString(currentSpan));
}
}
return publisher;
}

private static <REQUEST, RESPONSE> boolean tryEndSynchronously(
Publisher<?> publisher,
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Object asyncValue,
Class<RESPONSE> responseType) {

if (asyncValue instanceof Fuseable.ScalarCallable) {
Fuseable.ScalarCallable<?> scalarCallable = (Fuseable.ScalarCallable<?>) asyncValue;
if (publisher instanceof Fuseable.ScalarCallable) {
Fuseable.ScalarCallable<?> scalarCallable = (Fuseable.ScalarCallable<?>) publisher;
try {
Object result = scalarCallable.call();
instrumenter.end(context, request, tryToGetResponse(responseType, result), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

public final class ReactorAsyncOperationEndStrategyBuilder {
private boolean captureExperimentalSpanAttributes;
private boolean emitCheckpoints;
private boolean traceMultipleSubscribers;

ReactorAsyncOperationEndStrategyBuilder() {}

Expand All @@ -16,7 +18,21 @@ public ReactorAsyncOperationEndStrategyBuilder setCaptureExperimentalSpanAttribu
return this;
}

public ReactorAsyncOperationEndStrategyBuilder setEmitCheckpoints(boolean emitCheckpoints) {
this.emitCheckpoints = emitCheckpoints;
return this;
}

public ReactorAsyncOperationEndStrategyBuilder setTraceMultipleSubscribers(
boolean traceMultipleSubscribers) {
this.traceMultipleSubscribers = traceMultipleSubscribers;
return this;
}

public ReactorAsyncOperationEndStrategy build() {
return new ReactorAsyncOperationEndStrategy(captureExperimentalSpanAttributes);
ReactorAsyncOperationOptions options =
new ReactorAsyncOperationOptions(
captureExperimentalSpanAttributes, emitCheckpoints, traceMultipleSubscribers);
return new ReactorAsyncOperationEndStrategy(options);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.reactor;

public final class ReactorAsyncOperationOptions {
private final boolean captureExperimentalSpanAttributes;
private final boolean emitCheckpoints;
private final boolean traceMultipleSubscribers;

ReactorAsyncOperationOptions(
boolean captureExperimentalSpanAttributes,
boolean emitCheckpoint,
boolean traceMultipleSubscribers) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.emitCheckpoints = emitCheckpoint;
this.traceMultipleSubscribers = traceMultipleSubscribers;
}

public boolean captureExperimentalSpanAttributes() {
return captureExperimentalSpanAttributes;
}

public boolean emitCheckpoints() {
return emitCheckpoints;
}

public boolean traceMultipleSubscribers() {
return traceMultipleSubscribers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,8 @@ public static TracingOperatorBuilder newBuilder() {

private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;

TracingOperator(boolean captureExperimentalSpanAttributes) {
this.asyncOperationEndStrategy =
ReactorAsyncOperationEndStrategy.newBuilder()
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
.build();
TracingOperator(ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
this.asyncOperationEndStrategy = asyncOperationEndStrategy;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,30 @@
package io.opentelemetry.instrumentation.reactor;

public final class TracingOperatorBuilder {
private boolean captureExperimentalSpanAttributes;
private final ReactorAsyncOperationEndStrategyBuilder asyncOperationEndStrategyBuilder;

TracingOperatorBuilder() {}
TracingOperatorBuilder() {
asyncOperationEndStrategyBuilder = ReactorAsyncOperationEndStrategy.newBuilder();
}

public TracingOperatorBuilder setCaptureExperimentalSpanAttributes(
boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
asyncOperationEndStrategyBuilder.setCaptureExperimentalSpanAttributes(
captureExperimentalSpanAttributes);
return this;
}

public TracingOperatorBuilder setEmitCheckpoints(boolean emitCheckpoints) {
asyncOperationEndStrategyBuilder.setEmitCheckpoints(emitCheckpoints);
return this;
}

public TracingOperatorBuilder setTraceMultipleSubscribers(boolean traceMultipleSubscribers) {
asyncOperationEndStrategyBuilder.setTraceMultipleSubscribers(traceMultipleSubscribers);
return this;
}

public TracingOperator build() {
return new TracingOperator(captureExperimentalSpanAttributes);
return new TracingOperator(asyncOperationEndStrategyBuilder.build());
}
}
Loading

0 comments on commit 6624968

Please sign in to comment.