diff --git a/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/TracingSubscriber.java b/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/TracingSubscriber.java index f9195d56360b..e3f81bc900af 100644 --- a/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/TracingSubscriber.java +++ b/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/TracingSubscriber.java @@ -22,6 +22,7 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; +import java.util.function.Supplier; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -56,30 +57,40 @@ public TracingSubscriber( @Override public void onSubscribe(Subscription subscription) { - withActiveSpan(() -> subscriber.onSubscribe(subscription)); + try (Scope ignore = openScope()) { + subscriber.onSubscribe(subscription); + } } @Override public void onNext(T o) { - withActiveSpan(() -> subscriber.onNext(o)); + try (Scope ignore = openScope()) { + subscriber.onNext(o); + } } @Override public void onError(Throwable throwable) { + Supplier scopeSupplier; if (!hasContextToPropagate && (fluxRetrySubscriberClass == subscriber.getClass() || fluxRetryWhenSubscriberClass == subscriber.getClass())) { // clear context for retry to avoid having retried operations run with currently active // context as parent context - withActiveSpan(io.opentelemetry.context.Context.root(), () -> subscriber.onError(throwable)); + scopeSupplier = () -> openScope(io.opentelemetry.context.Context.root()); } else { - withActiveSpan(() -> subscriber.onError(throwable)); + scopeSupplier = () -> openScope(); + } + try (Scope ignore = scopeSupplier.get()) { + subscriber.onError(throwable); } } @Override public void onComplete() { - withActiveSpan(subscriber::onComplete); + try (Scope ignore = openScope()) { + subscriber.onComplete(); + } } @Override @@ -87,18 +98,12 @@ public Context currentContext() { return context; } - private void withActiveSpan(Runnable runnable) { - withActiveSpan(hasContextToPropagate ? traceContext : null, runnable); + private Scope openScope() { + return openScope(hasContextToPropagate ? traceContext : null); } - private static void withActiveSpan(io.opentelemetry.context.Context context, Runnable runnable) { - if (context != null) { - try (Scope ignored = context.makeCurrent()) { - runnable.run(); - } - } else { - runnable.run(); - } + private static Scope openScope(io.opentelemetry.context.Context context) { + return context != null ? context.makeCurrent() : null; } private static Class getFluxRetrySubscriberClass() {