Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDI context propagation improvements for the reactive stack #27443

Merged
merged 3 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public interface Capability {

String REST = QUARKUS_PREFIX + "rest";
String REST_CLIENT = REST + ".client";
String REST_CLIENT_REACTIVE = REST_CLIENT + ".reactive";
String REST_JACKSON = REST + ".jackson";
String REST_JSONB = REST + ".jsonb";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void registerFeature(BuildProducer<FeatureBuildItem> feature, Capabilities capab
"'quarkus-narayana-lra' can only work if 'quarkus-resteasy-jackson' or 'quarkus-resteasy-reactive-jackson' is present");
}

if (!capabilities.isPresent(Capability.REST_CLIENT)) {
if (!capabilities.isCapabilityWithPrefixPresent(Capability.REST_CLIENT)) {
throw new IllegalStateException(
"'quarkus-narayana-lra' can only work if 'quarkus-rest-client' or 'quarkus-rest-client-reactive' is present");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

import org.jboss.jandex.AnnotationInstance;
Expand All @@ -21,6 +20,8 @@
import io.quarkus.arc.processor.AnnotationsTransformer;
import io.quarkus.arc.processor.InterceptorBindingRegistrar;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.BuildSteps;
Expand Down Expand Up @@ -53,15 +54,6 @@ public class OpenTelemetryProcessor {
io.opentelemetry.extension.annotations.SpanAttribute.class.getName());;
private static final DotName SPAN_ATTRIBUTE = DotName.createSimple(SpanAttribute.class.getName());

static class RestClientAvailable implements BooleanSupplier {
private static final boolean IS_REST_CLIENT_AVAILABLE = isClassPresent("javax.ws.rs.client.ClientRequestFilter");

@Override
public boolean getAsBoolean() {
return IS_REST_CLIENT_AVAILABLE;
}
}

@BuildStep
AdditionalBeanBuildItem ensureProducerIsRetained() {
return AdditionalBeanBuildItem.builder()
Expand Down Expand Up @@ -138,11 +130,15 @@ public void transform(TransformationContext context) {
}));
}

@BuildStep(onlyIf = RestClientAvailable.class)
void registerProvider(BuildProducer<AdditionalIndexedClassesBuildItem> additionalIndexed,
@BuildStep
void registerRestClientClassicProvider(
Capabilities capabilities,
BuildProducer<AdditionalIndexedClassesBuildItem> additionalIndexed,
BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName()));
additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class));
if (capabilities.isPresent(Capability.REST_CLIENT) && capabilities.isMissing(Capability.REST_CLIENT_REACTIVE)) {
additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName()));
additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class));
}
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.quarkus.opentelemetry.runtime;

import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;
import static io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext;
import static io.smallrye.common.vertx.VertxContext.isDuplicatedContext;

import org.jboss.logging.Logger;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextStorage;
import io.opentelemetry.context.Scope;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Vertx;

/**
Expand Down Expand Up @@ -118,8 +118,10 @@ public static Context getContext(io.vertx.core.Context vertxContext) {
*/
public static io.vertx.core.Context getVertxContext() {
io.vertx.core.Context context = Vertx.currentContext();
if (context != null) {
io.vertx.core.Context dc = getOrCreateDuplicatedContext(context);
if (context != null && VertxContext.isOnDuplicatedContext()) {
return context;
} else if (context != null) {
io.vertx.core.Context dc = VertxContext.createNewDuplicatedContext(context);
setContextSafe(dc, true);
return dc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;

/**
* A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data.
* A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. This is only used
* by RESTEasy Classic, because the handling implementation is provided by RESTEasy. This is not used by RESTEasy
* Reactive because tracing is handled by Vert.x.
*/
@Unremovable
@Provider
Expand Down Expand Up @@ -77,9 +79,19 @@ public void filter(final ClientRequestContext request) {
if (parentContext == null) {
parentContext = io.opentelemetry.context.Context.current();
}

// For each request, we need a new OTel Context from the **current one**
// the parent context needs to be the one from which the call originates.

if (instrumenter.shouldStart(parentContext, request)) {
Context spanContext = instrumenter.start(parentContext, request);
Scope scope = QuarkusContextStorage.INSTANCE.attach(vertxContext, spanContext);
// Create a new scope with an empty termination callback.
Scope scope = new Scope() {
@Override
public void close() {

}
};
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_CONTEXT, spanContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT, parentContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE, scope);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package io.quarkus.opentelemetry.runtime.tracing.vertx;

import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;

import java.util.Map;
import java.util.function.BiConsumer;

import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;
import io.quarkus.opentelemetry.runtime.tracing.vertx.OpenTelemetryVertxTracer.SpanOperation;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
import io.vertx.core.http.impl.headers.HeadersAdaptor;
Expand Down Expand Up @@ -92,10 +89,14 @@ default <R> SpanOperation sendRequest(
if (instrumenter.shouldStart(parentContext, (REQ) request)) {
io.opentelemetry.context.Context spanContext = instrumenter.start(parentContext,
writableHeaders((REQ) request, headers));
Context duplicatedContext = VertxContext.createNewDuplicatedContext(context);
setContextSafe(duplicatedContext, true);
Scope scope = QuarkusContextStorage.INSTANCE.attach(duplicatedContext, spanContext);
return spanOperation(duplicatedContext, (REQ) request, toMultiMap(headers), spanContext, scope);
// Create a new scope with an empty termination callback.
Scope scope = new Scope() {
@Override
public void close() {

}
};
cescoffier marked this conversation as resolved.
Show resolved Hide resolved
return spanOperation(context, (REQ) request, toMultiMap(headers), spanContext, scope);
}

return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package io.quarkus.resteasy.reactive.server.test;

import static org.awaitility.Awaitility.await;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;

import org.jboss.shrinkwrap.api.asset.EmptyAsset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.restassured.response.ResponseBody;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class RequestLeakDetectionTest {

@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar.addClasses(MyRestAPI.class, MyRequestScopeBean.class, Barrier.class, Task.class)
.addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml"));

@Inject
Barrier barrier;

@Test
public void testWithConcurrentCalls() {
List<String> results = new CopyOnWriteArrayList<>();
int count = 100;
barrier.setMaxConcurrency(count);
for (int i = 0; i < count; i++) {
int c = i;
new Thread(() -> {
ResponseBody<?> body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON)
mkouba marked this conversation as resolved.
Show resolved Hide resolved
.get("/test/{val}").thenReturn().body();
String value = body.toString();
results.add(value);
}).start();
}
await().until(() -> results.size() == count);
Set<String> asSet = new HashSet<>(results);
Assertions.assertEquals(asSet.size(), count);
}

@Test
public void testWithConcurrentBlockingCalls() {
List<String> results = new CopyOnWriteArrayList<>();
int count = 100;
barrier.setMaxConcurrency(count);
for (int i = 0; i < count; i++) {
int c = i;
new Thread(() -> {
ResponseBody<?> body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON)
.get("/test/blocking/{val}").thenReturn().body();
String value = body.toString();
results.add(value);
}).start();
}
await().until(() -> results.size() == count);
Set<String> asSet = new HashSet<>(results);
Assertions.assertEquals(asSet.size(), count);
}

@ApplicationScoped
@Path("/test")
public static class MyRestAPI {

@Inject
MyRequestScopeBean bean;

@Inject
Barrier barrier;

@GET
@Path("/{val}")
public Uni<Foo> foo(int val) {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
Vertx.currentContext().putLocal("count", val);
bean.setValue(val);

return Uni.createFrom().<Integer> emitter(e -> {
barrier.enqueue(Vertx.currentContext(), () -> {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
int r = Vertx.currentContext().getLocal("count");
Assertions.assertEquals(r, val);
e.complete(bean.getValue());
});
}).map(i -> new Foo(Integer.toString(i)));
}

@GET
@Path("/blocking/{val}")
public Foo blocking(int val) {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
Vertx.currentContext().putLocal("count", val);
bean.setValue(val);

return Uni.createFrom().<Integer> emitter(e -> {
barrier.enqueue(Vertx.currentContext(), () -> {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
int r = Vertx.currentContext().getLocal("count");
Assertions.assertEquals(r, val);
e.complete(bean.getValue());
});
})
.map(i -> new Foo(Integer.toString(i)))
.await().indefinitely();
}
}

@ApplicationScoped
public static class Barrier {

private int level;

public void setMaxConcurrency(int level) {
this.level = level;
}

private final AtomicInteger counter = new AtomicInteger();
private final List<Task> tasks = new CopyOnWriteArrayList<>();

public void enqueue(Context context, Runnable runnable) {
Task task = new Task(context, runnable);
tasks.add(task);
if (counter.incrementAndGet() >= level) {
for (Task tbr : new ArrayList<>(tasks)) {
tbr.run();
tasks.remove(tbr);
}
}
}
}

private static class Task {
private final Context context;
private final Runnable runnable;

private Task(Context context, Runnable runnable) {
this.context = context;
this.runnable = runnable;
}

void run() {
context.runOnContext(x -> runnable.run());
}
}

@RequestScoped
public static class MyRequestScopeBean {

private int value = -1;

public void setValue(int v) {
if (value != -1) {
throw new IllegalStateException("Already initialized");
}
value = v;
}

public int getValue() {
return value;
}

}

public static class Foo {

public final String value;

public Foo(String value) {
this.value = value;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ protected void handleRequestScopeActivation() {
}
}

@Override
protected void requestScopeDeactivated() {
// we intentionally don't call 'CurrentRequestManager.set(null)'
// because there is no need to clear the current request
// as that is backed by a DuplicatedContext and not accessible to other requests anyway
}

protected SecurityContext createSecurityContext() {
return new ResteasyReactiveSecurityContext(context);
}
Expand Down
Loading