diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java index 5af46ef4d0b2f..e5463d19ed91a 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java @@ -39,6 +39,7 @@ public interface Capability { String REST = QUARKUS_PREFIX + "rest"; String REST_CLIENT = REST + ".client"; + String REST_CLIENT_REACTIVE = REST_CLIENT + ".reactive"; String REST_JACKSON = REST + ".jackson"; String REST_JSONB = REST + ".jsonb"; diff --git a/extensions/narayana-lra/deployment/src/main/java/io/quarkus/narayana/lra/deployment/NarayanaLRAProcessor.java b/extensions/narayana-lra/deployment/src/main/java/io/quarkus/narayana/lra/deployment/NarayanaLRAProcessor.java index 3f77956e3179e..24948418b2cd2 100644 --- a/extensions/narayana-lra/deployment/src/main/java/io/quarkus/narayana/lra/deployment/NarayanaLRAProcessor.java +++ b/extensions/narayana-lra/deployment/src/main/java/io/quarkus/narayana/lra/deployment/NarayanaLRAProcessor.java @@ -45,7 +45,7 @@ void registerFeature(BuildProducer feature, Capabilities capab "'quarkus-narayana-lra' can only work if 'quarkus-resteasy-jackson' or 'quarkus-resteasy-reactive-jackson' is present"); } - if (!capabilities.isPresent(Capability.REST_CLIENT)) { + if (!capabilities.isCapabilityWithPrefixPresent(Capability.REST_CLIENT)) { throw new IllegalStateException( "'quarkus-narayana-lra' can only work if 'quarkus-rest-client' or 'quarkus-rest-client-reactive' is present"); } diff --git a/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java b/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java index 004bc8cc6dc20..4a54dc1c8ea00 100644 --- a/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java +++ b/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.function.BooleanSupplier; import java.util.stream.Collectors; import org.jboss.jandex.AnnotationInstance; @@ -21,6 +20,8 @@ import io.quarkus.arc.processor.AnnotationsTransformer; import io.quarkus.arc.processor.InterceptorBindingRegistrar; import io.quarkus.bootstrap.classloading.QuarkusClassLoader; +import io.quarkus.deployment.Capabilities; +import io.quarkus.deployment.Capability; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.BuildSteps; @@ -53,15 +54,6 @@ public class OpenTelemetryProcessor { io.opentelemetry.extension.annotations.SpanAttribute.class.getName());; private static final DotName SPAN_ATTRIBUTE = DotName.createSimple(SpanAttribute.class.getName()); - static class RestClientAvailable implements BooleanSupplier { - private static final boolean IS_REST_CLIENT_AVAILABLE = isClassPresent("javax.ws.rs.client.ClientRequestFilter"); - - @Override - public boolean getAsBoolean() { - return IS_REST_CLIENT_AVAILABLE; - } - } - @BuildStep AdditionalBeanBuildItem ensureProducerIsRetained() { return AdditionalBeanBuildItem.builder() @@ -138,11 +130,15 @@ public void transform(TransformationContext context) { })); } - @BuildStep(onlyIf = RestClientAvailable.class) - void registerProvider(BuildProducer additionalIndexed, + @BuildStep + void registerRestClientClassicProvider( + Capabilities capabilities, + BuildProducer additionalIndexed, BuildProducer additionalBeans) { - additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName())); - additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class)); + if (capabilities.isPresent(Capability.REST_CLIENT) && capabilities.isMissing(Capability.REST_CLIENT_REACTIVE)) { + additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName())); + additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class)); + } } @BuildStep diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java index 7452f6dcfcf8a..e37a29e68ff84 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java @@ -1,7 +1,6 @@ package io.quarkus.opentelemetry.runtime; import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe; -import static io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext; import static io.smallrye.common.vertx.VertxContext.isDuplicatedContext; import org.jboss.logging.Logger; @@ -9,6 +8,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.ContextStorage; import io.opentelemetry.context.Scope; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Vertx; /** @@ -118,8 +118,10 @@ public static Context getContext(io.vertx.core.Context vertxContext) { */ public static io.vertx.core.Context getVertxContext() { io.vertx.core.Context context = Vertx.currentContext(); - if (context != null) { - io.vertx.core.Context dc = getOrCreateDuplicatedContext(context); + if (context != null && VertxContext.isOnDuplicatedContext()) { + return context; + } else if (context != null) { + io.vertx.core.Context dc = VertxContext.createNewDuplicatedContext(context); setContextSafe(dc, true); return dc; } diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java index 2da1d59d039f2..dcd8222698a39 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java @@ -30,7 +30,9 @@ import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; /** - * A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. + * A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. This is only used + * by RESTEasy Classic, because the handling implementation is provided by RESTEasy. This is not used by RESTEasy + * Reactive because tracing is handled by Vert.x. */ @Unremovable @Provider @@ -77,9 +79,19 @@ public void filter(final ClientRequestContext request) { if (parentContext == null) { parentContext = io.opentelemetry.context.Context.current(); } + + // For each request, we need a new OTel Context from the **current one** + // the parent context needs to be the one from which the call originates. + if (instrumenter.shouldStart(parentContext, request)) { Context spanContext = instrumenter.start(parentContext, request); - Scope scope = QuarkusContextStorage.INSTANCE.attach(vertxContext, spanContext); + // Create a new scope with an empty termination callback. + Scope scope = new Scope() { + @Override + public void close() { + + } + }; request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_CONTEXT, spanContext); request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT, parentContext); request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE, scope); diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java index 272c1a83bb9fd..6fe1bb57cdb3f 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java @@ -1,7 +1,5 @@ package io.quarkus.opentelemetry.runtime.tracing.vertx; -import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe; - import java.util.Map; import java.util.function.BiConsumer; @@ -9,7 +7,6 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; import io.quarkus.opentelemetry.runtime.tracing.vertx.OpenTelemetryVertxTracer.SpanOperation; -import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.MultiMap; import io.vertx.core.http.impl.headers.HeadersAdaptor; @@ -92,10 +89,14 @@ default SpanOperation sendRequest( if (instrumenter.shouldStart(parentContext, (REQ) request)) { io.opentelemetry.context.Context spanContext = instrumenter.start(parentContext, writableHeaders((REQ) request, headers)); - Context duplicatedContext = VertxContext.createNewDuplicatedContext(context); - setContextSafe(duplicatedContext, true); - Scope scope = QuarkusContextStorage.INSTANCE.attach(duplicatedContext, spanContext); - return spanOperation(duplicatedContext, (REQ) request, toMultiMap(headers), spanContext, scope); + // Create a new scope with an empty termination callback. + Scope scope = new Scope() { + @Override + public void close() { + + } + }; + return spanOperation(context, (REQ) request, toMultiMap(headers), spanContext, scope); } return null; diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java new file mode 100644 index 0000000000000..b62fe45b9a2f8 --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java @@ -0,0 +1,192 @@ +package io.quarkus.resteasy.reactive.server.test; + +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.MediaType; + +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.restassured.response.ResponseBody; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class RequestLeakDetectionTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar.addClasses(MyRestAPI.class, MyRequestScopeBean.class, Barrier.class, Task.class) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml")); + + @Inject + Barrier barrier; + + @Test + public void testWithConcurrentCalls() { + List results = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + ResponseBody body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON) + .get("/test/{val}").thenReturn().body(); + String value = body.toString(); + results.add(value); + }).start(); + } + await().until(() -> results.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(asSet.size(), count); + } + + @Test + public void testWithConcurrentBlockingCalls() { + List results = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + ResponseBody body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON) + .get("/test/blocking/{val}").thenReturn().body(); + String value = body.toString(); + results.add(value); + }).start(); + } + await().until(() -> results.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(asSet.size(), count); + } + + @ApplicationScoped + @Path("/test") + public static class MyRestAPI { + + @Inject + MyRequestScopeBean bean; + + @Inject + Barrier barrier; + + @GET + @Path("/{val}") + public Uni foo(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(r, val); + e.complete(bean.getValue()); + }); + }).map(i -> new Foo(Integer.toString(i))); + } + + @GET + @Path("/blocking/{val}") + public Foo blocking(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(r, val); + e.complete(bean.getValue()); + }); + }) + .map(i -> new Foo(Integer.toString(i))) + .await().indefinitely(); + } + } + + @ApplicationScoped + public static class Barrier { + + private int level; + + public void setMaxConcurrency(int level) { + this.level = level; + } + + private final AtomicInteger counter = new AtomicInteger(); + private final List tasks = new CopyOnWriteArrayList<>(); + + public void enqueue(Context context, Runnable runnable) { + Task task = new Task(context, runnable); + tasks.add(task); + if (counter.incrementAndGet() >= level) { + for (Task tbr : new ArrayList<>(tasks)) { + tbr.run(); + tasks.remove(tbr); + } + } + } + } + + private static class Task { + private final Context context; + private final Runnable runnable; + + private Task(Context context, Runnable runnable) { + this.context = context; + this.runnable = runnable; + } + + void run() { + context.runOnContext(x -> runnable.run()); + } + } + + @RequestScoped + public static class MyRequestScopeBean { + + private int value = -1; + + public void setValue(int v) { + if (value != -1) { + throw new IllegalStateException("Already initialized"); + } + value = v; + } + + public int getValue() { + return value; + } + + } + + public static class Foo { + + public final String value; + + public Foo(String value) { + this.value = value; + } + } + +} diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java index fb47a5faf78e4..9ca38e98e4942 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java @@ -45,6 +45,13 @@ protected void handleRequestScopeActivation() { } } + @Override + protected void requestScopeDeactivated() { + // we intentionally don't call 'CurrentRequestManager.set(null)' + // because there is no need to clear the current request + // as that is backed by a DuplicatedContext and not accessible to other requests anyway + } + protected SecurityContext createSecurityContext() { return new ResteasyReactiveSecurityContext(context); } diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java new file mode 100644 index 0000000000000..d1d2b403783a5 --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java @@ -0,0 +1,309 @@ +package io.quarkus.rest.client.reactive; + +import static io.quarkus.rest.client.reactive.RestClientTestUtil.setUrlForClass; +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.restassured.response.ResponseBody; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class RequestLeakDetectionTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(MyRestAPI.class, MyRequestScopeBean.class, Barrier.class, Task.class, RemoteClient.class, + RemoteService.class) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml") + .addAsResource( + new StringAsset(setUrlForClass(RemoteClient.class)), "application.properties")); + + @Inject + Barrier barrier; + + @ParameterizedTest + @ValueSource(strings = { + "reactive-server-and-client", + "blocking-server-and-reactive-client", + "blocking-server-and-client", + "reactive-server-and-blocking-client" + }) + public void testWithConcurrentCallsWithReactiveClientAndServer(String path) { + List results = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + ResponseBody body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON) + .get("/test/" + path + "/{val}").thenReturn().body(); + String value = body.toString(); + results.add(value); + }).start(); + } + await().until(() -> results.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(asSet.size(), count); + } + + @ApplicationScoped + @Path("/test") + public static class MyRestAPI { + + @Inject + MyRequestScopeBean bean; + + @Inject + Barrier barrier; + + @RestClient + RemoteClient client; + + @GET + @Path("/reactive-server-and-client/{val}") + public Uni reactiveServerAndClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Assertions.assertEquals(bean.getValue(), val); + int rBefore = Vertx.currentContext().getLocal("count"); + client.invokeReactive(Integer.toString(val)) + .invoke(s -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + }).subscribe().with(x -> e.complete(val), e::fail); + }); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }); + } + + @GET + @Path("/blocking-server-and-reactive-client/{val}") + public Foo blockingServerWithReactiveClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rBefore = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(bean.getValue(), val); + client.invokeReactive(Integer.toString(val)) + .invoke(s -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + }).subscribe().with(x -> e.complete(val), e::fail); + }); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }) + .await().indefinitely(); + } + + @GET + @Path("/blocking-server-and-client/{val}") + public Foo blockingServerAndBlockingClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rBefore = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(bean.getValue(), val); + String s = client.invokeBlocking(Integer.toString(val)); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + e.complete(val); + }, true); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }) + .await().indefinitely(); + } + + @GET + @Path("/reactive-server-and-blocking-client/{val}") + public Uni reactiveServerWithBlockingClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rBefore = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(bean.getValue(), val); + String s = client.invokeBlocking(Integer.toString(val)); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + e.complete(val); + }, true); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }); + } + } + + @Path("/remote") + public static class RemoteService { + + @GET + @Path("/reactive/{name}") + public Uni hello(String name) { + return Uni.createFrom().item("hello " + name); + } + + @GET + @Path("/blocking/{name}") + public String helloBlocking(String name) { + return "hello " + name; + } + } + + @RegisterRestClient + @Path("/remote") + public interface RemoteClient { + @GET + @Path("/reactive/{name}") + Uni invokeReactive(String name); + + @GET + @Path("/blocking/{name}") + String invokeBlocking(String name); + } + + @ApplicationScoped + public static class Barrier { + + private int level; + + public void setMaxConcurrency(int level) { + this.level = level; + } + + private final AtomicInteger counter = new AtomicInteger(); + private final List tasks = new CopyOnWriteArrayList<>(); + + public void enqueue(Context context, Runnable runnable) { + enqueue(context, runnable, false); + } + + public void enqueue(Context context, Runnable runnable, boolean blocking) { + Task task = new Task(context, runnable, blocking); + tasks.add(task); + if (counter.incrementAndGet() >= level) { + for (Task tbr : new ArrayList<>(tasks)) { + tbr.run(); + tasks.remove(tbr); + } + } + } + } + + private static class Task { + private final Context context; + private final Runnable runnable; + + private final boolean blocking; + + private Task(Context context, Runnable runnable, boolean blocking) { + this.context = context; + this.runnable = runnable; + this.blocking = blocking; + } + + void run() { + if (blocking) { + context.executeBlocking(p -> { + runnable.run(); + p.complete(); + }); + } else { + context.runOnContext(x -> runnable.run()); + } + } + } + + @RequestScoped + public static class MyRequestScopeBean { + + private int value = -1; + + public void setValue(int v) { + if (value != -1) { + throw new IllegalStateException("Already initialized"); + } + value = v; + } + + public int getValue() { + return value; + } + + } + + public static class Foo { + + public final String value; + + public Foo(String value) { + this.value = value; + } + } + +} diff --git a/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml b/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml index b80c852a476bb..fb0ff6936531a 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml +++ b/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml @@ -79,7 +79,7 @@ quarkus-extension-maven-plugin - io.quarkus.rest.client + io.quarkus.rest.client.reactive diff --git a/extensions/smallrye-graphql/deployment/pom.xml b/extensions/smallrye-graphql/deployment/pom.xml index e2feaa37733a6..81a7272060bae 100644 --- a/extensions/smallrye-graphql/deployment/pom.xml +++ b/extensions/smallrye-graphql/deployment/pom.xml @@ -83,7 +83,7 @@ io.quarkus - quarkus-resteasy-deployment + quarkus-resteasy-reactive-deployment test diff --git a/extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java b/extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java new file mode 100644 index 0000000000000..a05c20265ce85 --- /dev/null +++ b/extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java @@ -0,0 +1,185 @@ +package io.quarkus.smallrye.graphql.deployment; + +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.graphql.GraphQLApi; +import org.eclipse.microprofile.graphql.Query; +import org.eclipse.microprofile.graphql.Source; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.restassured.response.ResponseBody; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class RequestLeakDetectionTest extends AbstractGraphQLTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest().withApplicationRoot((jar) -> jar + .addClasses(RequestLeakDetectionTest.MyGraphQLApi.class, MyRequestScopeBean.class, Barrier.class, Task.class) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml")); + + @Inject + Barrier barrier; + + @Test + public void testWithConcurrentCalls() { + List results = new CopyOnWriteArrayList<>(); + List nested = new CopyOnWriteArrayList<>(); + List nestedUni = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + String query = getPayload("{ foo(val:" + c + ") { value nested{ value } nestedUni { value } } }"); + ResponseBody body = RestAssured.given().body(query).contentType(MEDIATYPE_JSON).post("/graphql/") + .thenReturn().body(); + String value = body.path("data.foo.value"); + String nestedValue = body.path("data.foo.nested.value"); + String nestedUniValue = body.path("data.foo.nestedUni.value"); + results.add(value); + nested.add(nestedValue); + nestedUni.add(nestedUniValue); + }).start(); + } + await().until(() -> results.size() == count); + await().until(() -> nested.size() == count); + await().until(() -> nestedUni.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(count, asSet.size()); + asSet = new HashSet<>(nested); + Assertions.assertEquals(count, asSet.size()); + asSet = new HashSet<>(nestedUni); + Assertions.assertEquals(count, asSet.size()); + } + + @ApplicationScoped + @GraphQLApi + public static class MyGraphQLApi { + + @Inject + MyRequestScopeBean bean; + + @Inject + Barrier barrier; + + @Query + public Uni foo(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(r, val); + e.complete(bean.getValue()); + }); + }).map(i -> new Foo(Integer.toString(i))); + } + + public Foo nested(@Source Foo foo) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + String rAsString = Integer.toString(r); + Assertions.assertEquals(rAsString, foo.value); + Assertions.assertEquals(bean.getValue(), r); + return new Foo("source field on foo " + foo.value); + } + + public Uni nestedUni(@Source Foo foo) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + String rAsString = Integer.toString(r); + Assertions.assertEquals(rAsString, foo.value); + Assertions.assertEquals(bean.getValue(), r); + return Uni.createFrom().item(new Foo("uni source field on foo " + foo.value)); + } + + } + + @ApplicationScoped + public static class Barrier { + + private int level; + + public void setMaxConcurrency(int level) { + this.level = level; + } + + private final AtomicInteger counter = new AtomicInteger(); + private final List tasks = new CopyOnWriteArrayList<>(); + + public void enqueue(Context context, Runnable runnable) { + Task task = new Task(context, runnable); + tasks.add(task); + if (counter.incrementAndGet() >= level) { + for (Task tbr : new ArrayList<>(tasks)) { + tbr.run(); + tasks.remove(tbr); + } + } + } + } + + private static class Task { + private final Context context; + private final Runnable runnable; + + private Task(Context context, Runnable runnable) { + this.context = context; + this.runnable = runnable; + } + + void run() { + context.runOnContext(x -> runnable.run()); + } + } + + @RequestScoped + public static class MyRequestScopeBean { + + private int value = -1; + + public void setValue(int v) { + if (value != -1) { + throw new IllegalStateException("Already initialized"); + } + value = v; + } + + public int getValue() { + return value; + } + + } + + public static class Foo { + + public final String value; + + public Foo(String value) { + this.value = value; + } + } + +} diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxBuildConfig.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxBuildConfig.java new file mode 100644 index 0000000000000..b2c13043ae971 --- /dev/null +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxBuildConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.vertx.deployment; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "vertx", phase = ConfigPhase.BUILD_TIME) +public class VertxBuildConfig { + + /** + * If set to {@code true} then a customized current context factory (backed by a Vert.x duplicated local context) is used + * for normal scopes in ArC. + */ + @ConfigItem(generateDocumentation = false, defaultValue = "true") + public boolean customizeArcContext; + +} diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java index 15903bb7dec40..43b5c21d71156 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java @@ -20,6 +20,7 @@ import io.quarkus.arc.deployment.AutoAddScopeBuildItem; import io.quarkus.arc.deployment.BeanRegistrationPhaseBuildItem; import io.quarkus.arc.deployment.BeanRegistrationPhaseBuildItem.BeanConfiguratorBuildItem; +import io.quarkus.arc.deployment.CurrentContextFactoryBuildItem; import io.quarkus.arc.deployment.UnremovableBeanBuildItem; import io.quarkus.arc.deployment.UnremovableBeanBuildItem.BeanClassAnnotationExclusion; import io.quarkus.arc.processor.AnnotationStore; @@ -98,6 +99,15 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxRecorder recorder, return new VertxBuildItem(recorder.forceStart(vertx.getVertx())); } + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + void currentContextFactory(BuildProducer currentContextFactory, + VertxBuildConfig buildConfig, VertxRecorder recorder) { + if (buildConfig.customizeArcContext) { + currentContextFactory.produce(new CurrentContextFactoryBuildItem(recorder.currentContextFactory())); + } + } + @BuildStep public UnremovableBeanBuildItem unremovableBeans() { return new UnremovableBeanBuildItem(new BeanClassAnnotationExclusion(CONSUME_EVENT)); diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/currentcontextfactory/VertxCurrentContextFactoryDisabledTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/currentcontextfactory/VertxCurrentContextFactoryDisabledTest.java new file mode 100644 index 0000000000000..4f17aed87680b --- /dev/null +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/currentcontextfactory/VertxCurrentContextFactoryDisabledTest.java @@ -0,0 +1,23 @@ +package io.quarkus.vertx.deployment.currentcontextfactory; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.arc.Arc; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.vertx.runtime.VertxCurrentContextFactory; + +public class VertxCurrentContextFactoryDisabledTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .overrideConfigKey("quarkus.vertx.customize-arc-context", "false"); + + @Test + public void testCustomizedFactoryNotUsed() { + assertFalse(Arc.container().getCurrentContextFactory() instanceof VertxCurrentContextFactory); + } + +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java new file mode 100644 index 0000000000000..a2806ebc62899 --- /dev/null +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java @@ -0,0 +1,58 @@ +package io.quarkus.vertx.runtime; + +import java.lang.annotation.Annotation; + +import io.netty.util.concurrent.FastThreadLocal; +import io.quarkus.arc.CurrentContext; +import io.quarkus.arc.CurrentContextFactory; +import io.quarkus.arc.InjectableContext; +import io.quarkus.arc.InjectableContext.ContextState; +import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class VertxCurrentContextFactory implements CurrentContextFactory { + + @Override + public CurrentContext create(Class scope) { + return new VertxCurrentContext<>(); + } + + private static final class VertxCurrentContext implements CurrentContext { + + private final FastThreadLocal fallback = new FastThreadLocal<>(); + + @Override + public T get() { + Context context = Vertx.currentContext(); + if (context != null && VertxContext.isDuplicatedContext(context)) { + return context.getLocal(this); + } + return fallback.get(); + } + + @Override + public void set(T state) { + Context context = Vertx.currentContext(); + if (context != null && VertxContext.isDuplicatedContext(context)) { + VertxContextSafetyToggle.setContextSafe(context, true); + context.putLocal(this, state); + } else { + fallback.set(state); + } + } + + @Override + public void remove() { + Context context = Vertx.currentContext(); + if (context != null && VertxContext.isDuplicatedContext(context)) { + // NOOP - the DC should not be shared. + // context.removeLocal(this); + } else { + fallback.remove(); + } + } + + } +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java index 9fdfd9def6ec9..12dbb305919cf 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java @@ -14,6 +14,7 @@ import org.jboss.logging.Logger; +import io.quarkus.arc.CurrentContextFactory; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.ShutdownContext; @@ -66,6 +67,10 @@ public void run() { } } + public RuntimeValue currentContextFactory() { + return new RuntimeValue<>(new VertxCurrentContextFactory()); + } + public static Vertx getVertx() { return vertx; } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java index d31f5eba7ee00..56bdb0ca7e63f 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java @@ -5,6 +5,7 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; +import io.vertx.core.Vertx; import java.io.OutputStream; import java.lang.annotation.Annotation; import java.lang.reflect.Type; @@ -54,10 +55,16 @@ public ClientRequestContextImpl(RestClientRequestContext restClientRequestContex this.configuration = configuration; this.headersMap = new ClientRequestHeadersMap(); //restClientRequestContext.requestHeaders.getHeaders() + // TODO This needs to be challenged: // Always create a duplicated context because each REST Client invocation must have its own context // A separate context allows integrations like OTel to create a separate Span for each invocation (expected) - Context current = client.vertx.getOrCreateContext(); - this.context = VertxContext.createNewDuplicatedContext(current); + Context ctxt = Vertx.currentContext(); + if (ctxt != null && VertxContext.isDuplicatedContext(ctxt)) { + this.context = ctxt; + } else { + Context current = client.vertx.getOrCreateContext(); + this.context = VertxContext.createNewDuplicatedContext(current); + } restClientRequestContext.properties.put(VERTX_CONTEXT_PROPERTY, context); } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java index d3216ecb939a7..24db57060dace 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java @@ -527,4 +527,9 @@ private Boolean getBooleanProperty(String name, Boolean defaultValue) { } return defaultValue; } + + @Override + protected boolean isRequestScopeManagementRequired() { + return false; + } } diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java index 345f4cf991d46..19f98396b6824 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java @@ -130,7 +130,7 @@ public void run() { boolean processingSuspended = false; //if this is a blocking target we don't activate for the initial non-blocking part //unless there are pre-mapping filters as these may require CDI - boolean disasociateRequestScope = false; + boolean disassociateRequestScope = false; boolean aborted = false; try { while (position < handlers.length) { @@ -146,7 +146,7 @@ public void run() { running = true; if (isRequestScopeManagementRequired()) { if (requestScopeActivated) { - disasociateRequestScope = true; + disassociateRequestScope = true; requestScopeActivated = false; } } else { @@ -190,7 +190,7 @@ public void run() { } close(); } else { - if (disasociateRequestScope) { + if (disassociateRequestScope) { requestScopeDeactivated(); currentRequestScope.deactivate(); } diff --git a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java index a0f6bc2099bb6..a1018019cdafd 100644 --- a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java +++ b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java @@ -30,9 +30,18 @@ public Uni helloGet(@QueryParam("name") String name) { } @GET - @Path("/multiple") - public Uni helloMultiple() { - return Uni.combine().all().unis(client.helloGet("Naruto"), client.helloGet("Goku")) + @Path("/multiple-chain") + public Uni helloMultipleUsingChain() { + return client.helloGet("Naruto") + .chain(s1 -> client.helloGet("Goku").map(s2 -> s1 + " and " + s2)); + } + + @GET + @Path("/multiple-combine") + public Uni helloMultipleUsingCombine() { + return Uni.combine().all().unis( + client.helloGet("Naruto"), + client.helloGet("Goku")) .combinedWith((s, s2) -> s + " and " + s2); } diff --git a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java index 277ae2fcb5101..61bb0108934fd 100644 --- a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java +++ b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java @@ -1,20 +1,29 @@ package io.quarkus.it.opentelemetry.reactive; +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_TARGET; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_URL; import static io.restassured.RestAssured.given; import static io.restassured.RestAssured.when; import static java.net.HttpURLConnection.HTTP_OK; +import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import io.opentelemetry.api.trace.SpanKind; import io.quarkus.test.junit.QuarkusTest; import io.restassured.common.mapper.TypeRef; @@ -29,7 +38,6 @@ void reset() { @Test void get() { given() - .contentType("application/json") .when() .queryParam("name", "Naruto") .get("/reactive") @@ -46,7 +54,6 @@ void get() { @Test void post() { given() - .contentType("application/json") .when() .body("Naruto") .post("/reactive") @@ -61,11 +68,10 @@ void post() { } @Test - void multiple() { + void multipleUsingChain() { given() - .contentType("application/json") .when() - .get("/reactive/multiple") + .get("/reactive/multiple-chain") .then() .statusCode(200) .body(equalTo("Hello Naruto and Hello Goku")); @@ -75,10 +81,106 @@ void multiple() { List> spans = getSpans(); assertEquals(7, spans.size()); assertEquals(1, spans.stream().map(map -> map.get("traceId")).collect(toSet()).size()); + + // First span is the call getting into the server. It does not have a parent span. + Map parent = getSpanByKindAndParentId(spans, SERVER, "0000000000000000"); + + // We should get 2 client spans originated by the server + List> clientSpans = getSpansByKindAndParentId(spans, CLIENT, parent.get("spanId")); + assertEquals(2, clientSpans.size()); + + // Each client calls the server and programmatically create a span, so each have a server and an internal span + + // Naruto Span + Optional> narutoSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Naruto")) + .findFirst(); + assertTrue(narutoSpan.isPresent()); + Map naruto = narutoSpan.get(); + + Map narutoServer = getSpanByKindAndParentId(spans, SERVER, naruto.get("spanId")); + assertEquals("/reactive?name=Naruto", ((Map) narutoServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map narutoInternal = getSpanByKindAndParentId(spans, INTERNAL, narutoServer.get("spanId")); + assertEquals("helloGet", narutoInternal.get("name")); + + // Goku Span + Optional> gokuSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Goku")) + .findFirst(); + assertTrue(gokuSpan.isPresent()); + Map goku = gokuSpan.get(); + + Map gokuServer = getSpanByKindAndParentId(spans, SERVER, goku.get("spanId")); + assertEquals("/reactive?name=Goku", ((Map) gokuServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map gokuInternal = getSpanByKindAndParentId(spans, INTERNAL, gokuServer.get("spanId")); + assertEquals("helloGet", gokuInternal.get("name")); + } + + @Test + void multipleUsingCombine() { + given() + .when() + .get("/reactive/multiple-combine") + .then() + .statusCode(200) + .body(equalTo("Hello Naruto and Hello Goku")); + + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 7); + + List> spans = getSpans(); + assertEquals(7, spans.size()); + assertEquals(1, spans.stream().map(map -> map.get("traceId")).collect(toSet()).size()); + + // First span is the call getting into the server. It does not have a parent span. + Map parent = getSpanByKindAndParentId(spans, SERVER, "0000000000000000"); + + // We should get 2 client spans originated by the server + List> clientSpans = getSpansByKindAndParentId(spans, CLIENT, parent.get("spanId")); + assertEquals(2, clientSpans.size()); + + // Each client calls the server and programmatically create a span, so each have a server and an internal span + + // Naruto Span + Optional> narutoSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Naruto")) + .findFirst(); + assertTrue(narutoSpan.isPresent()); + Map naruto = narutoSpan.get(); + + Map narutoServer = getSpanByKindAndParentId(spans, SERVER, naruto.get("spanId")); + assertEquals("/reactive?name=Naruto", ((Map) narutoServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map narutoInternal = getSpanByKindAndParentId(spans, INTERNAL, narutoServer.get("spanId")); + assertEquals("helloGet", narutoInternal.get("name")); + + // Goku Span + Optional> gokuSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Goku")) + .findFirst(); + assertTrue(gokuSpan.isPresent()); + Map goku = gokuSpan.get(); + + Map gokuServer = getSpanByKindAndParentId(spans, SERVER, goku.get("spanId")); + assertEquals("/reactive?name=Goku", ((Map) gokuServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map gokuInternal = getSpanByKindAndParentId(spans, INTERNAL, gokuServer.get("spanId")); + assertEquals("helloGet", gokuInternal.get("name")); } private static List> getSpans() { return when().get("/export").body().as(new TypeRef<>() { }); } + + private static Map getSpanByKindAndParentId(List> spans, SpanKind kind, + Object parentSpanId) { + List> span = getSpansByKindAndParentId(spans, kind, parentSpanId); + assertEquals(1, span.size()); + return span.get(0); + } + + private static List> getSpansByKindAndParentId(List> spans, SpanKind kind, + Object parentSpanId) { + return spans.stream() + .filter(map -> map.get("kind").equals(kind.toString())) + .filter(map -> map.get("parentSpanId").equals(parentSpanId)).collect(toList()); + } }