Skip to content

Commit

Permalink
Implement HTTP resend spec for Reactor Netty (excl CONNECT spans) (#8111
Browse files Browse the repository at this point in the history
)

Co-authored-by: Trask Stalnaker <[email protected]>
  • Loading branch information
Mateusz Rzeszutek and trask authored Jul 21, 2023
1 parent bd8ddf4 commit 718fa0d
Show file tree
Hide file tree
Showing 15 changed files with 285 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,19 @@ public void accept(M message, Throwable throwable) {
@Nullable
private static Context getChannelContext(
ContextView contextView, PropagatedContext propagatedContext) {

InstrumentationContexts contexts =
contextView.getOrDefault(ReactorContextKeys.CONTEXTS_HOLDER_KEY, null);
if (contexts == null) {
return null;
}

Context context = null;
if (propagatedContext.useClientContext) {
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_CONTEXT_KEY, null);
context = contexts.getClientContext();
}
if (context == null) {
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY, null);
context = contexts.getParentContext();
}
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,17 @@

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_CONTEXT_KEY;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CONTEXTS_HOLDER_KEY;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend;
import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry;
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

Expand All @@ -29,25 +24,26 @@ public final class HttpResponseReceiverInstrumenter {
// this method adds several stateful listeners that execute the instrumenter lifecycle during HTTP
// request processing
// it should be used just before one of the response*() methods is called - after this point the
// HTTP
// request is no longer modifiable by the user
// HTTP request is no longer modifiable by the user
@Nullable
public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseReceiver<?> receiver) {
// receiver should always be an HttpClientFinalizer, which both extends HttpClient and
// implements ResponseReceiver
if (receiver instanceof HttpClient) {
HttpClient client = (HttpClient) receiver;
HttpClientConfig config = client.configuration();

ContextHolder contextHolder = new ContextHolder();
InstrumentationContexts instrumentationContexts = new InstrumentationContexts();

HttpClient modified =
client
.mapConnect(new StartOperation(contextHolder, config))
.doOnRequest(new PropagateContext(contextHolder))
.doOnRequestError(new EndOperationWithRequestError(contextHolder, config))
.doOnResponseError(new EndOperationWithResponseError(contextHolder, config))
.doAfterResponseSuccess(new EndOperationWithSuccess(contextHolder, config));
.mapConnect(new CaptureParentContext(instrumentationContexts))
.doOnRequestError(new EndOperationWithRequestError(instrumentationContexts))
.doOnRequest(new StartOperation(instrumentationContexts))
.doOnResponseError(new EndOperationWithResponseError(instrumentationContexts))
.doAfterResponseSuccess(new EndOperationWithSuccess(instrumentationContexts))
// end the current span on redirects; StartOperation will start another one for the
// next resend
.doOnRedirect(new EndOperationWithSuccess(instrumentationContexts));

// modified should always be an HttpClientFinalizer too
if (modified instanceof HttpClient.ResponseReceiver) {
Expand All @@ -58,151 +54,106 @@ public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseRecei
return null;
}

static final class ContextHolder {

private static final AtomicReferenceFieldUpdater<ContextHolder, Context> contextUpdater =
AtomicReferenceFieldUpdater.newUpdater(ContextHolder.class, Context.class, "context");

volatile Context parentContext;
volatile Context context;

void setContext(Context context) {
contextUpdater.set(this, context);
}

Context getAndRemoveContext() {
return contextUpdater.getAndSet(this, null);
}
}

static final class StartOperation
private static final class CaptureParentContext
implements Function<Mono<? extends Connection>, Mono<? extends Connection>> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;

StartOperation(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
CaptureParentContext(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}

@Override
public Mono<? extends Connection> apply(Mono<? extends Connection> mono) {
return Mono.defer(
() -> {
Context parentContext = Context.current();
contextHolder.parentContext = parentContext;
if (!instrumenter().shouldStart(parentContext, config)) {
// make context accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses this to set the proper context for callbacks
return mono.contextWrite(
ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext));
}

Context context = instrumenter().start(parentContext, config);
contextHolder.setContext(context);
return ContextPropagationOperator.runWithContext(mono, context)
// make contexts accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses the parent context to set the proper context for
// callbacks
.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext))
.contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context));
instrumentationContexts.initialize(parentContext);
// make contexts accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses this to set the proper context for callbacks
return mono.contextWrite(
ctx -> ctx.put(CONTEXTS_HOLDER_KEY, instrumentationContexts));
})
.doOnCancel(
() -> {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, null, null);
});
// if there's still any span in flight, end it
.doOnCancel(() -> instrumentationContexts.endClientSpan(null, null));
}
}

static final class PropagateContext implements BiConsumer<HttpClientRequest, Connection> {
private static final class StartOperation implements BiConsumer<HttpClientRequest, Connection> {

private final ContextHolder contextHolder;
private final InstrumentationContexts instrumentationContexts;

PropagateContext(ContextHolder contextHolder) {
this.contextHolder = contextHolder;
StartOperation(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}

@Override
public void accept(HttpClientRequest httpClientRequest, Connection connection) {
Context context = contextHolder.context;
if (context != null) {
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(context, httpClientRequest, HttpClientRequestHeadersSetter.INSTANCE);
}
public void accept(HttpClientRequest request, Connection connection) {
Context context = instrumentationContexts.startClientSpan(request);

// also propagate the context to the underlying netty instrumentation
// if this span was suppressed and context is null, propagate parentContext - this will allow
// netty spans to be suppressed too
Context nettyParentContext = context == null ? contextHolder.parentContext : context;
Context nettyParentContext =
context == null ? instrumentationContexts.getParentContext() : context;
NettyClientTelemetry.setChannelContext(connection.channel(), nettyParentContext);
}
}

static final class EndOperationWithRequestError
private static final class EndOperationWithRequestError
implements BiConsumer<HttpClientRequest, Throwable> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;

EndOperationWithRequestError(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
EndOperationWithRequestError(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}

@Override
public void accept(HttpClientRequest httpClientRequest, Throwable error) {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
public void accept(HttpClientRequest request, Throwable error) {
instrumentationContexts.endClientSpan(null, error);

if (HttpClientResend.get(instrumentationContexts.getParentContext()) == 0) {
// TODO: emit connection error span

// FIXME: this branch requires lots of changes around the NettyConnectionInstrumenter
// currently it also creates that connection error span (when the connection telemetry is
// turned off), but without HTTP semantics - it does not have access to any HTTP information
// after all
// it should be possible to completely disable it, and just start and end the span here
// this requires lots of refactoring and pretty uninteresting changes in the netty code, so
// I'll do that in a separate PR - for better readability
}
instrumenter().end(context, config, null, error);
}
}

static final class EndOperationWithResponseError
private static final class EndOperationWithResponseError
implements BiConsumer<HttpClientResponse, Throwable> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;

EndOperationWithResponseError(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
EndOperationWithResponseError(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}

@Override
public void accept(HttpClientResponse response, Throwable error) {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, response, error);
instrumentationContexts.endClientSpan(response, error);
}
}

static final class EndOperationWithSuccess implements BiConsumer<HttpClientResponse, Connection> {
private static final class EndOperationWithSuccess
implements BiConsumer<HttpClientResponse, Connection> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;

EndOperationWithSuccess(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
EndOperationWithSuccess(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}

@Override
public void accept(HttpClientResponse response, Connection connection) {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, response, null);
instrumentationContexts.endClientSpan(response, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

final class InstrumentationContexts {

private static final Logger logger = Logger.getLogger(InstrumentationContexts.class.getName());

private volatile Context parentContext;
// on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e.
// it calls the callback functions in that order); thus for a short moment there can be 2
// coexisting HTTP client spans
private final Queue<RequestAndContext> clientContexts = new ArrayBlockingQueue<>(2, true);

void initialize(Context parentContext) {
this.parentContext = HttpClientResend.initialize(parentContext);
}

Context getParentContext() {
return parentContext;
}

@Nullable
Context getClientContext() {
RequestAndContext requestAndContext = clientContexts.peek();
return requestAndContext == null ? null : requestAndContext.context;
}

@Nullable
Context startClientSpan(HttpClientRequest request) {
Context parentContext = this.parentContext;
Context context = null;
if (instrumenter().shouldStart(parentContext, request)) {
context = instrumenter().start(parentContext, request);
if (!clientContexts.offer(new RequestAndContext(request, context))) {
// should not ever happen in reality
String message =
"Could not instrument HTTP client request; not enough space in the request queue";
logger.log(Level.FINE, message);
instrumenter().end(context, request, null, new IllegalStateException(message));
}
}
return context;
}

void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) {
RequestAndContext requestAndContext = clientContexts.poll();
if (requestAndContext != null) {
instrumenter().end(requestAndContext.context, requestAndContext.request, response, error);
}
}

static final class RequestAndContext {
final HttpClientRequest request;
final Context context;

RequestAndContext(HttpClientRequest request, Context context) {
this.request = request;
this.context = context;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@

public final class ReactorContextKeys {

public static final String CLIENT_PARENT_CONTEXT_KEY =
ReactorContextKeys.class.getName() + ".client-parent-context";
public static final String CLIENT_CONTEXT_KEY =
ReactorContextKeys.class.getName() + ".client-context";
public static final String CONTEXTS_HOLDER_KEY =
ReactorContextKeys.class.getName() + ".contexts-holder";

private ReactorContextKeys() {}
}
Loading

0 comments on commit 718fa0d

Please sign in to comment.