Skip to content

Commit

Permalink
Create spans for multiple subscriptions to traced Reactor publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
HaloFour committed Aug 15, 2021
1 parent 080c85d commit 696f8e2
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.opentelemetry.instrumentation.reactor.TracedWithSpan
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.ReplayProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.test.StepVerifier

Expand Down Expand Up @@ -70,6 +71,51 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati
}
}

def "should capture span for eventually completed Mono per subscription"() {
setup:
def source = ReplayProcessor.<String>create()
def mono = source.singleOrEmpty()
def result = new TracedWithSpan()
.mono(mono)
def verifier = StepVerifier.create(result)
.expectSubscription()

expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}

source.onNext("Value")
source.onComplete()

verifier.expectNext("Value")
.verifyComplete()

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

assertTraces(2) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
}
trace(1, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
}
}
}

def "should capture span for already errored Mono"() {
setup:
def error = new IllegalArgumentException("Boom")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

final class InstrumentedOperator<REQUEST, RESPONSE, T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

private final Instrumenter<REQUEST, RESPONSE> instrumenter;
private final Context context;
private final REQUEST request;
private final Class<RESPONSE> responseType;
private final boolean captureExperimentalSpanAttributes;
private final AtomicBoolean firstSubscriber = new AtomicBoolean(true);

static <REQUEST, RESPONSE, T> Mono<T> transformMono(
Mono<T> mono,
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType,
boolean captureExperimentalSpanAttributes) {

return mono.transform(
InstrumentedOperator.<REQUEST, RESPONSE, T>tracingLift(
instrumenter, context, request, responseType, captureExperimentalSpanAttributes));
}

static <REQUEST, RESPONSE, T> Flux<T> transformFlux(
Flux<T> flux,
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType,
boolean captureExperimentalSpanAttributes) {

return flux.transform(
InstrumentedOperator.<REQUEST, RESPONSE, T>tracingLift(
instrumenter, context, request, responseType, captureExperimentalSpanAttributes));
}

private static <REQUEST, RESPONSE, T>
Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType,
boolean captureExperimentalSpanAttributes) {

return Operators.lift(
new InstrumentedOperator<>(
instrumenter, context, request, responseType, captureExperimentalSpanAttributes));
}

private InstrumentedOperator(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType,
boolean captureExperimentalSpanAttributes) {
this.instrumenter = instrumenter;
this.context = context;
this.request = request;
this.responseType = responseType;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
}

@Override
public CoreSubscriber<? super T> apply(
Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {

if (isFirstSubscriber()) {
return new InstrumentedSubscriber<>(
instrumenter,
context,
request,
responseType,
captureExperimentalSpanAttributes,
coreSubscriber);
}

Context parentContext = Context.current();
if (instrumenter.shouldStart(parentContext, request)) {
Context context = instrumenter.start(parentContext, request);
return new InstrumentedSubscriber<>(
instrumenter,
context,
request,
responseType,
captureExperimentalSpanAttributes,
coreSubscriber);
}
return coreSubscriber;
}

private boolean isFirstSubscriber() {
return firstSubscriber.compareAndSet(true, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.reactor;

import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;

final class InstrumentedSubscriber<REQUEST, RESPONSE, T>
implements CoreSubscriber<T>, Subscription {

private static final AttributeKey<Boolean> CANCELED_ATTRIBUTE_KEY =
AttributeKey.booleanKey("reactor.canceled");

private final Instrumenter<REQUEST, RESPONSE> instrumenter;
private final Context context;
private final REQUEST request;
private final Class<RESPONSE> responseType;
private final boolean captureExperimentalSpanAttributes;
private final CoreSubscriber<T> actual;
private Subscription subscription;
private T value;

InstrumentedSubscriber(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Class<RESPONSE> responseType,
boolean captureExperimentalSpanAttributes,
CoreSubscriber<T> actual) {

this.instrumenter = instrumenter;
this.context = context;
this.request = request;
this.responseType = responseType;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.actual = actual;
}

@Override
public void onSubscribe(Subscription subscription) {
if (Operators.validate(this.subscription, subscription)) {
this.subscription = subscription;
actual.onSubscribe(this);
}
}

@Override
public void request(long count) {
if (subscription != null) {
subscription.request(count);
}
}

@Override
public void cancel() {
if (subscription != null) {
if (captureExperimentalSpanAttributes) {
Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
instrumenter.end(context, request, null, null);
subscription.cancel();
}
}

@Override
public void onNext(T value) {
this.value = value;
actual.onNext(value);
}

@Override
public void onError(Throwable error) {
instrumenter.end(context, request, null, error);
actual.onError(error);
}

@Override
public void onComplete() {
instrumenter.end(context, request, tryToGetResponse(responseType, value), null);
actual.onComplete();
}

@Override
public reactor.util.context.Context currentContext() {
return actual.currentContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,15 @@

import static io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndSupport.tryToGetResponse;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class ReactorAsyncOperationEndStrategy implements AsyncOperationEndStrategy {
private static final AttributeKey<Boolean> CANCELED_ATTRIBUTE_KEY =
AttributeKey.booleanKey("reactor.canceled");

public static ReactorAsyncOperationEndStrategy create() {
return newBuilder().build();
}
Expand Down Expand Up @@ -49,71 +43,38 @@ public <REQUEST, RESPONSE> Object end(
Object asyncValue,
Class<RESPONSE> responseType) {

EndOnFirstNotificationConsumer notificationConsumer =
new EndOnFirstNotificationConsumer(context) {
@Override
protected void end(Object result, Throwable error) {
instrumenter.end(context, request, tryToGetResponse(responseType, result), error);
}
};
if (tryEndSynchronously(instrumenter, context, request, asyncValue, responseType)) {
return asyncValue;
}

if (asyncValue instanceof Mono) {
Mono<?> mono = (Mono<?>) asyncValue;
return mono.doOnError(notificationConsumer)
.doOnSuccess(notificationConsumer::onSuccess)
.doOnCancel(notificationConsumer::onCancel);
return InstrumentedOperator.transformMono(
mono, instrumenter, context, request, responseType, captureExperimentalSpanAttributes);
} else {
Flux<?> flux = Flux.from((Publisher<?>) asyncValue);
return flux.doOnError(notificationConsumer)
.doOnComplete(notificationConsumer)
.doOnCancel(notificationConsumer::onCancel);
return InstrumentedOperator.transformFlux(
flux, instrumenter, context, request, responseType, captureExperimentalSpanAttributes);
}
}

/**
* Helper class to ensure that the span is ended exactly once regardless of how many OnComplete or
* OnError notifications are received. Multiple notifications can happen anytime multiple
* subscribers subscribe to the same publisher.
*/
private abstract class EndOnFirstNotificationConsumer extends AtomicBoolean
implements Runnable, Consumer<Throwable> {

private final Context context;

protected EndOnFirstNotificationConsumer(Context context) {
super(false);
this.context = context;
}

public <T> void onSuccess(T result) {
accept(result, null);
}

public void onCancel() {
if (compareAndSet(false, true)) {
if (captureExperimentalSpanAttributes) {
Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
end(null, null);
}
}

@Override
public void run() {
accept(null, null);
}

@Override
public void accept(Throwable exception) {
end(null, exception);
}
private static <REQUEST, RESPONSE> boolean tryEndSynchronously(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Object asyncValue,
Class<RESPONSE> responseType) {

private void accept(Object result, Throwable error) {
if (compareAndSet(false, true)) {
end(result, error);
if (asyncValue instanceof Fuseable.ScalarCallable) {
Fuseable.ScalarCallable<?> scalarCallable = (Fuseable.ScalarCallable<?>) asyncValue;
try {
Object result = scalarCallable.call();
instrumenter.end(context, request, tryToGetResponse(responseType, result), null);
} catch (Throwable error) {
instrumenter.end(context, request, null, error);
}
return true;
}

protected abstract void end(Object result, Throwable error);
return false;
}
}
Loading

0 comments on commit 696f8e2

Please sign in to comment.