Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear context before flux retry #8456

Merged
merged 3 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* https://github.com/opentracing-contrib/java-reactor/blob/master/src/main/java/io/opentracing/contrib/reactor/TracedSubscriber.java
*/
public class TracingSubscriber<T> implements CoreSubscriber<T> {
private static final Class<?> fluxRetrySubscriberClass = getFluxRetrySubscriberClass();
private final io.opentelemetry.context.Context traceContext;
private final Subscriber<? super T> subscriber;
private final Context context;
Expand Down Expand Up @@ -64,7 +65,13 @@ public void onNext(T o) {

@Override
public void onError(Throwable throwable) {
withActiveSpan(() -> subscriber.onError(throwable));
if (!hasContextToPropagate && fluxRetrySubscriberClass == subscriber.getClass()) {
// clear context for retry to avoid having retried operations run with currently active
// context as parent context
withActiveSpan(io.opentelemetry.context.Context.root(), () -> subscriber.onError(throwable));
} else {
withActiveSpan(() -> subscriber.onError(throwable));
}
}

@Override
Expand All @@ -78,12 +85,24 @@ public Context currentContext() {
}

private void withActiveSpan(Runnable runnable) {
if (hasContextToPropagate) {
try (Scope ignored = traceContext.makeCurrent()) {
withActiveSpan(hasContextToPropagate ? traceContext : null, runnable);
}

private static void withActiveSpan(io.opentelemetry.context.Context context, Runnable runnable) {
if (context != null) {
try (Scope ignored = context.makeCurrent()) {
runnable.run();
}
} else {
runnable.run();
}
}

private static Class<?> getFluxRetrySubscriberClass() {
try {
return Class.forName("reactor.core.publisher.FluxRetry$RetrySubscriber");
} catch (ClassNotFoundException exception) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.instrumentation.reactor.v3_1;

import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
import static java.lang.invoke.MethodType.methodType;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -21,6 +22,7 @@
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -406,6 +408,68 @@ void doesNotOverrideInnerCurrentSpansWithThereIsOuterCurrent() {
.hasAttributes(attributeEntry("onNext", true))));
}

@Test
void doesNotLeakContextOnRetry() {
// retry calls subscribe again from onError where we have active context, check that this
// context is not used as parent for retried operations
AtomicBoolean beforeRetry = new AtomicBoolean(true);
Flux<Integer> publish =
Flux.create(
sink -> {
for (int i = 0; i < 2; i++) {
Span s =
tracer
.spanBuilder(
"produce " + (beforeRetry.get() ? "before" : "after") + " retry " + i)
.startSpan();
try (Scope scope = s.makeCurrent()) {
sink.next(i);
} finally {
s.end();
}
}
});

Flux.defer(() -> publish.delaySubscription(Duration.ofMillis(1)))
.doOnNext(
message -> {
tracer.spanBuilder("process " + message).startSpan().end();
})
.handle(
(message, sink) -> {
if (message == 1 && beforeRetry.compareAndSet(true, false)) {
sink.error(new RuntimeException("Message has error"));
} else {
sink.next(message);
}
})
.retry()
.subscribe();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a test where there's an active parent span when .subscribe() is called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test with parent span, all child spans are under that parent.


testing.waitAndAssertSortedTraces(
orderByRootSpanName(
"produce before retry 0",
"produce before retry 1",
"produce after retry 0",
"produce after retry 1"),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("produce before retry 0").hasNoParent(),
span -> span.hasName("process 0").hasParent(trace.getSpan(0))),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("produce before retry 1").hasNoParent(),
span -> span.hasName("process 1").hasParent(trace.getSpan(0))),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("produce after retry 0").hasNoParent(),
span -> span.hasName("process 0").hasParent(trace.getSpan(0))),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("produce after retry 1").hasNoParent(),
span -> span.hasName("process 1").hasParent(trace.getSpan(0))));
}

@SuppressWarnings("unchecked")
private <T> Mono<T> monoSpan(Mono<T> mono, String spanName) {

Expand Down