Skip to content

Commit

Permalink
Merge pull request quarkusio#24134 from cescoffier/rest-client-captur…
Browse files Browse the repository at this point in the history
…e-and-propagate-context

Properly captures and restores the vert.x context in the reactive REST Client
  • Loading branch information
cescoffier authored Mar 8, 2022
2 parents 5238c2d + af67521 commit bf28520
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public static Context getContext(io.vertx.core.Context vertxContext) {
*
* @return a duplicated Vert.x Context or null.
*/
private static io.vertx.core.Context getVertxContext() {
public static io.vertx.core.Context getVertxContext() {
io.vertx.core.Context context = Vertx.currentContext();
if (context != null) {
io.vertx.core.Context dc = getOrCreateDuplicatedContext(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import javax.ws.rs.client.ClientRequestFilter;
import javax.ws.rs.client.ClientResponseContext;
import javax.ws.rs.client.ClientResponseFilter;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.ext.Provider;

Expand All @@ -28,17 +27,10 @@
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor;
import io.quarkus.arc.Unremovable;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;

/**
* A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data.
*
* For the Resteasy Reactive Client, we skip the OpenTelemetry registration, since this can be handled by the
* {@link io.quarkus.opentelemetry.runtime.tracing.vertx.OpenTelemetryVertxTracer}. In theory, this wouldn't be an
* issue, because the OpenTelemetry Instrumenter detects two Client Span and merge both together, but they need to be
* executed with the same OpenTelemetry Context. Right now, the Reactive REST Client filters are executed outside the
* Vert.x Context, so we are unable to propagate the OpenTelemetry Context. This is also not a big issue, because the
* correct OpenTelemetry data will be populated in Vert.x. The only missing piece is the route name available in
* io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler, which is not propagated to Vert.x.
*/
@Unremovable
@Provider
Expand All @@ -47,7 +39,15 @@ public class OpenTelemetryClientFilter implements ClientRequestFilter, ClientRes
public static final String REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT = "otel.span.client.parentContext";
public static final String REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE = "otel.span.client.scope";

private Instrumenter<ClientRequestContext, ClientResponseContext> instrumenter;
/**
* Property stored in the Client Request context to retrieve the captured Vert.x context.
* This context is captured and stored by the Reactive REST Client.
*
* We use this property to avoid having to depend on the Reactive REST Client explicitly.
*/
private static final String VERTX_CONTEXT_PROPERTY = "__context";

private final Instrumenter<ClientRequestContext, ClientResponseContext> instrumenter;

// RESTEasy requires no-arg constructor for CDI injection: https://issues.redhat.com/browse/RESTEASY-1538
// In Reactive Rest Client this is the constructor called. In the classic is the next one with injection.
Expand All @@ -72,20 +72,25 @@ public OpenTelemetryClientFilter(final OpenTelemetry openTelemetry) {

@Override
public void filter(final ClientRequestContext request) {
if (isReactiveClient(request)) {
return;
}

Context parentContext = Context.current();
if (instrumenter.shouldStart(parentContext, request)) {
Context spanContext = instrumenter.start(parentContext, request);
Scope scope = spanContext.makeCurrent();
Scope scope = QuarkusContextStorage.INSTANCE.attach(getVertxContext(request), spanContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_CONTEXT, spanContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT, parentContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE, scope);
}
}

private static io.vertx.core.Context getVertxContext(final ClientRequestContext request) {
io.vertx.core.Context vertxContext = (io.vertx.core.Context) request.getProperty(VERTX_CONTEXT_PROPERTY);
if (vertxContext == null) {
return QuarkusContextStorage.getVertxContext();
} else {
return vertxContext;
}
}

@Override
public void filter(final ClientRequestContext request, final ClientResponseContext response) {
Scope scope = (Scope) request.getProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE);
Expand All @@ -106,7 +111,7 @@ public void filter(final ClientRequestContext request, final ClientResponseConte
}

static boolean isReactiveClient(final ClientRequestContext request) {
return "Resteasy Reactive Client".equals(request.getHeaderString(HttpHeaders.USER_AGENT));
return request.getProperty(VERTX_CONTEXT_PROPERTY) != null;
}

private static class ClientRequestContextTextMapSetter implements TextMapSetter<ClientRequestContext> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.ext.web.RoutingContext;

public class QuarkusResteasyReactiveRequestContext extends VertxResteasyReactiveRequestContext {
Expand All @@ -27,7 +28,9 @@ public QuarkusResteasyReactiveRequestContext(Deployment deployment, ProvidersImp
CurrentIdentityAssociation currentIdentityAssociation) {
super(deployment, providers, context, requestContext, handlerChain, abortHandlerChain, devModeTccl);
this.association = currentIdentityAssociation;
VertxContextSafetyToggle.setCurrentContextSafe(true);
if (VertxContext.isOnDuplicatedContext()) {
VertxContextSafetyToggle.setCurrentContextSafe(true);
}
}

protected void handleRequestScopeActivation() {
Expand Down
4 changes: 4 additions & 0 deletions independent-projects/resteasy-reactive/client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-vertx-context</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.stork.Stork;
import io.smallrye.stork.api.ServiceInstance;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
Expand All @@ -25,6 +26,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.ws.rs.InternalServerErrorException;
Expand All @@ -40,6 +42,7 @@
import org.jboss.resteasy.reactive.client.api.LoggingScope;
import org.jboss.resteasy.reactive.client.api.QuarkusRestClientProperties;
import org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl;
import org.jboss.resteasy.reactive.client.impl.ClientRequestContextImpl;
import org.jboss.resteasy.reactive.client.impl.RestClientRequestContext;
import org.jboss.resteasy.reactive.client.impl.multipart.PausableHttpPostRequestEncoder;
import org.jboss.resteasy.reactive.client.impl.multipart.QuarkusMultipartForm;
Expand Down Expand Up @@ -72,7 +75,30 @@ public void handle(RestClientRequestContext requestContext) {
return;
}
requestContext.suspend();
Uni<HttpClientRequest> future = createRequest(requestContext);
Uni<HttpClientRequest> future = createRequest(requestContext)
.runSubscriptionOn(new Executor() {
@Override
public void execute(Runnable command) {
Context current = Vertx.currentContext();
ClientRequestContextImpl clientRequestContext = requestContext.getClientRequestContext();
Context captured = null;
if (clientRequestContext != null) {
captured = clientRequestContext.getContext();
}
if (current == captured || captured == null) {
// No need to switch to another context.
command.run();
} else {
// Switch back to the captured context
captured.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
}
});

// DNS failures happen before we send the request
future.subscribe().with(new Consumer<>() {
Expand All @@ -91,6 +117,7 @@ public void accept(HttpClientRequest httpClientRequest) {

Pipe<Buffer> pipe = actualEntity.pipe(); // Shouldn't this be called in an earlier phase ?
requestPromise.future().onComplete(ar -> {

if (ar.succeeded()) {
HttpClientRequest req = ar.result();
if (httpClientRequest.headers() == null
Expand All @@ -109,7 +136,6 @@ public void accept(HttpClientRequest httpClientRequest) {
}
});
sent = httpClientRequest.response();

requestPromise.complete(httpClientRequest);
} catch (Throwable e) {
reportFinish(e, requestContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -43,14 +45,24 @@ public class ClientRequestContextImpl implements ResteasyReactiveClientRequestCo
private final ConfigurationImpl configuration;
private final RestClientRequestContext restClientRequestContext;
private final ClientRequestHeadersMap headersMap;
private OutputStream entityStream;
private final Context context;

public ClientRequestContextImpl(RestClientRequestContext restClientRequestContext, ClientImpl client,
ConfigurationImpl configuration) {
this.restClientRequestContext = restClientRequestContext;
this.client = client;
this.configuration = configuration;
this.headersMap = new ClientRequestHeadersMap(); //restClientRequestContext.requestHeaders.getHeaders()

// Capture or create a duplicated context, and store it.
Context current = client.vertx.getOrCreateContext();
this.context = VertxContext.getOrCreateDuplicatedContext(current);
restClientRequestContext.properties.put(VERTX_CONTEXT_PROPERTY, context);
}

@Override
public Context getContext() {
return context;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
package org.jboss.resteasy.reactive.client.spi;

import io.vertx.core.Context;
import javax.ws.rs.client.ClientRequestContext;

public interface ResteasyReactiveClientRequestContext extends ClientRequestContext {

/**
* The property used to store the (duplicated) vert.x context with the request.
* This context is captured when the ResteasyReactiveClientRequestContext instance is created.
* If, at that moment, there is no context, a new duplicated context is created.
* If, we are executed on a root context, it creates a new duplicated context from it.
* Otherwise, (we are already on a duplicated context), it captures it.
*/
String VERTX_CONTEXT_PROPERTY = "__context";

void suspend();

void resume();

void resume(Throwable t);

/**
* @return the captured or created duplicated context. See {@link #VERTX_CONTEXT_PROPERTY} for details.
*/
Context getContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,8 @@ void get() {
assertEquals(HttpMethod.GET.name(), ((Map<?, ?>) server.get("attributes")).get(HTTP_METHOD.getKey()));

assertEquals(SpanKind.CLIENT.toString(), client.get("kind"));
// TODO - radcortez - Requires a fix to pass in the UrlPathTemplate in the Vert.x Context. Check:
// io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler
// org.jboss.resteasy.reactive.client.AsyncResultUni
//assertEquals("reactive", client.get("name"));
assertEquals("/reactive", client.get("name"));

assertEquals(HTTP_OK, ((Map<?, ?>) client.get("attributes")).get(HTTP_STATUS_CODE.getKey()));
assertEquals(HttpMethod.GET.name(), ((Map<?, ?>) client.get("attributes")).get(HTTP_METHOD.getKey()));
}
Expand Down Expand Up @@ -107,10 +105,8 @@ void post() {
assertEquals(HttpMethod.POST.name(), ((Map<?, ?>) server.get("attributes")).get(HTTP_METHOD.getKey()));

assertEquals(SpanKind.CLIENT.toString(), client.get("kind"));
// TODO - radcortez - Requires a fix to pass in the UrlPathTemplate in the Vert.x Context. Check:
// io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler
// org.jboss.resteasy.reactive.client.AsyncResultUni
//assertEquals("reactive", client.get("name"));
assertEquals("/reactive", client.get("name"));

assertEquals(HTTP_OK, ((Map<?, ?>) client.get("attributes")).get(HTTP_STATUS_CODE.getKey()));
assertEquals(HttpMethod.POST.name(), ((Map<?, ?>) client.get("attributes")).get(HTTP_METHOD.getKey()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ public class DefaultCtorTestFilter implements ClientRequestFilter {

@Override
public void filter(ClientRequestContext requestContext) {
System.out.println(requestContext.getMethod());
// Do nothing on purpose.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@ public NonDefaultCtorTestFilter(ObjectMapper mapper) {
@Override
public void filter(ClientRequestContext requestContext) {
mapper.getFactory();
System.out.println(requestContext.getUri());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,11 @@ void shouldCreateClientSpans() {
Assertions.assertNotNull(spanData.get("attr_http.client_ip"));
Assertions.assertNotNull(spanData.get("attr_http.user_agent"));
} else if (spanData.get("kind").equals(SpanKind.CLIENT.toString())
&& spanData.get("name").equals("HTTP POST")) {
&& spanData.get("name").equals("/hello")) {
clientFound = true;
// Client span
Assertions.assertEquals("/hello", spanData.get("name"));

// TODO - radcortez - Requires a fix to pass in the UrlPathTemplate in the Vert.x Context. Check:
// io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler
// org.jboss.resteasy.reactive.client.AsyncResultUni
//assertEquals("reactive", client.get("name"));
Assertions.assertEquals("HTTP POST", spanData.get("name"));
Assertions.assertEquals(SpanKind.CLIENT.toString(), spanData.get("kind"));
Assertions.assertTrue((Boolean) spanData.get("ended"));

Expand Down

0 comments on commit bf28520

Please sign in to comment.