From ac1a11e274b281c291933c394258d8caeae2ecc1 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Tue, 21 Dec 2021 10:53:40 +1100 Subject: [PATCH 1/3] CDI context propagation improvements for the reactive stack == ArC Introduce the VertxCurrentContextFactory so that the Vert.x duplicated context can be used to store the "current context" for normal scopes. == resteasy-reactive Don't clear our CDI current request when suspending. This used to be done in order to prevent subsequent requests running on the same thread as a suspended request from accessing the former's data. With the advent of DuplicatedContext backed storage, there is no longer any chance of mixing data so there is no need to clear it out. Furthermore, by not clearing out current request, code that accesses the request scoped CurrentVertxRequest that is executed while the request is suspended, can now work even if context propagation is not in play. == Tests Add leak detection tests for resteasy reactive, graphql and reactive rest client. Also improve the OpenTelemetry reactive tests. == OpenTelemetry Only register the OpenTelemetryClientFilter for RESTEasy Client Classic. Use Capabilities to determine when to register the OpenTelemetryClientFilter. Co-authored-by: Georgios Andrianakis Co-authored-by: Clement Escoffier Co-authored-by: Roberto Cortez Co-authored-by: brunobat Co-authored-by: Martin Kouba --- .../io/quarkus/deployment/Capability.java | 1 + .../deployment/OpenTelemetryProcessor.java | 24 +- .../runtime/QuarkusContextStorage.java | 8 +- .../restclient/OpenTelemetryClientFilter.java | 16 +- .../vertx/InstrumenterVertxTracer.java | 15 +- .../server/test/RequestLeakDetectionTest.java | 192 +++++++++++ ...QuarkusResteasyReactiveRequestContext.java | 7 + .../reactive/RequestLeakDetectionTest.java | 309 ++++++++++++++++++ .../rest-client-reactive/runtime/pom.xml | 2 +- .../smallrye-graphql/deployment/pom.xml | 2 +- .../deployment/RequestLeakDetectionTest.java | 185 +++++++++++ .../vertx/deployment/VertxProcessor.java | 7 + .../runtime/VertxCurrentContextFactory.java | 58 ++++ .../quarkus/vertx/runtime/VertxRecorder.java | 5 + .../client/impl/ClientRequestContextImpl.java | 11 +- .../client/impl/RestClientRequestContext.java | 5 + .../core/AbstractResteasyReactiveContext.java | 6 +- .../reactive/ReactiveResource.java | 15 +- .../reactive/OpenTelemetryReactiveTest.java | 112 ++++++- 19 files changed, 939 insertions(+), 41 deletions(-) create mode 100644 extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java create mode 100644 extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java create mode 100644 extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java create mode 100644 extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java index 5af46ef4d0b2f..e5463d19ed91a 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java @@ -39,6 +39,7 @@ public interface Capability { String REST = QUARKUS_PREFIX + "rest"; String REST_CLIENT = REST + ".client"; + String REST_CLIENT_REACTIVE = REST_CLIENT + ".reactive"; String REST_JACKSON = REST + ".jackson"; String REST_JSONB = REST + ".jsonb"; diff --git a/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java b/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java index 004bc8cc6dc20..4a54dc1c8ea00 100644 --- a/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java +++ b/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.function.BooleanSupplier; import java.util.stream.Collectors; import org.jboss.jandex.AnnotationInstance; @@ -21,6 +20,8 @@ import io.quarkus.arc.processor.AnnotationsTransformer; import io.quarkus.arc.processor.InterceptorBindingRegistrar; import io.quarkus.bootstrap.classloading.QuarkusClassLoader; +import io.quarkus.deployment.Capabilities; +import io.quarkus.deployment.Capability; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.BuildSteps; @@ -53,15 +54,6 @@ public class OpenTelemetryProcessor { io.opentelemetry.extension.annotations.SpanAttribute.class.getName());; private static final DotName SPAN_ATTRIBUTE = DotName.createSimple(SpanAttribute.class.getName()); - static class RestClientAvailable implements BooleanSupplier { - private static final boolean IS_REST_CLIENT_AVAILABLE = isClassPresent("javax.ws.rs.client.ClientRequestFilter"); - - @Override - public boolean getAsBoolean() { - return IS_REST_CLIENT_AVAILABLE; - } - } - @BuildStep AdditionalBeanBuildItem ensureProducerIsRetained() { return AdditionalBeanBuildItem.builder() @@ -138,11 +130,15 @@ public void transform(TransformationContext context) { })); } - @BuildStep(onlyIf = RestClientAvailable.class) - void registerProvider(BuildProducer additionalIndexed, + @BuildStep + void registerRestClientClassicProvider( + Capabilities capabilities, + BuildProducer additionalIndexed, BuildProducer additionalBeans) { - additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName())); - additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class)); + if (capabilities.isPresent(Capability.REST_CLIENT) && capabilities.isMissing(Capability.REST_CLIENT_REACTIVE)) { + additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName())); + additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class)); + } } @BuildStep diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java index 7452f6dcfcf8a..e37a29e68ff84 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java @@ -1,7 +1,6 @@ package io.quarkus.opentelemetry.runtime; import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe; -import static io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext; import static io.smallrye.common.vertx.VertxContext.isDuplicatedContext; import org.jboss.logging.Logger; @@ -9,6 +8,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.ContextStorage; import io.opentelemetry.context.Scope; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Vertx; /** @@ -118,8 +118,10 @@ public static Context getContext(io.vertx.core.Context vertxContext) { */ public static io.vertx.core.Context getVertxContext() { io.vertx.core.Context context = Vertx.currentContext(); - if (context != null) { - io.vertx.core.Context dc = getOrCreateDuplicatedContext(context); + if (context != null && VertxContext.isOnDuplicatedContext()) { + return context; + } else if (context != null) { + io.vertx.core.Context dc = VertxContext.createNewDuplicatedContext(context); setContextSafe(dc, true); return dc; } diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java index 2da1d59d039f2..dcd8222698a39 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java @@ -30,7 +30,9 @@ import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; /** - * A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. + * A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. This is only used + * by RESTEasy Classic, because the handling implementation is provided by RESTEasy. This is not used by RESTEasy + * Reactive because tracing is handled by Vert.x. */ @Unremovable @Provider @@ -77,9 +79,19 @@ public void filter(final ClientRequestContext request) { if (parentContext == null) { parentContext = io.opentelemetry.context.Context.current(); } + + // For each request, we need a new OTel Context from the **current one** + // the parent context needs to be the one from which the call originates. + if (instrumenter.shouldStart(parentContext, request)) { Context spanContext = instrumenter.start(parentContext, request); - Scope scope = QuarkusContextStorage.INSTANCE.attach(vertxContext, spanContext); + // Create a new scope with an empty termination callback. + Scope scope = new Scope() { + @Override + public void close() { + + } + }; request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_CONTEXT, spanContext); request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT, parentContext); request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE, scope); diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java index 272c1a83bb9fd..6fe1bb57cdb3f 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java @@ -1,7 +1,5 @@ package io.quarkus.opentelemetry.runtime.tracing.vertx; -import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe; - import java.util.Map; import java.util.function.BiConsumer; @@ -9,7 +7,6 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; import io.quarkus.opentelemetry.runtime.tracing.vertx.OpenTelemetryVertxTracer.SpanOperation; -import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.MultiMap; import io.vertx.core.http.impl.headers.HeadersAdaptor; @@ -92,10 +89,14 @@ default SpanOperation sendRequest( if (instrumenter.shouldStart(parentContext, (REQ) request)) { io.opentelemetry.context.Context spanContext = instrumenter.start(parentContext, writableHeaders((REQ) request, headers)); - Context duplicatedContext = VertxContext.createNewDuplicatedContext(context); - setContextSafe(duplicatedContext, true); - Scope scope = QuarkusContextStorage.INSTANCE.attach(duplicatedContext, spanContext); - return spanOperation(duplicatedContext, (REQ) request, toMultiMap(headers), spanContext, scope); + // Create a new scope with an empty termination callback. + Scope scope = new Scope() { + @Override + public void close() { + + } + }; + return spanOperation(context, (REQ) request, toMultiMap(headers), spanContext, scope); } return null; diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java new file mode 100644 index 0000000000000..b62fe45b9a2f8 --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java @@ -0,0 +1,192 @@ +package io.quarkus.resteasy.reactive.server.test; + +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.MediaType; + +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.restassured.response.ResponseBody; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class RequestLeakDetectionTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar.addClasses(MyRestAPI.class, MyRequestScopeBean.class, Barrier.class, Task.class) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml")); + + @Inject + Barrier barrier; + + @Test + public void testWithConcurrentCalls() { + List results = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + ResponseBody body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON) + .get("/test/{val}").thenReturn().body(); + String value = body.toString(); + results.add(value); + }).start(); + } + await().until(() -> results.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(asSet.size(), count); + } + + @Test + public void testWithConcurrentBlockingCalls() { + List results = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + ResponseBody body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON) + .get("/test/blocking/{val}").thenReturn().body(); + String value = body.toString(); + results.add(value); + }).start(); + } + await().until(() -> results.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(asSet.size(), count); + } + + @ApplicationScoped + @Path("/test") + public static class MyRestAPI { + + @Inject + MyRequestScopeBean bean; + + @Inject + Barrier barrier; + + @GET + @Path("/{val}") + public Uni foo(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(r, val); + e.complete(bean.getValue()); + }); + }).map(i -> new Foo(Integer.toString(i))); + } + + @GET + @Path("/blocking/{val}") + public Foo blocking(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(r, val); + e.complete(bean.getValue()); + }); + }) + .map(i -> new Foo(Integer.toString(i))) + .await().indefinitely(); + } + } + + @ApplicationScoped + public static class Barrier { + + private int level; + + public void setMaxConcurrency(int level) { + this.level = level; + } + + private final AtomicInteger counter = new AtomicInteger(); + private final List tasks = new CopyOnWriteArrayList<>(); + + public void enqueue(Context context, Runnable runnable) { + Task task = new Task(context, runnable); + tasks.add(task); + if (counter.incrementAndGet() >= level) { + for (Task tbr : new ArrayList<>(tasks)) { + tbr.run(); + tasks.remove(tbr); + } + } + } + } + + private static class Task { + private final Context context; + private final Runnable runnable; + + private Task(Context context, Runnable runnable) { + this.context = context; + this.runnable = runnable; + } + + void run() { + context.runOnContext(x -> runnable.run()); + } + } + + @RequestScoped + public static class MyRequestScopeBean { + + private int value = -1; + + public void setValue(int v) { + if (value != -1) { + throw new IllegalStateException("Already initialized"); + } + value = v; + } + + public int getValue() { + return value; + } + + } + + public static class Foo { + + public final String value; + + public Foo(String value) { + this.value = value; + } + } + +} diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java index fb47a5faf78e4..9ca38e98e4942 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java @@ -45,6 +45,13 @@ protected void handleRequestScopeActivation() { } } + @Override + protected void requestScopeDeactivated() { + // we intentionally don't call 'CurrentRequestManager.set(null)' + // because there is no need to clear the current request + // as that is backed by a DuplicatedContext and not accessible to other requests anyway + } + protected SecurityContext createSecurityContext() { return new ResteasyReactiveSecurityContext(context); } diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java new file mode 100644 index 0000000000000..d1d2b403783a5 --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java @@ -0,0 +1,309 @@ +package io.quarkus.rest.client.reactive; + +import static io.quarkus.rest.client.reactive.RestClientTestUtil.setUrlForClass; +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.restassured.response.ResponseBody; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class RequestLeakDetectionTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(MyRestAPI.class, MyRequestScopeBean.class, Barrier.class, Task.class, RemoteClient.class, + RemoteService.class) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml") + .addAsResource( + new StringAsset(setUrlForClass(RemoteClient.class)), "application.properties")); + + @Inject + Barrier barrier; + + @ParameterizedTest + @ValueSource(strings = { + "reactive-server-and-client", + "blocking-server-and-reactive-client", + "blocking-server-and-client", + "reactive-server-and-blocking-client" + }) + public void testWithConcurrentCallsWithReactiveClientAndServer(String path) { + List results = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + ResponseBody body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON) + .get("/test/" + path + "/{val}").thenReturn().body(); + String value = body.toString(); + results.add(value); + }).start(); + } + await().until(() -> results.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(asSet.size(), count); + } + + @ApplicationScoped + @Path("/test") + public static class MyRestAPI { + + @Inject + MyRequestScopeBean bean; + + @Inject + Barrier barrier; + + @RestClient + RemoteClient client; + + @GET + @Path("/reactive-server-and-client/{val}") + public Uni reactiveServerAndClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Assertions.assertEquals(bean.getValue(), val); + int rBefore = Vertx.currentContext().getLocal("count"); + client.invokeReactive(Integer.toString(val)) + .invoke(s -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + }).subscribe().with(x -> e.complete(val), e::fail); + }); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }); + } + + @GET + @Path("/blocking-server-and-reactive-client/{val}") + public Foo blockingServerWithReactiveClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rBefore = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(bean.getValue(), val); + client.invokeReactive(Integer.toString(val)) + .invoke(s -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + }).subscribe().with(x -> e.complete(val), e::fail); + }); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }) + .await().indefinitely(); + } + + @GET + @Path("/blocking-server-and-client/{val}") + public Foo blockingServerAndBlockingClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rBefore = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(bean.getValue(), val); + String s = client.invokeBlocking(Integer.toString(val)); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + e.complete(val); + }, true); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }) + .await().indefinitely(); + } + + @GET + @Path("/reactive-server-and-blocking-client/{val}") + public Uni reactiveServerWithBlockingClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rBefore = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(bean.getValue(), val); + String s = client.invokeBlocking(Integer.toString(val)); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + e.complete(val); + }, true); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }); + } + } + + @Path("/remote") + public static class RemoteService { + + @GET + @Path("/reactive/{name}") + public Uni hello(String name) { + return Uni.createFrom().item("hello " + name); + } + + @GET + @Path("/blocking/{name}") + public String helloBlocking(String name) { + return "hello " + name; + } + } + + @RegisterRestClient + @Path("/remote") + public interface RemoteClient { + @GET + @Path("/reactive/{name}") + Uni invokeReactive(String name); + + @GET + @Path("/blocking/{name}") + String invokeBlocking(String name); + } + + @ApplicationScoped + public static class Barrier { + + private int level; + + public void setMaxConcurrency(int level) { + this.level = level; + } + + private final AtomicInteger counter = new AtomicInteger(); + private final List tasks = new CopyOnWriteArrayList<>(); + + public void enqueue(Context context, Runnable runnable) { + enqueue(context, runnable, false); + } + + public void enqueue(Context context, Runnable runnable, boolean blocking) { + Task task = new Task(context, runnable, blocking); + tasks.add(task); + if (counter.incrementAndGet() >= level) { + for (Task tbr : new ArrayList<>(tasks)) { + tbr.run(); + tasks.remove(tbr); + } + } + } + } + + private static class Task { + private final Context context; + private final Runnable runnable; + + private final boolean blocking; + + private Task(Context context, Runnable runnable, boolean blocking) { + this.context = context; + this.runnable = runnable; + this.blocking = blocking; + } + + void run() { + if (blocking) { + context.executeBlocking(p -> { + runnable.run(); + p.complete(); + }); + } else { + context.runOnContext(x -> runnable.run()); + } + } + } + + @RequestScoped + public static class MyRequestScopeBean { + + private int value = -1; + + public void setValue(int v) { + if (value != -1) { + throw new IllegalStateException("Already initialized"); + } + value = v; + } + + public int getValue() { + return value; + } + + } + + public static class Foo { + + public final String value; + + public Foo(String value) { + this.value = value; + } + } + +} diff --git a/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml b/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml index b80c852a476bb..fb0ff6936531a 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml +++ b/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml @@ -79,7 +79,7 @@ quarkus-extension-maven-plugin - io.quarkus.rest.client + io.quarkus.rest.client.reactive diff --git a/extensions/smallrye-graphql/deployment/pom.xml b/extensions/smallrye-graphql/deployment/pom.xml index e2feaa37733a6..81a7272060bae 100644 --- a/extensions/smallrye-graphql/deployment/pom.xml +++ b/extensions/smallrye-graphql/deployment/pom.xml @@ -83,7 +83,7 @@ io.quarkus - quarkus-resteasy-deployment + quarkus-resteasy-reactive-deployment test diff --git a/extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java b/extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java new file mode 100644 index 0000000000000..a05c20265ce85 --- /dev/null +++ b/extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java @@ -0,0 +1,185 @@ +package io.quarkus.smallrye.graphql.deployment; + +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.graphql.GraphQLApi; +import org.eclipse.microprofile.graphql.Query; +import org.eclipse.microprofile.graphql.Source; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.restassured.response.ResponseBody; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class RequestLeakDetectionTest extends AbstractGraphQLTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest().withApplicationRoot((jar) -> jar + .addClasses(RequestLeakDetectionTest.MyGraphQLApi.class, MyRequestScopeBean.class, Barrier.class, Task.class) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml")); + + @Inject + Barrier barrier; + + @Test + public void testWithConcurrentCalls() { + List results = new CopyOnWriteArrayList<>(); + List nested = new CopyOnWriteArrayList<>(); + List nestedUni = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + String query = getPayload("{ foo(val:" + c + ") { value nested{ value } nestedUni { value } } }"); + ResponseBody body = RestAssured.given().body(query).contentType(MEDIATYPE_JSON).post("/graphql/") + .thenReturn().body(); + String value = body.path("data.foo.value"); + String nestedValue = body.path("data.foo.nested.value"); + String nestedUniValue = body.path("data.foo.nestedUni.value"); + results.add(value); + nested.add(nestedValue); + nestedUni.add(nestedUniValue); + }).start(); + } + await().until(() -> results.size() == count); + await().until(() -> nested.size() == count); + await().until(() -> nestedUni.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(count, asSet.size()); + asSet = new HashSet<>(nested); + Assertions.assertEquals(count, asSet.size()); + asSet = new HashSet<>(nestedUni); + Assertions.assertEquals(count, asSet.size()); + } + + @ApplicationScoped + @GraphQLApi + public static class MyGraphQLApi { + + @Inject + MyRequestScopeBean bean; + + @Inject + Barrier barrier; + + @Query + public Uni foo(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(r, val); + e.complete(bean.getValue()); + }); + }).map(i -> new Foo(Integer.toString(i))); + } + + public Foo nested(@Source Foo foo) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + String rAsString = Integer.toString(r); + Assertions.assertEquals(rAsString, foo.value); + Assertions.assertEquals(bean.getValue(), r); + return new Foo("source field on foo " + foo.value); + } + + public Uni nestedUni(@Source Foo foo) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + String rAsString = Integer.toString(r); + Assertions.assertEquals(rAsString, foo.value); + Assertions.assertEquals(bean.getValue(), r); + return Uni.createFrom().item(new Foo("uni source field on foo " + foo.value)); + } + + } + + @ApplicationScoped + public static class Barrier { + + private int level; + + public void setMaxConcurrency(int level) { + this.level = level; + } + + private final AtomicInteger counter = new AtomicInteger(); + private final List tasks = new CopyOnWriteArrayList<>(); + + public void enqueue(Context context, Runnable runnable) { + Task task = new Task(context, runnable); + tasks.add(task); + if (counter.incrementAndGet() >= level) { + for (Task tbr : new ArrayList<>(tasks)) { + tbr.run(); + tasks.remove(tbr); + } + } + } + } + + private static class Task { + private final Context context; + private final Runnable runnable; + + private Task(Context context, Runnable runnable) { + this.context = context; + this.runnable = runnable; + } + + void run() { + context.runOnContext(x -> runnable.run()); + } + } + + @RequestScoped + public static class MyRequestScopeBean { + + private int value = -1; + + public void setValue(int v) { + if (value != -1) { + throw new IllegalStateException("Already initialized"); + } + value = v; + } + + public int getValue() { + return value; + } + + } + + public static class Foo { + + public final String value; + + public Foo(String value) { + this.value = value; + } + } + +} diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java index 15903bb7dec40..e90ce7c644bd0 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java @@ -20,6 +20,7 @@ import io.quarkus.arc.deployment.AutoAddScopeBuildItem; import io.quarkus.arc.deployment.BeanRegistrationPhaseBuildItem; import io.quarkus.arc.deployment.BeanRegistrationPhaseBuildItem.BeanConfiguratorBuildItem; +import io.quarkus.arc.deployment.CurrentContextFactoryBuildItem; import io.quarkus.arc.deployment.UnremovableBeanBuildItem; import io.quarkus.arc.deployment.UnremovableBeanBuildItem.BeanClassAnnotationExclusion; import io.quarkus.arc.processor.AnnotationStore; @@ -98,6 +99,12 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxRecorder recorder, return new VertxBuildItem(recorder.forceStart(vertx.getVertx())); } + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + CurrentContextFactoryBuildItem currentContextFactory(VertxRecorder recorder) { + return new CurrentContextFactoryBuildItem(recorder.currentContextFactory()); + } + @BuildStep public UnremovableBeanBuildItem unremovableBeans() { return new UnremovableBeanBuildItem(new BeanClassAnnotationExclusion(CONSUME_EVENT)); diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java new file mode 100644 index 0000000000000..a2806ebc62899 --- /dev/null +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java @@ -0,0 +1,58 @@ +package io.quarkus.vertx.runtime; + +import java.lang.annotation.Annotation; + +import io.netty.util.concurrent.FastThreadLocal; +import io.quarkus.arc.CurrentContext; +import io.quarkus.arc.CurrentContextFactory; +import io.quarkus.arc.InjectableContext; +import io.quarkus.arc.InjectableContext.ContextState; +import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class VertxCurrentContextFactory implements CurrentContextFactory { + + @Override + public CurrentContext create(Class scope) { + return new VertxCurrentContext<>(); + } + + private static final class VertxCurrentContext implements CurrentContext { + + private final FastThreadLocal fallback = new FastThreadLocal<>(); + + @Override + public T get() { + Context context = Vertx.currentContext(); + if (context != null && VertxContext.isDuplicatedContext(context)) { + return context.getLocal(this); + } + return fallback.get(); + } + + @Override + public void set(T state) { + Context context = Vertx.currentContext(); + if (context != null && VertxContext.isDuplicatedContext(context)) { + VertxContextSafetyToggle.setContextSafe(context, true); + context.putLocal(this, state); + } else { + fallback.set(state); + } + } + + @Override + public void remove() { + Context context = Vertx.currentContext(); + if (context != null && VertxContext.isDuplicatedContext(context)) { + // NOOP - the DC should not be shared. + // context.removeLocal(this); + } else { + fallback.remove(); + } + } + + } +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java index 9fdfd9def6ec9..12dbb305919cf 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java @@ -14,6 +14,7 @@ import org.jboss.logging.Logger; +import io.quarkus.arc.CurrentContextFactory; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.ShutdownContext; @@ -66,6 +67,10 @@ public void run() { } } + public RuntimeValue currentContextFactory() { + return new RuntimeValue<>(new VertxCurrentContextFactory()); + } + public static Vertx getVertx() { return vertx; } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java index d31f5eba7ee00..56bdb0ca7e63f 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java @@ -5,6 +5,7 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; +import io.vertx.core.Vertx; import java.io.OutputStream; import java.lang.annotation.Annotation; import java.lang.reflect.Type; @@ -54,10 +55,16 @@ public ClientRequestContextImpl(RestClientRequestContext restClientRequestContex this.configuration = configuration; this.headersMap = new ClientRequestHeadersMap(); //restClientRequestContext.requestHeaders.getHeaders() + // TODO This needs to be challenged: // Always create a duplicated context because each REST Client invocation must have its own context // A separate context allows integrations like OTel to create a separate Span for each invocation (expected) - Context current = client.vertx.getOrCreateContext(); - this.context = VertxContext.createNewDuplicatedContext(current); + Context ctxt = Vertx.currentContext(); + if (ctxt != null && VertxContext.isDuplicatedContext(ctxt)) { + this.context = ctxt; + } else { + Context current = client.vertx.getOrCreateContext(); + this.context = VertxContext.createNewDuplicatedContext(current); + } restClientRequestContext.properties.put(VERTX_CONTEXT_PROPERTY, context); } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java index d3216ecb939a7..24db57060dace 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java @@ -527,4 +527,9 @@ private Boolean getBooleanProperty(String name, Boolean defaultValue) { } return defaultValue; } + + @Override + protected boolean isRequestScopeManagementRequired() { + return false; + } } diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java index 345f4cf991d46..19f98396b6824 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java @@ -130,7 +130,7 @@ public void run() { boolean processingSuspended = false; //if this is a blocking target we don't activate for the initial non-blocking part //unless there are pre-mapping filters as these may require CDI - boolean disasociateRequestScope = false; + boolean disassociateRequestScope = false; boolean aborted = false; try { while (position < handlers.length) { @@ -146,7 +146,7 @@ public void run() { running = true; if (isRequestScopeManagementRequired()) { if (requestScopeActivated) { - disasociateRequestScope = true; + disassociateRequestScope = true; requestScopeActivated = false; } } else { @@ -190,7 +190,7 @@ public void run() { } close(); } else { - if (disasociateRequestScope) { + if (disassociateRequestScope) { requestScopeDeactivated(); currentRequestScope.deactivate(); } diff --git a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java index a0f6bc2099bb6..a1018019cdafd 100644 --- a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java +++ b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java @@ -30,9 +30,18 @@ public Uni helloGet(@QueryParam("name") String name) { } @GET - @Path("/multiple") - public Uni helloMultiple() { - return Uni.combine().all().unis(client.helloGet("Naruto"), client.helloGet("Goku")) + @Path("/multiple-chain") + public Uni helloMultipleUsingChain() { + return client.helloGet("Naruto") + .chain(s1 -> client.helloGet("Goku").map(s2 -> s1 + " and " + s2)); + } + + @GET + @Path("/multiple-combine") + public Uni helloMultipleUsingCombine() { + return Uni.combine().all().unis( + client.helloGet("Naruto"), + client.helloGet("Goku")) .combinedWith((s, s2) -> s + " and " + s2); } diff --git a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java index 277ae2fcb5101..61bb0108934fd 100644 --- a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java +++ b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java @@ -1,20 +1,29 @@ package io.quarkus.it.opentelemetry.reactive; +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_TARGET; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_URL; import static io.restassured.RestAssured.given; import static io.restassured.RestAssured.when; import static java.net.HttpURLConnection.HTTP_OK; +import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import io.opentelemetry.api.trace.SpanKind; import io.quarkus.test.junit.QuarkusTest; import io.restassured.common.mapper.TypeRef; @@ -29,7 +38,6 @@ void reset() { @Test void get() { given() - .contentType("application/json") .when() .queryParam("name", "Naruto") .get("/reactive") @@ -46,7 +54,6 @@ void get() { @Test void post() { given() - .contentType("application/json") .when() .body("Naruto") .post("/reactive") @@ -61,11 +68,10 @@ void post() { } @Test - void multiple() { + void multipleUsingChain() { given() - .contentType("application/json") .when() - .get("/reactive/multiple") + .get("/reactive/multiple-chain") .then() .statusCode(200) .body(equalTo("Hello Naruto and Hello Goku")); @@ -75,10 +81,106 @@ void multiple() { List> spans = getSpans(); assertEquals(7, spans.size()); assertEquals(1, spans.stream().map(map -> map.get("traceId")).collect(toSet()).size()); + + // First span is the call getting into the server. It does not have a parent span. + Map parent = getSpanByKindAndParentId(spans, SERVER, "0000000000000000"); + + // We should get 2 client spans originated by the server + List> clientSpans = getSpansByKindAndParentId(spans, CLIENT, parent.get("spanId")); + assertEquals(2, clientSpans.size()); + + // Each client calls the server and programmatically create a span, so each have a server and an internal span + + // Naruto Span + Optional> narutoSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Naruto")) + .findFirst(); + assertTrue(narutoSpan.isPresent()); + Map naruto = narutoSpan.get(); + + Map narutoServer = getSpanByKindAndParentId(spans, SERVER, naruto.get("spanId")); + assertEquals("/reactive?name=Naruto", ((Map) narutoServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map narutoInternal = getSpanByKindAndParentId(spans, INTERNAL, narutoServer.get("spanId")); + assertEquals("helloGet", narutoInternal.get("name")); + + // Goku Span + Optional> gokuSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Goku")) + .findFirst(); + assertTrue(gokuSpan.isPresent()); + Map goku = gokuSpan.get(); + + Map gokuServer = getSpanByKindAndParentId(spans, SERVER, goku.get("spanId")); + assertEquals("/reactive?name=Goku", ((Map) gokuServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map gokuInternal = getSpanByKindAndParentId(spans, INTERNAL, gokuServer.get("spanId")); + assertEquals("helloGet", gokuInternal.get("name")); + } + + @Test + void multipleUsingCombine() { + given() + .when() + .get("/reactive/multiple-combine") + .then() + .statusCode(200) + .body(equalTo("Hello Naruto and Hello Goku")); + + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 7); + + List> spans = getSpans(); + assertEquals(7, spans.size()); + assertEquals(1, spans.stream().map(map -> map.get("traceId")).collect(toSet()).size()); + + // First span is the call getting into the server. It does not have a parent span. + Map parent = getSpanByKindAndParentId(spans, SERVER, "0000000000000000"); + + // We should get 2 client spans originated by the server + List> clientSpans = getSpansByKindAndParentId(spans, CLIENT, parent.get("spanId")); + assertEquals(2, clientSpans.size()); + + // Each client calls the server and programmatically create a span, so each have a server and an internal span + + // Naruto Span + Optional> narutoSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Naruto")) + .findFirst(); + assertTrue(narutoSpan.isPresent()); + Map naruto = narutoSpan.get(); + + Map narutoServer = getSpanByKindAndParentId(spans, SERVER, naruto.get("spanId")); + assertEquals("/reactive?name=Naruto", ((Map) narutoServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map narutoInternal = getSpanByKindAndParentId(spans, INTERNAL, narutoServer.get("spanId")); + assertEquals("helloGet", narutoInternal.get("name")); + + // Goku Span + Optional> gokuSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Goku")) + .findFirst(); + assertTrue(gokuSpan.isPresent()); + Map goku = gokuSpan.get(); + + Map gokuServer = getSpanByKindAndParentId(spans, SERVER, goku.get("spanId")); + assertEquals("/reactive?name=Goku", ((Map) gokuServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map gokuInternal = getSpanByKindAndParentId(spans, INTERNAL, gokuServer.get("spanId")); + assertEquals("helloGet", gokuInternal.get("name")); } private static List> getSpans() { return when().get("/export").body().as(new TypeRef<>() { }); } + + private static Map getSpanByKindAndParentId(List> spans, SpanKind kind, + Object parentSpanId) { + List> span = getSpansByKindAndParentId(spans, kind, parentSpanId); + assertEquals(1, span.size()); + return span.get(0); + } + + private static List> getSpansByKindAndParentId(List> spans, SpanKind kind, + Object parentSpanId) { + return spans.stream() + .filter(map -> map.get("kind").equals(kind.toString())) + .filter(map -> map.get("parentSpanId").equals(parentSpanId)).collect(toList()); + } } From c24360edcfe2bd697d3c5433f99b70761fc0e201 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Wed, 7 Sep 2022 15:03:04 +0200 Subject: [PATCH 2/3] Add a config property to disable the VertxCurrentContextFactory --- .../vertx/deployment/VertxBuildConfig.java | 17 ++++++++++++++ .../vertx/deployment/VertxProcessor.java | 7 ++++-- ...ertxCurrentContextFactoryDisabledTest.java | 23 +++++++++++++++++++ 3 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxBuildConfig.java create mode 100644 extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/currentcontextfactory/VertxCurrentContextFactoryDisabledTest.java diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxBuildConfig.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxBuildConfig.java new file mode 100644 index 0000000000000..b2c13043ae971 --- /dev/null +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxBuildConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.vertx.deployment; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "vertx", phase = ConfigPhase.BUILD_TIME) +public class VertxBuildConfig { + + /** + * If set to {@code true} then a customized current context factory (backed by a Vert.x duplicated local context) is used + * for normal scopes in ArC. + */ + @ConfigItem(generateDocumentation = false, defaultValue = "true") + public boolean customizeArcContext; + +} diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java index e90ce7c644bd0..43b5c21d71156 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java @@ -101,8 +101,11 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxRecorder recorder, @BuildStep @Record(ExecutionTime.STATIC_INIT) - CurrentContextFactoryBuildItem currentContextFactory(VertxRecorder recorder) { - return new CurrentContextFactoryBuildItem(recorder.currentContextFactory()); + void currentContextFactory(BuildProducer currentContextFactory, + VertxBuildConfig buildConfig, VertxRecorder recorder) { + if (buildConfig.customizeArcContext) { + currentContextFactory.produce(new CurrentContextFactoryBuildItem(recorder.currentContextFactory())); + } } @BuildStep diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/currentcontextfactory/VertxCurrentContextFactoryDisabledTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/currentcontextfactory/VertxCurrentContextFactoryDisabledTest.java new file mode 100644 index 0000000000000..4f17aed87680b --- /dev/null +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/currentcontextfactory/VertxCurrentContextFactoryDisabledTest.java @@ -0,0 +1,23 @@ +package io.quarkus.vertx.deployment.currentcontextfactory; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.arc.Arc; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.vertx.runtime.VertxCurrentContextFactory; + +public class VertxCurrentContextFactoryDisabledTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .overrideConfigKey("quarkus.vertx.customize-arc-context", "false"); + + @Test + public void testCustomizedFactoryNotUsed() { + assertFalse(Arc.container().getCurrentContextFactory() instanceof VertxCurrentContextFactory); + } + +} From 2b64b819a5f1e1e70cee16b6bbda7c0f8fcb3ec5 Mon Sep 17 00:00:00 2001 From: Roberto Cortez Date: Wed, 7 Sep 2022 16:05:14 +0100 Subject: [PATCH 3/3] Properly check for REST Client Capability --- .../quarkus/narayana/lra/deployment/NarayanaLRAProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/narayana-lra/deployment/src/main/java/io/quarkus/narayana/lra/deployment/NarayanaLRAProcessor.java b/extensions/narayana-lra/deployment/src/main/java/io/quarkus/narayana/lra/deployment/NarayanaLRAProcessor.java index 3f77956e3179e..24948418b2cd2 100644 --- a/extensions/narayana-lra/deployment/src/main/java/io/quarkus/narayana/lra/deployment/NarayanaLRAProcessor.java +++ b/extensions/narayana-lra/deployment/src/main/java/io/quarkus/narayana/lra/deployment/NarayanaLRAProcessor.java @@ -45,7 +45,7 @@ void registerFeature(BuildProducer feature, Capabilities capab "'quarkus-narayana-lra' can only work if 'quarkus-resteasy-jackson' or 'quarkus-resteasy-reactive-jackson' is present"); } - if (!capabilities.isPresent(Capability.REST_CLIENT)) { + if (!capabilities.isCapabilityWithPrefixPresent(Capability.REST_CLIENT)) { throw new IllegalStateException( "'quarkus-narayana-lra' can only work if 'quarkus-rest-client' or 'quarkus-rest-client-reactive' is present"); }