Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement HTTP resend spec for Reactor Netty (excl CONNECT spans) #8111

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,19 @@ public void accept(M message, Throwable throwable) {
@Nullable
private static Context getChannelContext(
ContextView contextView, PropagatedContext propagatedContext) {

InstrumentationContexts contexts =
contextView.getOrDefault(ReactorContextKeys.CONTEXTS_HOLDER_KEY, null);
if (contexts == null) {
return null;
}

Context context = null;
if (propagatedContext.useClientContext) {
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_CONTEXT_KEY, null);
context = contexts.getClientContext();
}
if (context == null) {
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY, null);
context = contexts.getParentContext();
}
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,17 @@

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_CONTEXT_KEY;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CONTEXTS_HOLDER_KEY;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend;
import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry;
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

Expand All @@ -29,25 +24,26 @@ public final class HttpResponseReceiverInstrumenter {
// this method adds several stateful listeners that execute the instrumenter lifecycle during HTTP
// request processing
// it should be used just before one of the response*() methods is called - after this point the
// HTTP
// request is no longer modifiable by the user
// HTTP request is no longer modifiable by the user
@Nullable
public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseReceiver<?> receiver) {
// receiver should always be an HttpClientFinalizer, which both extends HttpClient and
// implements ResponseReceiver
if (receiver instanceof HttpClient) {
HttpClient client = (HttpClient) receiver;
HttpClientConfig config = client.configuration();

ContextHolder contextHolder = new ContextHolder();
InstrumentationContexts instrumentationContexts = new InstrumentationContexts();

HttpClient modified =
client
.mapConnect(new StartOperation(contextHolder, config))
.doOnRequest(new PropagateContext(contextHolder))
.doOnRequestError(new EndOperationWithRequestError(contextHolder, config))
.doOnResponseError(new EndOperationWithResponseError(contextHolder, config))
.doAfterResponseSuccess(new EndOperationWithSuccess(contextHolder, config));
.mapConnect(new CaptureParentContext(instrumentationContexts))
.doOnRequestError(new EndOperationWithRequestError(instrumentationContexts))
.doOnRequest(new StartOperation(instrumentationContexts))
.doOnResponseError(new EndOperationWithResponseError(instrumentationContexts))
.doAfterResponseSuccess(new EndOperationWithSuccess(instrumentationContexts))
// end the current span on redirects; StartOperation will start another one for the
// next resend
.doOnRedirect(new EndOperationWithSuccess(instrumentationContexts));

// modified should always be an HttpClientFinalizer too
if (modified instanceof HttpClient.ResponseReceiver) {
Expand All @@ -58,151 +54,106 @@ public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseRecei
return null;
}

static final class ContextHolder {

private static final AtomicReferenceFieldUpdater<ContextHolder, Context> contextUpdater =
AtomicReferenceFieldUpdater.newUpdater(ContextHolder.class, Context.class, "context");

volatile Context parentContext;
volatile Context context;

void setContext(Context context) {
contextUpdater.set(this, context);
}

Context getAndRemoveContext() {
return contextUpdater.getAndSet(this, null);
}
}

static final class StartOperation
private static final class CaptureParentContext
implements Function<Mono<? extends Connection>, Mono<? extends Connection>> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;

StartOperation(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
CaptureParentContext(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}

@Override
public Mono<? extends Connection> apply(Mono<? extends Connection> mono) {
return Mono.defer(
() -> {
Context parentContext = Context.current();
contextHolder.parentContext = parentContext;
if (!instrumenter().shouldStart(parentContext, config)) {
// make context accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses this to set the proper context for callbacks
return mono.contextWrite(
ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext));
}

Context context = instrumenter().start(parentContext, config);
contextHolder.setContext(context);
return ContextPropagationOperator.runWithContext(mono, context)
// make contexts accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses the parent context to set the proper context for
// callbacks
.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext))
.contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context));
instrumentationContexts.initialize(parentContext);
// make contexts accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses this to set the proper context for callbacks
return mono.contextWrite(
ctx -> ctx.put(CONTEXTS_HOLDER_KEY, instrumentationContexts));
})
.doOnCancel(
() -> {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, null, null);
});
// if there's still any span in flight, end it
.doOnCancel(() -> instrumentationContexts.endClientSpan(null, null));
}
}

static final class PropagateContext implements BiConsumer<HttpClientRequest, Connection> {
private static final class StartOperation implements BiConsumer<HttpClientRequest, Connection> {

private final ContextHolder contextHolder;
private final InstrumentationContexts instrumentationContexts;

PropagateContext(ContextHolder contextHolder) {
this.contextHolder = contextHolder;
StartOperation(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}

@Override
public void accept(HttpClientRequest httpClientRequest, Connection connection) {
Context context = contextHolder.context;
if (context != null) {
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(context, httpClientRequest, HttpClientRequestHeadersSetter.INSTANCE);
}
public void accept(HttpClientRequest request, Connection connection) {
Context context = instrumentationContexts.startClientSpan(request);

// also propagate the context to the underlying netty instrumentation
// if this span was suppressed and context is null, propagate parentContext - this will allow
// netty spans to be suppressed too
Context nettyParentContext = context == null ? contextHolder.parentContext : context;
Context nettyParentContext =
context == null ? instrumentationContexts.getParentContext() : context;
NettyClientTelemetry.setChannelContext(connection.channel(), nettyParentContext);
}
}

static final class EndOperationWithRequestError
private static final class EndOperationWithRequestError
implements BiConsumer<HttpClientRequest, Throwable> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;

EndOperationWithRequestError(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
EndOperationWithRequestError(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}

@Override
public void accept(HttpClientRequest httpClientRequest, Throwable error) {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
public void accept(HttpClientRequest request, Throwable error) {
instrumentationContexts.endClientSpan(null, error);

if (HttpClientResend.get(instrumentationContexts.getParentContext()) == 0) {
// TODO: emit connection error span

// FIXME: this branch requires lots of changes around the NettyConnectionInstrumenter
// currently it also creates that connection error span (when the connection telemetry is
// turned off), but without HTTP semantics - it does not have access to any HTTP information
// after all
// it should be possible to completely disable it, and just start and end the span here
// this requires lots of refactoring and pretty uninteresting changes in the netty code, so
// I'll do that in a separate PR - for better readability
}
Comment on lines +116 to 126
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's where the connection span will be added -- similar to the ConnectionErrorSpanInterceptor in the OkHttp instrumentation, it's only emitted on error and when there were no HTTP requests made.

instrumenter().end(context, config, null, error);
}
}

static final class EndOperationWithResponseError
private static final class EndOperationWithResponseError
implements BiConsumer<HttpClientResponse, Throwable> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;

EndOperationWithResponseError(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
EndOperationWithResponseError(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}

@Override
public void accept(HttpClientResponse response, Throwable error) {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, response, error);
instrumentationContexts.endClientSpan(response, error);
}
}

static final class EndOperationWithSuccess implements BiConsumer<HttpClientResponse, Connection> {
private static final class EndOperationWithSuccess
implements BiConsumer<HttpClientResponse, Connection> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;

EndOperationWithSuccess(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
EndOperationWithSuccess(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}

@Override
public void accept(HttpClientResponse response, Connection connection) {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, response, null);
instrumentationContexts.endClientSpan(response, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

final class InstrumentationContexts {

private static final Logger logger = Logger.getLogger(InstrumentationContexts.class.getName());

private volatile Context parentContext;
// on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e.
// it calls the callback functions in that order); thus for a short moment there can be 2
// coexisting HTTP client spans
private final Queue<RequestAndContext> clientContexts = new ArrayBlockingQueue<>(2, true);

void initialize(Context parentContext) {
this.parentContext = HttpClientResend.initialize(parentContext);
}

Context getParentContext() {
return parentContext;
}

@Nullable
Context getClientContext() {
RequestAndContext requestAndContext = clientContexts.peek();
return requestAndContext == null ? null : requestAndContext.context;
}

@Nullable
Context startClientSpan(HttpClientRequest request) {
Context parentContext = this.parentContext;
Context context = null;
if (instrumenter().shouldStart(parentContext, request)) {
context = instrumenter().start(parentContext, request);
if (!clientContexts.offer(new RequestAndContext(request, context))) {
// should not ever happen in reality
String message =
"Could not instrument HTTP client request; not enough space in the request queue";
logger.log(Level.FINE, message);
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved
instrumenter().end(context, request, null, new IllegalStateException(message));
}
}
return context;
}

void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) {
RequestAndContext requestAndContext = clientContexts.poll();
if (requestAndContext != null) {
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved
instrumenter().end(requestAndContext.context, requestAndContext.request, response, error);
}
}

static final class RequestAndContext {
final HttpClientRequest request;
final Context context;

RequestAndContext(HttpClientRequest request, Context context) {
this.request = request;
this.context = context;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@

public final class ReactorContextKeys {

public static final String CLIENT_PARENT_CONTEXT_KEY =
ReactorContextKeys.class.getName() + ".client-parent-context";
public static final String CLIENT_CONTEXT_KEY =
ReactorContextKeys.class.getName() + ".client-context";
public static final String CONTEXTS_HOLDER_KEY =
ReactorContextKeys.class.getName() + ".contexts-holder";

private ReactorContextKeys() {}
}
Loading