Skip to content

Commit

Permalink
End span on cancellation of subscription to reactive publishers (#3153)
Browse files Browse the repository at this point in the history
* Stop span on cancellation of subscription to reactive publisher

* Add semantic attribute on cancelation of reactive publisher

* Change TracingOperator and TracingAssembly to accept configuration from Javaagent
  • Loading branch information
HaloFour authored Jun 3, 2021
1 parent e176267 commit 1402d11
Show file tree
Hide file tree
Showing 36 changed files with 1,676 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ import io.lettuce.core.RedisClient
import io.lettuce.core.resource.ClientResources
import io.opentelemetry.instrumentation.reactor.TracingOperator
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import spock.lang.Shared

class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implements LibraryTestTrait {
@Shared
TracingOperator tracingOperator = TracingOperator.create()

@Override
RedisClient createClient(String uri) {
return RedisClient.create(
Expand All @@ -21,10 +25,10 @@ class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implem
}

def setupSpec() {
TracingOperator.registerOnEachOperator()
tracingOperator.registerOnEachOperator()
}

def cleanupSpec() {
TracingOperator.resetOnEachOperator()
tracingOperator.resetOnEachOperator()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ muzzle {
}
}

tasks.withType(Test).configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs "-Dotel.instrumentation.reactor.experimental-span-attributes=true"
}

dependencies {
implementation project(':instrumentation:reactor-3.1:library')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.reactor.TracingOperator;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
Expand All @@ -31,7 +32,13 @@ public void transform(TypeTransformer transformer) {
public static class ResetOnEachOperatorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void postStaticInitializer() {
TracingOperator.registerOnEachOperator();
TracingOperator.newBuilder()
.setCaptureExperimentalSpanAttributes(
Config.get()
.getBooleanProperty(
"otel.instrumentation.reactor.experimental-span-attributes", false))
.build()
.registerOnEachOperator();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,35 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati
}
}

def "should capture span for canceled Mono"() {
setup:
def source = UnicastProcessor.<String>create()
def mono = source.singleOrEmpty()
def result = new TracedWithSpan()
.mono(mono)
def verifier = StepVerifier.create(result)
.expectSubscription()

expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}

verifier.thenCancel().verify()

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
"reactor.canceled" true
}
}
}
}
}

def "should capture span for already completed Flux"() {
setup:
def source = Flux.just("Value")
Expand Down Expand Up @@ -242,4 +271,35 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati
}
}
}

def "should capture span for canceled Flux"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = UnicastProcessor.<String>create()
def result = new TracedWithSpan()
.flux(source)
def verifier = StepVerifier.create(result)
.expectSubscription()

expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}

source.onError(error)

verifier.thenCancel().verify()

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
"reactor.canceled" true
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
Expand All @@ -14,8 +16,23 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public enum ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy {
INSTANCE;
public final class ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy {
private static final AttributeKey<Boolean> CANCELED_ATTRIBUTE_KEY =
AttributeKey.booleanKey("reactor.canceled");

public static ReactorAsyncSpanEndStrategy create() {
return newBuilder().build();
}

public static ReactorAsyncSpanEndStrategyBuilder newBuilder() {
return new ReactorAsyncSpanEndStrategyBuilder();
}

private final boolean captureExperimentalSpanAttributes;

ReactorAsyncSpanEndStrategy(boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
}

@Override
public boolean supports(Class<?> returnType) {
Expand All @@ -29,10 +46,14 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) {
new EndOnFirstNotificationConsumer(tracer, context);
if (returnValue instanceof Mono) {
Mono<?> mono = (Mono<?>) returnValue;
return mono.doOnError(notificationConsumer).doOnSuccess(notificationConsumer::onSuccess);
return mono.doOnError(notificationConsumer)
.doOnSuccess(notificationConsumer::onSuccess)
.doOnCancel(notificationConsumer::onCancel);
} else {
Flux<?> flux = Flux.from((Publisher<?>) returnValue);
return flux.doOnError(notificationConsumer).doOnComplete(notificationConsumer);
return flux.doOnError(notificationConsumer)
.doOnComplete(notificationConsumer)
.doOnCancel(notificationConsumer::onCancel);
}
}

Expand All @@ -41,7 +62,7 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) {
* OnError notifications are received. Multiple notifications can happen anytime multiple
* subscribers subscribe to the same publisher.
*/
private static final class EndOnFirstNotificationConsumer extends AtomicBoolean
private final class EndOnFirstNotificationConsumer extends AtomicBoolean
implements Runnable, Consumer<Throwable> {

private final BaseTracer tracer;
Expand All @@ -57,6 +78,15 @@ public <T> void onSuccess(T ignored) {
accept(null);
}

public void onCancel() {
if (compareAndSet(false, true)) {
if (captureExperimentalSpanAttributes) {
Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
tracer.end(context);
}
}

@Override
public void run() {
accept(null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.reactor;

public final class ReactorAsyncSpanEndStrategyBuilder {
private boolean captureExperimentalSpanAttributes;

ReactorAsyncSpanEndStrategyBuilder() {}

public ReactorAsyncSpanEndStrategyBuilder setCaptureExperimentalSpanAttributes(
boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
return this;
}

public ReactorAsyncSpanEndStrategy build() {
return new ReactorAsyncSpanEndStrategy(captureExperimentalSpanAttributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,39 @@
/** Based on Spring Sleuth's Reactor instrumentation. */
public final class TracingOperator {

public static TracingOperator create() {
return newBuilder().build();
}

public static TracingOperatorBuilder newBuilder() {
return new TracingOperatorBuilder();
}

private final boolean captureExperimentalSpanAttributes;

TracingOperator(boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
}

/**
* Registers a hook that applies to every operator, propagating {@link Context} to downstream
* callbacks to ensure spans in the {@link Context} are available throughout the lifetime of a
* reactive stream. This should generally be called in a static initializer block in your
* application.
*/
public static void registerOnEachOperator() {
public void registerOnEachOperator() {
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift());
AsyncSpanEndStrategies.getInstance().registerStrategy(ReactorAsyncSpanEndStrategy.INSTANCE);
AsyncSpanEndStrategies.getInstance()
.registerStrategy(
ReactorAsyncSpanEndStrategy.newBuilder()
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
.build());
}

/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public static void resetOnEachOperator() {
public void resetOnEachOperator() {
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.INSTANCE);
AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.class);
}

private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
Expand All @@ -69,6 +87,4 @@ public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? sup
return new TracingSubscriber<>(sub, sub.currentContext());
}
}

private TracingOperator() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.reactor;

public final class TracingOperatorBuilder {
private boolean captureExperimentalSpanAttributes;

TracingOperatorBuilder() {}

public TracingOperatorBuilder setCaptureExperimentalSpanAttributes(
boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
return this;
}

public TracingOperator build() {
return new TracingOperator(captureExperimentalSpanAttributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class HooksTest extends LibraryInstrumentationSpecification {

def "can reset out hooks"() {
setup:
def underTest = TracingOperator.create()
AtomicReference<CoreSubscriber> subscriber = new AtomicReference<>()

when: "no hook registered"
Expand All @@ -23,14 +24,14 @@ class HooksTest extends LibraryInstrumentationSpecification {
!(subscriber.get() instanceof TracingSubscriber)

when: "hook registered"
TracingOperator.registerOnEachOperator()
underTest.registerOnEachOperator()
new CapturingMono(subscriber).map { it + 1 }.subscribe()

then:
subscriber.get() instanceof TracingSubscriber

when: "hook reset"
TracingOperator.resetOnEachOperator()
underTest.resetOnEachOperator()
new CapturingMono(subscriber).map { it + 1 }.subscribe()

then:
Expand Down
Loading

0 comments on commit 1402d11

Please sign in to comment.