Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update webflux to Instrumenter API (and improvements/simplifications) #3798

Merged
merged 17 commits into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ dependencies {
tasks.withType<Test>().configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.spring-webflux.experimental-span-attributes=true")
// TODO(trask): try removing this if/when latest webflux instrumentations changes prove out
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried it, strict context check still fails. I believe we need to wait for netty request processing to complete before doing context leaks check. I'll create a pr for this.

// TODO(anuraaga): There is no actual context leak - it just seems that the server-side does not
// fully complete processing before the test cases finish, which is when we check for context
// leaks. Adding Thread.sleep(1000) just before checking for leaks allows it to pass but is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,13 @@

package io.opentelemetry.javaagent.instrumentation.spring.webflux.server;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import static io.opentelemetry.javaagent.instrumentation.spring.webflux.server.WebfluxSingletons.instrumenter;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.ClassNames;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

public class AdviceUtils {

Expand All @@ -35,114 +27,16 @@ public static String spanNameForHandler(Object handler) {
return className + ".handle";
}

public static <T> Mono<T> setPublisherSpan(
Mono<T> mono, io.opentelemetry.context.Context context) {
return mono.<T>transform(finishSpanNextOrError(context));
}

/**
* Idea for this has been lifted from https://github.com/reactor/reactor-core/issues/947. Newer
* versions of reactor-core have easier way to access context but we want to support older
* versions.
*/
public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> finishSpanNextOrError(
io.opentelemetry.context.Context context) {
return Operators.lift(
(scannable, subscriber) -> new SpanFinishingSubscriber<>(subscriber, context));
}

public static void finishSpanIfPresent(ServerWebExchange exchange, Throwable throwable) {
if (exchange != null) {
finishSpanIfPresentInAttributes(exchange.getAttributes(), throwable);
}
public static <T> Mono<T> end(Mono<T> mono, ServerWebExchange exchange) {
return mono.doOnError(throwable -> end(exchange, throwable))
.doOnSuccess(t -> end(exchange, null))
.doOnCancel(() -> end(exchange, null));
}

public static void finishSpanIfPresent(ServerRequest serverRequest, Throwable throwable) {
if (serverRequest != null) {
finishSpanIfPresentInAttributes(serverRequest.attributes(), throwable);
}
}

static void finishSpanIfPresent(io.opentelemetry.context.Context context, Throwable throwable) {
private static void end(ServerWebExchange exchange, @Nullable Throwable throwable) {
Context context = (Context) exchange.getAttributes().get(AdviceUtils.CONTEXT_ATTRIBUTE);
if (context != null) {
Span span = Span.fromContext(context);
if (throwable != null) {
span.setStatus(StatusCode.ERROR);
span.recordException(throwable);
}
span.end();
}
}

private static void finishSpanIfPresentInAttributes(
Map<String, Object> attributes, Throwable throwable) {
io.opentelemetry.context.Context context =
(io.opentelemetry.context.Context) attributes.remove(CONTEXT_ATTRIBUTE);
finishSpanIfPresent(context, throwable);
}

public static class SpanFinishingSubscriber<T> implements CoreSubscriber<T>, Subscription {

private final CoreSubscriber<? super T> subscriber;
private final io.opentelemetry.context.Context otelContext;
private final Context context;
private final AtomicBoolean completed = new AtomicBoolean();
private Subscription subscription;

public SpanFinishingSubscriber(
CoreSubscriber<? super T> subscriber, io.opentelemetry.context.Context otelContext) {
this.subscriber = subscriber;
this.otelContext = otelContext;
context = subscriber.currentContext().put(Span.class, otelContext);
}

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
try (Scope ignored = otelContext.makeCurrent()) {
subscriber.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
try (Scope ignored = otelContext.makeCurrent()) {
subscriber.onNext(t);
}
}

@Override
public void onError(Throwable t) {
if (completed.compareAndSet(false, true)) {
finishSpanIfPresent(otelContext, t);
}
subscriber.onError(t);
}

@Override
public void onComplete() {
if (completed.compareAndSet(false, true)) {
finishSpanIfPresent(otelContext, null);
}
subscriber.onComplete();
}

@Override
public Context currentContext() {
return context;
}

@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void cancel() {
if (completed.compareAndSet(false, true)) {
finishSpanIfPresent(otelContext, null);
}
subscription.cancel();
instrumenter().end(context, null, null, throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@

package io.opentelemetry.javaagent.instrumentation.spring.webflux.server;

import static io.opentelemetry.javaagent.instrumentation.spring.webflux.server.SpringWebfluxHttpServerTracer.tracer;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
Expand All @@ -41,41 +37,20 @@ public void transform(TypeTransformer transformer) {
this.getClass().getName() + "$HandleAdvice");
}

/**
* This is 'top level' advice for Webflux instrumentation. This handles creating and finishing
* Webflux span.
*/
@SuppressWarnings("unused")
public static class HandleAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void methodEnter(
@Advice.Argument(0) ServerWebExchange exchange,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("otelContext") Context otelContext) {

otelContext = tracer().startSpan("DispatcherHandler.handle", SpanKind.INTERNAL);
// Unfortunately Netty EventLoop is not instrumented well enough to attribute all work to the
// right things so we have to store the context in request itself.
exchange.getAttributes().put(AdviceUtils.CONTEXT_ATTRIBUTE, otelContext);

otelScope = otelContext.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Thrown Throwable throwable,
@Advice.Argument(0) ServerWebExchange exchange,
@Advice.Return(readOnly = false) Mono<Void> mono,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("otelContext") Context otelContext) {
if (throwable == null && mono != null) {
mono = AdviceUtils.setPublisherSpan(mono, otelContext);
} else if (throwable != null) {
AdviceUtils.finishSpanIfPresent(exchange, throwable);
@Advice.Return(readOnly = false) Mono<Void> mono) {
if (mono != null) {
// note: it seems like this code should go in HandleAdvice @OnMethodExit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean Enter? This code seems to be in HandleAdvice.OnMethodExit right now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh no, two different HandleAdvice classes, I fixed the comment 👍

// but for some reason "GET to bad endpoint annotation API fail Mono" test fails
// with that placement
mono = AdviceUtils.end(mono, exchange);
}
otelScope.close();
// span finished in SpanFinishingSubscriber
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.instrumentation.spring.webflux.server.WebfluxSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
Expand Down Expand Up @@ -64,55 +65,70 @@ public static class HandleAdvice {
public static void methodEnter(
@Advice.Argument(0) ServerWebExchange exchange,
@Advice.Argument(1) Object handler,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

Context context = exchange.getAttribute(AdviceUtils.CONTEXT_ATTRIBUTE);
if (handler != null && context != null) {
Span span = Span.fromContext(context);
String handlerType;
String spanName;

if (handler instanceof HandlerMethod) {
// Special case for requests mapped with annotations
HandlerMethod handlerMethod = (HandlerMethod) handler;
spanName = SpanNames.fromMethod(handlerMethod.getMethod());
handlerType = handlerMethod.getMethod().getDeclaringClass().getName();
} else {
spanName = AdviceUtils.spanNameForHandler(handler);
handlerType = handler.getClass().getName();
}

span.updateName(spanName);
if (SpringWebfluxConfig.captureExperimentalSpanAttributes()) {
span.setAttribute("spring-webflux.handler.type", handlerType);
}

scope = context.makeCurrent();
Context parentContext = Context.current();

Span serverSpan = ServerSpan.fromContextOrNull(parentContext);

PathPattern bestPattern =
exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
if (serverSpan != null && bestPattern != null) {
serverSpan.updateName(
ServletContextPath.prepend(Context.current(), bestPattern.toString()));
}

if (handler == null) {
return;
}

if (context != null) {
Span serverSpan = ServerSpan.fromContextOrNull(context);
if (!instrumenter().shouldStart(parentContext, null)) {
return;
}

context = instrumenter().start(parentContext, null);

Span span = Span.fromContext(context);
String handlerType;
String spanName;

PathPattern bestPattern =
exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
if (serverSpan != null && bestPattern != null) {
serverSpan.updateName(
ServletContextPath.prepend(Context.current(), bestPattern.toString()));
}
if (handler instanceof HandlerMethod) {
// Special case for requests mapped with annotations
HandlerMethod handlerMethod = (HandlerMethod) handler;
spanName = SpanNames.fromMethod(handlerMethod.getMethod());
handlerType = handlerMethod.getMethod().getDeclaringClass().getName();
} else {
spanName = AdviceUtils.spanNameForHandler(handler);
handlerType = handler.getClass().getName();
}

span.updateName(spanName);
if (SpringWebfluxConfig.captureExperimentalSpanAttributes()) {
span.setAttribute("spring-webflux.handler.type", handlerType);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move that code inside instrumenter.start() call?


scope = context.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Argument(0) ServerWebExchange exchange,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (throwable != null) {
AdviceUtils.finishSpanIfPresent(exchange, throwable);
if (scope == null) {
return;
}
if (scope != null) {
scope.close();
// span finished in SpanFinishingSubscriber
scope.close();

if (throwable != null) {
instrumenter().end(context, null, null, throwable);
} else {
exchange.getAttributes().put(AdviceUtils.CONTEXT_ATTRIBUTE, context);
// span finished by wrapped Mono in DispatcherHandlerInstrumentation
// the Mono is already wrapped at this point, but doesn't read the CONTEXT_ATTRIBUTE until
// the Mono is resolved, which is after this point
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.servlet.ServletContextPath;
import io.opentelemetry.instrumentation.api.tracer.ServerSpan;
import io.opentelemetry.javaagent.instrumentation.spring.webflux.SpringWebfluxConfig;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.springframework.web.reactive.function.server.HandlerFunction;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;

public class RouteOnSuccessOrError implements BiConsumer<HandlerFunction<?>, Throwable> {

Expand All @@ -23,26 +21,19 @@ public class RouteOnSuccessOrError implements BiConsumer<HandlerFunction<?>, Thr
private static final Pattern METHOD_REGEX =
Pattern.compile("^(GET|HEAD|POST|PUT|DELETE|CONNECT|OPTIONS|TRACE|PATCH) ");

private final RouterFunction routerFunction;
private final ServerRequest serverRequest;
private final RouterFunction<?> routerFunction;

public RouteOnSuccessOrError(RouterFunction routerFunction, ServerRequest serverRequest) {
public RouteOnSuccessOrError(RouterFunction<?> routerFunction) {
this.routerFunction = routerFunction;
this.serverRequest = serverRequest;
}

@Override
public void accept(HandlerFunction<?> handler, Throwable throwable) {
if (handler != null) {
String predicateString = parsePredicateString();
if (predicateString != null) {
Context context = (Context) serverRequest.attributes().get(AdviceUtils.CONTEXT_ATTRIBUTE);
Context context = Context.current();
if (context != null) {
if (SpringWebfluxConfig.captureExperimentalSpanAttributes()) {
Span span = Span.fromContext(context);
span.setAttribute("spring-webflux.request.predicate", predicateString);
}

Span serverSpan = ServerSpan.fromContextOrNull(context);
if (serverSpan != null) {
serverSpan.updateName(ServletContextPath.prepend(context, parseRoute(predicateString)));
Expand Down
Loading