Skip to content

Commit

Permalink
Allow scanning instrumented reactor publishers and only allow registe… (
Browse files Browse the repository at this point in the history
open-telemetry#5755)

* Allow scanning instrumented reactor publishers and only allow registering once.

* Avoid atomicboolean

* Clean

* Bug
  • Loading branch information
anuraaga authored and RashmiRam committed May 23, 2022
1 parent b9a87ff commit 802bc5a
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
Expand All @@ -39,6 +40,8 @@
/** Based on Spring Sleuth's Reactor instrumentation. */
public final class ContextPropagationOperator {

private static final Object VALUE = new Object();

public static ContextPropagationOperator create() {
return builder().build();
}
Expand Down Expand Up @@ -98,14 +101,20 @@ public static Context getOpenTelemetryContext(
* reactive stream. This should generally be called in a static initializer block in your
* application.
*/
public void registerOnEachOperator() {
public synchronized void registerOnEachOperator() {
if (enabled) {
return;
}
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
enabled = true;
}

/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public void resetOnEachOperator() {
public synchronized void resetOnEachOperator() {
if (!enabled) {
return;
}
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
enabled = false;
Expand All @@ -125,8 +134,7 @@ public static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingConte
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// (created for this publisher) and with current() span that refers to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return ScalarPropagatingMono.INSTANCE
.flatMap(i -> publisher)
return ScalarPropagatingMono.create(publisher)
.subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext));
}

Expand All @@ -139,8 +147,7 @@ public static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingConte
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// (created for this publisher) and with current() span that refers to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return ScalarPropagatingFlux.INSTANCE
.flatMap(i -> publisher)
return ScalarPropagatingFlux.create(publisher)
.subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext));
}

Expand Down Expand Up @@ -177,29 +184,61 @@ static void subscribeInActiveSpan(CoreSubscriber<? super Object> actual, Object
}
}

static class ScalarPropagatingMono extends Mono<Object> {
public static final Mono<Object> INSTANCE = new ScalarPropagatingMono();
static class ScalarPropagatingMono extends Mono<Object> implements Scannable {

private final Object value = new Object();
static <T> Mono<T> create(Mono<T> source) {
return new ScalarPropagatingMono(source).flatMap(unused -> source);
}

private ScalarPropagatingMono() {}
private final Mono<?> source;

private ScalarPropagatingMono(Mono<?> source) {
this.source = source;
}

@Override
public void subscribe(CoreSubscriber<? super Object> actual) {
subscribeInActiveSpan(actual, value);
subscribeInActiveSpan(actual, VALUE);
}

@Override
@Nullable
// Interface method doesn't have type parameter so we can't add it either.
@SuppressWarnings("rawtypes")
public Object scanUnsafe(Attr attr) {
if (attr == Attr.PARENT) {
return source;
}
return null;
}
}

static class ScalarPropagatingFlux extends Flux<Object> {
public static final Flux<Object> INSTANCE = new ScalarPropagatingFlux();
static class ScalarPropagatingFlux extends Flux<Object> implements Scannable {

private final Object value = new Object();
static <T> Flux<T> create(Flux<T> source) {
return new ScalarPropagatingFlux(source).flatMap(unused -> source);
}

private ScalarPropagatingFlux() {}
private final Flux<?> source;

private ScalarPropagatingFlux(Flux<?> source) {
this.source = source;
}

@Override
public void subscribe(CoreSubscriber<? super Object> actual) {
subscribeInActiveSpan(actual, value);
subscribeInActiveSpan(actual, VALUE);
}

@Override
@Nullable
// Interface method doesn't have type parameter so we can't add it either.
@SuppressWarnings("rawtypes")
public Object scanUnsafe(Scannable.Attr attr) {
if (attr == Scannable.Attr.PARENT) {
return source;
}
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;

class ReactorCoreTest extends AbstractReactorCoreTest {

Expand Down Expand Up @@ -69,17 +71,16 @@ void monoInNonBlockingPublisherAssembly() {

@Test
void fluxInNonBlockingPublisherAssembly() {
Flux<Integer> source =
Flux.defer(
() -> {
Span.current().setAttribute("inner", "foo");
return Flux.just(5, 6);
});
testing.runWithSpan(
"parent",
() ->
ContextPropagationOperator.ScalarPropagatingFlux.INSTANCE
.flatMap(
unused ->
Flux.defer(
() -> {
Span.current().setAttribute("inner", "foo");
return Flux.just(5, 6);
}))
ContextPropagationOperator.ScalarPropagatingFlux.create(source)
.doOnEach(
signal -> {
if (signal.isOnError()) {
Expand Down Expand Up @@ -199,9 +200,37 @@ void noTracingBeforeRegistration() {
trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("after").hasNoParent()));
}

@Test
void monoParentsAccessible() {
UnicastProcessor<String> source = UnicastProcessor.create();
Mono<String> mono =
ContextPropagationOperator.runWithContext(source.singleOrEmpty(), Context.root());

source.onNext("foo");
source.onComplete();

assertThat(mono.block()).isEqualTo("foo");

assertThat(((Scannable) mono).parents().filter(UnicastProcessor.class::isInstance).findFirst())
.isPresent();
}

@Test
void fluxParentsAccessible() {
UnicastProcessor<String> source = UnicastProcessor.create();
Flux<String> flux = ContextPropagationOperator.runWithContext(source, Context.root());

source.onNext("foo");
source.onComplete();

assertThat(flux.collectList().block()).containsExactly("foo");

assertThat(((Scannable) flux).parents().filter(UnicastProcessor.class::isInstance).findFirst())
.isPresent();
}

private <T> Mono<T> monoSpan(Mono<T> mono, String spanName) {
return ContextPropagationOperator.ScalarPropagatingMono.INSTANCE
.flatMap(unused -> mono)
return ContextPropagationOperator.ScalarPropagatingMono.create(mono)
.doOnEach(
signal -> {
if (signal.isOnError()) {
Expand Down

0 comments on commit 802bc5a

Please sign in to comment.