From 5661a27ce0d477560d81708739d82c6d425bd5df Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Mon, 28 Aug 2023 09:21:41 +0200 Subject: [PATCH] Integrate @RunOnVirtualThread with the @ConsumeEvent Allows @ConsumeEvent method to run on a virtual thread. Also verify that sending and receiving from the event bus is not pinning the carrier thread. --- .github/virtual-threads-tests.json | 2 +- docs/src/main/asciidoc/vertx-reference.adoc | 9 +++ extensions/vertx/deployment/pom.xml | 4 + .../vertx/deployment/EventBusConsumer.java | 12 ++- .../vertx/deployment/VertxProcessor.java | 13 +++- extensions/vertx/runtime/pom.xml | 4 + .../vertx/runtime/EventConsumerInvoker.java | 10 ++- ...ava => VertxEventBusConsumerRecorder.java} | 64 +++++++++++----- .../vertx/runtime/VertxProducerTest.java | 4 +- extensions/virtual-threads/deployment/pom.xml | 4 - extensions/virtual-threads/runtime/pom.xml | 19 ++--- integration-tests/virtual-threads/pom.xml | 1 + .../vertx-event-bus-virtual-threads/pom.xml | 73 ++++++++++++++++++ .../quarkus/virtual/vertx/AssertHelper.java | 71 +++++++++++++++++ .../virtual/vertx/EventBusConsumer.java | 30 ++++++++ .../virtual/vertx/VertxEventBusResource.java | 38 ++++++++++ .../src/main/resources/application.properties | 1 + .../virtual/vertx/NoPinningVerify.java | 76 +++++++++++++++++++ .../virtual/vertx/RunOnVirtualThreadIT.java | 8 ++ .../virtual/vertx/RunOnVirtualThreadTest.java | 35 +++++++++ 20 files changed, 435 insertions(+), 43 deletions(-) rename extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/{VertxRecorder.java => VertxEventBusConsumerRecorder.java} (76%) create mode 100644 integration-tests/virtual-threads/vertx-event-bus-virtual-threads/pom.xml create mode 100644 integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/AssertHelper.java create mode 100644 integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/EventBusConsumer.java create mode 100644 integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/VertxEventBusResource.java create mode 100644 integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/resources/application.properties create mode 100644 integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/NoPinningVerify.java create mode 100644 integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/RunOnVirtualThreadIT.java create mode 100644 integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/RunOnVirtualThreadTest.java diff --git a/.github/virtual-threads-tests.json b/.github/virtual-threads-tests.json index 22fd2166fa3177..7230a628bf3c94 100644 --- a/.github/virtual-threads-tests.json +++ b/.github/virtual-threads-tests.json @@ -3,7 +3,7 @@ { "category": "Main", "timeout": 45, - "test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads", + "test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads", "os-name": "ubuntu-latest" }, { diff --git a/docs/src/main/asciidoc/vertx-reference.adoc b/docs/src/main/asciidoc/vertx-reference.adoc index 6e0a63757ef361..38f84c0ceee25e 100644 --- a/docs/src/main/asciidoc/vertx-reference.adoc +++ b/docs/src/main/asciidoc/vertx-reference.adoc @@ -694,6 +694,15 @@ Uni response = bus.request("address", "hello, how are you?") .onItem().transform(Message::body); ---- +=== Process events on virtual threads + +Methods annotated with `@ConsumeEvent` can also be annotated with `@RunOnVirtualThread`. +In this case, the method is invoked on a virtual thread. +Each event is invoked on a different virtual thread. + +The method must be _blocking_ and your Java runtime must provide support for virtual threads. +Read xref:./virtual-threads.adoc[the virtual thread guide] for more details. + === Use codecs The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses codecs to _serialize_ and _deserialize_ objects. diff --git a/extensions/vertx/deployment/pom.xml b/extensions/vertx/deployment/pom.xml index 1066bf68b64b18..aad294e6680e0f 100644 --- a/extensions/vertx/deployment/pom.xml +++ b/extensions/vertx/deployment/pom.xml @@ -25,6 +25,10 @@ io.quarkus quarkus-vertx + + io.quarkus + quarkus-virtual-threads-deployment + io.quarkus quarkus-mutiny-deployment diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java index a4069c07d46096..b21eed01ccd400 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java @@ -37,6 +37,7 @@ import io.quarkus.runtime.util.HashUtil; import io.quarkus.vertx.runtime.EventConsumerInvoker; import io.smallrye.common.annotation.Blocking; +import io.smallrye.common.annotation.RunOnVirtualThread; import io.smallrye.mutiny.Uni; import io.vertx.core.MultiMap; import io.vertx.core.eventbus.Message; @@ -78,6 +79,7 @@ class EventBusConsumer { protected static final MethodDescriptor THROWABLE_TO_STRING = MethodDescriptor .ofMethod(Throwable.class, "toString", String.class); protected static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName()); + protected static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName()); static String generateInvoker(BeanInfo bean, MethodInfo method, AnnotationInstance consumeEvent, @@ -100,9 +102,10 @@ static String generateInvoker(BeanInfo bean, MethodInfo method, String generatedName = targetPackage + baseName + INVOKER_SUFFIX + "_" + method.name() + "_" + HashUtil.sha1(sigBuilder.toString()); - boolean blocking; + boolean blocking, runOnVirtualThread; AnnotationValue blockingValue = consumeEvent.value("blocking"); blocking = method.hasAnnotation(BLOCKING) || (blockingValue != null && blockingValue.asBoolean()); + runOnVirtualThread = method.hasAnnotation(RUN_ON_VIRTUAL_THREAD); ClassCreator invokerCreator = ClassCreator.builder().classOutput(classOutput).className(generatedName) .superClass(EventConsumerInvoker.class).build(); @@ -113,11 +116,16 @@ static String generateInvoker(BeanInfo bean, MethodInfo method, FieldCreator containerField = invokerCreator.getFieldCreator("container", ArcContainer.class) .setModifiers(ACC_PRIVATE | ACC_FINAL); - if (blocking) { + if (blocking || runOnVirtualThread) { MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class); isBlocking.returnValue(isBlocking.load(true)); } + if (runOnVirtualThread) { + MethodCreator isRunOnVirtualThread = invokerCreator.getMethodCreator("isRunningOnVirtualThread", boolean.class); + isRunOnVirtualThread.returnValue(isRunOnVirtualThread.load(true)); + } + AnnotationValue orderedValue = consumeEvent.value("ordered"); boolean ordered = orderedValue != null && orderedValue.asBoolean(); if (ordered) { diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java index 389d8374682665..0fd069a0777e0e 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java @@ -49,8 +49,9 @@ import io.quarkus.gizmo.ClassOutput; import io.quarkus.vertx.ConsumeEvent; import io.quarkus.vertx.core.deployment.CoreVertxBuildItem; +import io.quarkus.vertx.runtime.VertxEventBusConsumerRecorder; import io.quarkus.vertx.runtime.VertxProducer; -import io.quarkus.vertx.runtime.VertxRecorder; +import io.smallrye.common.annotation.RunOnVirtualThread; class VertxProcessor { @@ -68,7 +69,7 @@ AdditionalBeanBuildItem registerBean() { @BuildStep @Record(ExecutionTime.RUNTIME_INIT) - VertxBuildItem build(CoreVertxBuildItem vertx, VertxRecorder recorder, + VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder recorder, List messageConsumerBusinessMethods, BuildProducer generatedClass, AnnotationProxyBuildItem annotationProxy, LaunchModeBuildItem launchMode, ShutdownContextBuildItem shutdown, @@ -102,7 +103,7 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxRecorder recorder, @BuildStep @Record(ExecutionTime.STATIC_INIT) void currentContextFactory(BuildProducer currentContextFactory, - VertxBuildConfig buildConfig, VertxRecorder recorder) { + VertxBuildConfig buildConfig, VertxEventBusConsumerRecorder recorder) { if (buildConfig.customizeArcContext()) { currentContextFactory.produce(new CurrentContextFactoryBuildItem(recorder.currentContextFactory())); } @@ -150,6 +151,12 @@ void collectEventConsumers( "An event consumer business method that accepts io.vertx.core.eventbus.Message or io.vertx.mutiny.core.eventbus.Message must return void [method: %s, bean:%s]", method, bean)); } + if (method.hasAnnotation(RunOnVirtualThread.class) && consumeEvent.value("ordered") != null + && consumeEvent.value("ordered").asBoolean()) { + throw new IllegalStateException(String.format( + "An event consumer business method that cannot use @RunOnVirtualThread and set the ordered attribute to true [method: %s, bean:%s]", + method, bean)); + } messageConsumerBusinessMethods .produce(new EventConsumerBusinessMethodItem(bean, method, consumeEvent)); LOGGER.debugf("Found event consumer business method %s declared on %s", method, bean); diff --git a/extensions/vertx/runtime/pom.xml b/extensions/vertx/runtime/pom.xml index 156d6bd29def2c..bc3a997e767ec5 100644 --- a/extensions/vertx/runtime/pom.xml +++ b/extensions/vertx/runtime/pom.xml @@ -42,6 +42,10 @@ io.quarkus quarkus-mutiny + + io.quarkus + quarkus-virtual-threads + io.quarkus quarkus-vertx-latebound-mdc-provider diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java index 38acccc61684fe..24b8f18ef01665 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java @@ -18,6 +18,10 @@ public boolean isBlocking() { return false; } + public boolean isRunningOnVirtualThread() { + return false; + } + public boolean isOrdered() { return false; } @@ -77,7 +81,7 @@ public void accept(Object result, Throwable failure) { if (failure != null) { if (message.replyAddress() == null) { // No reply handler - throw VertxRecorder.wrapIfNecessary(failure); + throw VertxEventBusConsumerRecorder.wrapIfNecessary(failure); } else { message.fail(ConsumeEvent.EXPLICIT_FAILURE_CODE, failure.getMessage()); } @@ -105,12 +109,12 @@ public void accept(Object result, Throwable failure) { try { requestContext.destroy(endState); } catch (Exception e) { - throw VertxRecorder.wrapIfNecessary(e); + throw VertxEventBusConsumerRecorder.wrapIfNecessary(e); } if (failure != null) { if (message.replyAddress() == null) { // No reply handler - throw VertxRecorder.wrapIfNecessary(failure); + throw VertxEventBusConsumerRecorder.wrapIfNecessary(failure); } else { message.fail(ConsumeEvent.EXPLICIT_FAILURE_CODE, failure.getMessage()); } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java similarity index 76% rename from extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java rename to extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java index bd2e203ea3dd02..5c9539b93c6512 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java @@ -21,6 +21,7 @@ import io.quarkus.runtime.annotations.Recorder; import io.quarkus.runtime.configuration.ProfileManager; import io.quarkus.vertx.ConsumeEvent; +import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.smallrye.common.vertx.VertxContext; import io.vertx.core.AsyncResult; import io.vertx.core.Context; @@ -35,17 +36,17 @@ import io.vertx.core.impl.VertxInternal; @Recorder -public class VertxRecorder { +public class VertxEventBusConsumerRecorder { - private static final Logger LOGGER = Logger.getLogger(VertxRecorder.class.getName()); + private static final Logger LOGGER = Logger.getLogger(VertxEventBusConsumerRecorder.class.getName()); static volatile Vertx vertx; static volatile List> messageConsumers; public void configureVertx(Supplier vertx, Map messageConsumerConfigurations, LaunchMode launchMode, ShutdownContext shutdown, Map, Class> codecByClass) { - VertxRecorder.vertx = vertx.get(); - VertxRecorder.messageConsumers = new CopyOnWriteArrayList<>(); + VertxEventBusConsumerRecorder.vertx = vertx.get(); + VertxEventBusConsumerRecorder.messageConsumers = new CopyOnWriteArrayList<>(); registerMessageConsumers(messageConsumerConfigurations); registerCodecs(codecByClass); @@ -83,7 +84,7 @@ void destroy() { void registerMessageConsumers(Map messageConsumerConfigurations) { if (!messageConsumerConfigurations.isEmpty()) { EventBus eventBus = vertx.eventBus(); - VertxInternal vi = (VertxInternal) VertxRecorder.vertx; + VertxInternal vi = (VertxInternal) VertxEventBusConsumerRecorder.vertx; CountDownLatch latch = new CountDownLatch(messageConsumerConfigurations.size()); final List registrationFailures = new ArrayList<>(); for (Entry entry : messageConsumerConfigurations.entrySet()) { @@ -110,22 +111,47 @@ public void handle(Message m) { // We need to create a duplicated context from the "context" Context dup = VertxContext.getOrCreateDuplicatedContext(context); setContextSafe(dup, true); - dup.executeBlocking(new Handler>() { - @Override - public void handle(Promise event) { - try { - invoker.invoke(m); - } catch (Exception e) { - if (m.replyAddress() == null) { - // No reply handler - throw wrapIfNecessary(e); - } else { - m.fail(ConsumeEvent.FAILURE_CODE, e.toString()); + + if (invoker.isRunningOnVirtualThread()) { + // Switch to a Vert.x context to capture it and use it during the invocation. + dup.runOnContext(new Handler() { + @Override + public void handle(Void event) { + VirtualThreadsRecorder.getCurrent().execute(new Runnable() { + @Override + public void run() { + try { + invoker.invoke(m); + } catch (Exception e) { + if (m.replyAddress() == null) { + // No reply handler + throw wrapIfNecessary(e); + } else { + m.fail(ConsumeEvent.FAILURE_CODE, e.toString()); + } + } + } + }); + } + }); + } else { + dup.executeBlocking(new Handler>() { + @Override + public void handle(Promise event) { + try { + invoker.invoke(m); + } catch (Exception e) { + if (m.replyAddress() == null) { + // No reply handler + throw wrapIfNecessary(e); + } else { + m.fail(ConsumeEvent.FAILURE_CODE, e.toString()); + } } + event.complete(); } - event.complete(); - } - }, invoker.isOrdered(), null); + }, invoker.isOrdered(), null); + } } else { // Will run on the context used for the consumer registration. // It's a duplicated context, but we need to mark it as safe. diff --git a/extensions/vertx/runtime/src/test/java/io/quarkus/vertx/runtime/VertxProducerTest.java b/extensions/vertx/runtime/src/test/java/io/quarkus/vertx/runtime/VertxProducerTest.java index a1485b6fbc5f74..dae6077d49eaa8 100644 --- a/extensions/vertx/runtime/src/test/java/io/quarkus/vertx/runtime/VertxProducerTest.java +++ b/extensions/vertx/runtime/src/test/java/io/quarkus/vertx/runtime/VertxProducerTest.java @@ -13,13 +13,13 @@ public class VertxProducerTest { - private VertxRecorder recorder; + private VertxEventBusConsumerRecorder recorder; private VertxProducer producer; @BeforeEach public void setUp() { producer = new VertxProducer(); - recorder = new VertxRecorder(); + recorder = new VertxEventBusConsumerRecorder(); } @AfterEach diff --git a/extensions/virtual-threads/deployment/pom.xml b/extensions/virtual-threads/deployment/pom.xml index 67a18d2b614798..3a2dfb35503159 100644 --- a/extensions/virtual-threads/deployment/pom.xml +++ b/extensions/virtual-threads/deployment/pom.xml @@ -27,10 +27,6 @@ io.quarkus quarkus-core-deployment - - io.quarkus - quarkus-vertx-deployment - diff --git a/extensions/virtual-threads/runtime/pom.xml b/extensions/virtual-threads/runtime/pom.xml index 948c7ce99de647..56e0b36cb510d6 100644 --- a/extensions/virtual-threads/runtime/pom.xml +++ b/extensions/virtual-threads/runtime/pom.xml @@ -25,8 +25,9 @@ quarkus-arc - io.quarkus - quarkus-vertx + + io.vertx + vertx-core io.quarkus @@ -58,13 +59,13 @@ - - org.apache.maven.plugins - maven-surefire-plugin - - --enable-preview - - + + org.apache.maven.plugins + maven-surefire-plugin + + --enable-preview + + diff --git a/integration-tests/virtual-threads/pom.xml b/integration-tests/virtual-threads/pom.xml index 930f59067da085..43d45f5191b9b0 100644 --- a/integration-tests/virtual-threads/pom.xml +++ b/integration-tests/virtual-threads/pom.xml @@ -32,6 +32,7 @@ kafka-virtual-threads amqp-virtual-threads jms-virtual-threads + vertx-event-bus-virtual-threads diff --git a/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/pom.xml b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/pom.xml new file mode 100644 index 00000000000000..52ce0825438bf7 --- /dev/null +++ b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/pom.xml @@ -0,0 +1,73 @@ + + + 4.0.0 + + + quarkus-virtual-threads-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + + quarkus-integration-test-virtual-threads-vertx-event-bus + Quarkus - Integration Tests - Virtual Threads - Vert.x Event Bus + + + + io.quarkus + quarkus-resteasy-reactive-jackson + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + diff --git a/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/AssertHelper.java b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/AssertHelper.java new file mode 100644 index 00000000000000..ff75aa955dfa58 --- /dev/null +++ b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/AssertHelper.java @@ -0,0 +1,71 @@ +package io.quarkus.virtual.vertx; + +import java.lang.reflect.Method; + +import io.quarkus.arc.Arc; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Vertx; + +public class AssertHelper { + + /** + * Asserts that the current method: + * - runs on a duplicated context + * - runs on a virtual thread + * - has the request scope activated + */ + public static void assertEverything() { + assertThatTheRequestScopeIsActive(); + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + } + + public static void assertThatTheRequestScopeIsActive() { + if (!Arc.container().requestContext().isActive()) { + throw new AssertionError(("Expected the request scope to be active")); + } + } + + public static void assertThatItRunsOnADuplicatedContext() { + var context = Vertx.currentContext(); + if (context == null) { + throw new AssertionError("The method does not run on a Vert.x context"); + } + if (!VertxContext.isOnDuplicatedContext()) { + throw new AssertionError("The method does not run on a Vert.x **duplicated** context"); + } + } + + public static void assertThatItRunsOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (!virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e); + } + } + + public static void assertNotOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread"); + } + } catch (Exception e) { + // Trying using Thread name. + var name = Thread.currentThread().toString(); + if (name.toLowerCase().contains("virtual")) { + throw new AssertionError("Thread " + Thread.currentThread() + " seems to be a virtual thread"); + } + } + } +} diff --git a/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/EventBusConsumer.java b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/EventBusConsumer.java new file mode 100644 index 00000000000000..04dec854bc8b91 --- /dev/null +++ b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/EventBusConsumer.java @@ -0,0 +1,30 @@ +package io.quarkus.virtual.vertx; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import io.quarkus.vertx.ConsumeEvent; +import io.smallrye.common.annotation.RunOnVirtualThread; + +@ApplicationScoped +public class EventBusConsumer { + + public static final List ONE_WAY = new CopyOnWriteArrayList<>(); + + @ConsumeEvent("one-way") + @RunOnVirtualThread + void receive(String m) { + AssertHelper.assertEverything(); + ONE_WAY.add(m); + } + + @ConsumeEvent("request-reply") + @RunOnVirtualThread + String process(String m) { + AssertHelper.assertEverything(); + return m.toUpperCase(); + } + +} diff --git a/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/VertxEventBusResource.java b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/VertxEventBusResource.java new file mode 100644 index 00000000000000..7a5c5128cb34be --- /dev/null +++ b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/java/io/quarkus/virtual/vertx/VertxEventBusResource.java @@ -0,0 +1,38 @@ +package io.quarkus.virtual.vertx; + +import java.time.Duration; +import java.util.List; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import io.smallrye.common.annotation.RunOnVirtualThread; +import io.vertx.mutiny.core.eventbus.EventBus; + +@RunOnVirtualThread +@Path("/") +public class VertxEventBusResource { + + @Inject + EventBus bus; + + @GET + @Path("/one-way") + public void sentOneWay() { + bus.send("one-way", "hello"); + } + + @GET + @Path("/one-way-verify") + public List oneWayVerification() { + return EventBusConsumer.ONE_WAY; + } + + @GET + @Path("/request-reply") + public String requestReply() { + return bus. request("request-reply", "hello").map(m -> m.body()).await().atMost(Duration.ofSeconds(10)); + } + +} diff --git a/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/resources/application.properties b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/resources/application.properties new file mode 100644 index 00000000000000..0bdbbf4a0cfe71 --- /dev/null +++ b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/main/resources/application.properties @@ -0,0 +1 @@ +quarkus.native.additional-build-args=--enable-preview diff --git a/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/NoPinningVerify.java b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/NoPinningVerify.java new file mode 100644 index 00000000000000..84f96f5700fd53 --- /dev/null +++ b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/NoPinningVerify.java @@ -0,0 +1,76 @@ +package io.quarkus.virtual.vertx; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +/** + * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. + * It reads the reports generated by surefire. + */ +public class NoPinningVerify { + + @Test + void verify() throws IOException, ParserConfigurationException, SAXException { + var reports = new File("target", "surefire-reports"); + Assertions.assertTrue(reports.isDirectory(), + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + var list = reports.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("TEST") && name.endsWith("Test.xml"); + } + }); + Assertions.assertNotNull(list, + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + + for (File report : list) { + Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(report); + var suite = document.getFirstChild(); + var cases = getChildren(suite.getChildNodes(), "testcase"); + for (Node c : cases) { + verify(report, c); + } + } + + } + + private void verify(File file, Node ca) { + var fullname = ca.getAttributes().getNamedItem("classname").getTextContent() + "." + + ca.getAttributes().getNamedItem("name").getTextContent(); + var output = getChildren(ca.getChildNodes(), "system-out"); + if (output.isEmpty()) { + return; + } + var sout = output.get(0).getTextContent(); + if (sout.contains("VThreadContinuation.onPinned")) { + throw new AssertionError("The test case " + fullname + " pinned the carrier thread, check " + file.getAbsolutePath() + + " for details (or the log of the test)"); + } + + } + + private List getChildren(NodeList nodes, String name) { + List list = new ArrayList<>(); + for (int i = 0; i < nodes.getLength(); i++) { + var node = nodes.item(i); + if (node.getNodeName().equalsIgnoreCase(name)) { + list.add(node); + } + } + return list; + } + +} diff --git a/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/RunOnVirtualThreadIT.java b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/RunOnVirtualThreadIT.java new file mode 100644 index 00000000000000..fd37b78557ffd5 --- /dev/null +++ b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/RunOnVirtualThreadIT.java @@ -0,0 +1,8 @@ +package io.quarkus.virtual.vertx; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +class RunOnVirtualThreadIT extends RunOnVirtualThreadTest { + +} diff --git a/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/RunOnVirtualThreadTest.java b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/RunOnVirtualThreadTest.java new file mode 100644 index 00000000000000..599e02b94dc293 --- /dev/null +++ b/integration-tests/virtual-threads/vertx-event-bus-virtual-threads/src/test/java/io/quarkus/virtual/vertx/RunOnVirtualThreadTest.java @@ -0,0 +1,35 @@ +package io.quarkus.virtual.vertx; + +import static org.hamcrest.Matchers.is; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.RestAssured; + +@QuarkusTest +class RunOnVirtualThreadTest { + + @Test + @Timeout(10) + void testOneWay() { + RestAssured.get("/one-way").then() + .assertThat().statusCode(204); + + Awaitility.await().untilAsserted(() -> { + RestAssured.get("/one-way-verify").then() + .assertThat().statusCode(200) + .body("size()", is(1)); + }); + } + + @Test + @Timeout(10) + void testRequestReply() { + RestAssured.get("/request-reply").then() + .assertThat().statusCode(200) + .body(is("HELLO")); + } +}