diff --git a/.github/virtual-threads-tests.json b/.github/virtual-threads-tests.json index 7230a628bf3c9..30b0dbe464441 100644 --- a/.github/virtual-threads-tests.json +++ b/.github/virtual-threads-tests.json @@ -2,8 +2,8 @@ "include": [ { "category": "Main", - "timeout": 45, - "test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads", + "timeout": 50, + "test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads, scheduler-virtual-threads, quartz-virtual-threads", "os-name": "ubuntu-latest" }, { diff --git a/docs/src/main/asciidoc/quartz.adoc b/docs/src/main/asciidoc/quartz.adoc index 5f8182ac9b96c..7bb4f8b913ef7 100644 --- a/docs/src/main/asciidoc/quartz.adoc +++ b/docs/src/main/asciidoc/quartz.adoc @@ -464,6 +464,18 @@ public class MyListenerManager { } ---- +[[virtual-threads]] +== Run scheduled methods on virtual threads + +Methods annotated with `@Scheduled` can also be annotated with `@RunOnVirtualThread`. +In this case, the method is invoked on a virtual thread. + +The method must return `void` and your Java runtime must provide support for virtual threads. +Read xref:./virtual-threads.adoc[the virtual thread guide] for more details. + +WARNING: This feature cannot be combined with the `run-blocking-method-on-quartz-thread` option. +If `run-blocking-method-on-quartz-thread` is set, the scheduled method runs on a (platform) thread managed by Quartz. + [[quartz-configuration-reference]] == Quartz Configuration Reference diff --git a/docs/src/main/asciidoc/scheduler-reference.adoc b/docs/src/main/asciidoc/scheduler-reference.adoc index 93ea045bf50bd..f32e6f534d0e0 100644 --- a/docs/src/main/asciidoc/scheduler-reference.adoc +++ b/docs/src/main/asciidoc/scheduler-reference.adoc @@ -417,6 +417,14 @@ If the xref:smallrye-metrics.adoc[SmallRye Metrics extension] is present, then a If `quarkus.scheduler.tracing.enabled` is set to `true` and the xref:opentelemetry.adoc[OpenTelemetry extension] is present then the `@io.opentelemetry.instrumentation.annotations.WithSpan` annotation is added automatically to every `@Scheduled` method. As a result, each execution of this method has a new `io.opentelemetry.api.trace.Span` associated. +== Run @Scheduled methods on virtual threads + +Methods annotated with `@Scheduled` can also be annotated with `@RunOnVirtualThread`. +In this case, the method is invoked on a virtual thread. + +The method must return `void` and your Java runtime must provide support for virtual threads. +Read xref:./virtual-threads.adoc[the virtual thread guide] for more details. + == Configuration Reference include::{generated-dir}/config/quarkus-scheduler.adoc[leveloffset=+1, opts=optional] diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java index 0490e6f68ef1b..bbd7c36e9adc3 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java @@ -88,6 +88,7 @@ import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode; import io.quarkus.scheduler.runtime.SimpleScheduler; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; +import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Handler; @@ -782,6 +783,11 @@ public CompletionStage invokeBean(ScheduledExecution execution) { return CompletableFuture.failedStage(e); } } + + @Override + public boolean isRunningOnVirtualThread() { + return runOnVirtualThread; + } }; } else { invoker = new DefaultInvoker() { @@ -868,17 +874,38 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution } else { Context context = VertxContext.getOrCreateDuplicatedContext(vertx); VertxContextSafetyToggle.setContextSafe(context, true); - context.executeBlocking(new Handler>() { - @Override - public void handle(Promise p) { - try { - trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext)); - p.complete(); - } catch (Exception e) { - p.tryFail(e); + if (trigger.invoker.isRunningOnVirtualThread()) { + // While counter-intuitive, we switch to a safe context, so that context is captured and attached + // to the virtual thread. + context.runOnContext(new Handler() { + @Override + public void handle(Void event) { + VirtualThreadsRecorder.getCurrent().execute(new Runnable() { + @Override + public void run() { + try { + trigger.invoker + .invoke(new QuartzScheduledExecution(trigger, jobExecutionContext)); + } catch (Exception ignored) { + // already logged by the StatusEmitterInvoker + } + } + }); } - } - }, false); + }); + } else { + context.executeBlocking(new Handler>() { + @Override + public void handle(Promise p) { + try { + trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext)); + p.complete(); + } catch (Exception e) { + p.tryFail(e); + } + } + }, false); + } } } else { Context context = VertxContext.getOrCreateDuplicatedContext(vertx); diff --git a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java index 861123f297b08..86a506b94d108 100644 --- a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java +++ b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java @@ -168,7 +168,18 @@ interface JobDefinition { * @param task * @return self */ - JobDefinition setTask(Consumer task); + default JobDefinition setTask(Consumer task) { + return setTask(task, false); + } + + /** + * Configures the task to schedule. + * + * @param task the task, must not be {@code null} + * @param runOnVirtualThread whether the task must be run on a virtual thread if the JVM allows it. + * @return self the current job definition + */ + JobDefinition setTask(Consumer task, boolean runOnVirtualThread); /** * diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java index 257288d5d649e..5b192d6c4305b 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java @@ -23,6 +23,7 @@ public abstract class AbstractJobDefinition implements JobDefinition { protected Function> asyncTask; protected boolean scheduled = false; protected String timeZone = Scheduled.DEFAULT_TIMEZONE; + protected boolean runOnVirtualThread; public AbstractJobDefinition(String identity) { this.identity = identity; @@ -78,12 +79,13 @@ public JobDefinition setTimeZone(String timeZone) { } @Override - public JobDefinition setTask(Consumer task) { + public JobDefinition setTask(Consumer task, boolean runOnVirtualThread) { checkScheduled(); if (asyncTask != null) { throw new IllegalStateException("Async task was already set"); } this.task = task; + this.runOnVirtualThread = runOnVirtualThread; return this; } diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java index 36ca1d9c66fbd..ff7571ab10352 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java @@ -13,4 +13,8 @@ public boolean isBlocking() { return delegate.isBlocking(); } + @Override + public boolean isRunningOnVirtualThread() { + return delegate.isRunningOnVirtualThread(); + } } diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/ScheduledInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/ScheduledInvoker.java index 16e9cee3d5e71..a7f1f6a80e702 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/ScheduledInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/ScheduledInvoker.java @@ -10,7 +10,6 @@ public interface ScheduledInvoker { /** - * * @param execution * @return the result * @throws Exception @@ -27,4 +26,14 @@ default boolean isBlocking() { return true; } + /** + * Indicates that the invoker used the virtual thread executor to execute the tasks. + * Note that the method must use a synchronous signature. + * + * @return {@code true} if the scheduled method runs on a virtual thread. + */ + default boolean isRunningOnVirtualThread() { + return false; + } + } diff --git a/extensions/scheduler/deployment/pom.xml b/extensions/scheduler/deployment/pom.xml index 18123b3c006d6..8271a16616d4a 100644 --- a/extensions/scheduler/deployment/pom.xml +++ b/extensions/scheduler/deployment/pom.xml @@ -18,6 +18,10 @@ io.quarkus quarkus-arc-deployment + + io.quarkus + quarkus-virtual-threads-deployment + io.quarkus quarkus-vertx-deployment diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/ScheduledBusinessMethodItem.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/ScheduledBusinessMethodItem.java index b303fc18369e0..65f34de4c12db 100644 --- a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/ScheduledBusinessMethodItem.java +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/ScheduledBusinessMethodItem.java @@ -14,18 +14,20 @@ public final class ScheduledBusinessMethodItem extends MultiBuildItem { private final List schedules; private final MethodInfo method; private final boolean nonBlocking; + private final boolean runOnVirtualThread; public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List schedules) { - this(bean, method, schedules, false); + this(bean, method, schedules, false, false); } public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List schedules, - boolean hasNonBlockingAnnotation) { + boolean hasNonBlockingAnnotation, boolean hasRunOnVirtualThreadAnnotation) { this.bean = bean; this.method = method; this.schedules = schedules; this.nonBlocking = hasNonBlockingAnnotation || SchedulerDotNames.COMPLETION_STAGE.equals(method.returnType().name()) || SchedulerDotNames.UNI.equals(method.returnType().name()) || KotlinUtil.isSuspendMethod(method); + this.runOnVirtualThread = hasRunOnVirtualThreadAnnotation; } /** @@ -48,6 +50,10 @@ public boolean isNonBlocking() { return nonBlocking; } + public boolean isRunOnVirtualThread() { + return runOnVirtualThread; + } + public String getMethodDescription() { return method.declaringClass().name() + "#" + method.name() + "()"; } diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerDotNames.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerDotNames.java index 639d1247c25fa..c1fedd5f1cb37 100644 --- a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerDotNames.java +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerDotNames.java @@ -6,6 +6,7 @@ import io.quarkus.scheduler.Scheduled; import io.smallrye.common.annotation.NonBlocking; +import io.smallrye.common.annotation.RunOnVirtualThread; class SchedulerDotNames { @@ -23,4 +24,6 @@ class SchedulerDotNames { static final DotName ABSTRACT_COROUTINE_INVOKER = DotName .createSimple("io.quarkus.scheduler.kotlin.runtime.AbstractCoroutineInvoker"); + static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class); + } diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java index 068f4b21748dc..60196c55e4190 100644 --- a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java @@ -136,7 +136,8 @@ void collectScheduledMethods(BeanArchiveIndexBuildItem beanArchives, BeanDiscove MethodInfo method = annotationInstance.target().asMethod(); if (Modifier.isStatic(method.flags()) && !KotlinUtil.isSuspendMethod(method)) { scheduledBusinessMethods.produce(new ScheduledBusinessMethodItem(null, method, schedules, - transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING))); + transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING), + transformedAnnotations.hasAnnotation(method, SchedulerDotNames.RUN_ON_VIRTUAL_THREAD))); LOGGER.debugf("Found scheduled static method %s declared on %s", method, method.declaringClass().name()); } } @@ -176,7 +177,8 @@ private void collectScheduledMethods(IndexView index, TransformedAnnotationsBuil } if (schedules != null) { scheduledBusinessMethods.produce(new ScheduledBusinessMethodItem(bean, method, schedules, - transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING))); + transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING), + transformedAnnotations.hasAnnotation(method, SchedulerDotNames.RUN_ON_VIRTUAL_THREAD))); LOGGER.debugf("Found scheduled business method %s declared on %s", method, bean); } } @@ -207,6 +209,11 @@ void validateScheduledBusinessMethods(SchedulerConfig config, Listio.quarkus quarkus-scheduler-kotlin + + io.quarkus + quarkus-virtual-threads + io.quarkus quarkus-arc diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index 3ab9cff4e6294..eca7095ac87f6 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -64,6 +64,7 @@ import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; +import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Handler; @@ -390,16 +391,32 @@ void execute(ZonedDateTime now, Vertx vertx) { Context context = VertxContext.getOrCreateDuplicatedContext(vertx); VertxContextSafetyToggle.setContextSafe(context, true); if (invoker.isBlocking()) { - context.executeBlocking(new Handler>() { - @Override - public void handle(Promise p) { - try { - doInvoke(now, scheduledFireTime); - } finally { - p.complete(); + if (invoker.isRunningOnVirtualThread()) { + // While counter-intuitive, we switch to a safe context, so that context is captured and attached + // to the virtual thread. + context.runOnContext(new Handler() { + @Override + public void handle(Void event) { + VirtualThreadsRecorder.getCurrent().execute(new Runnable() { + @Override + public void run() { + doInvoke(now, scheduledFireTime); + } + }); } - } - }, false); + }); + } else { + context.executeBlocking(new Handler>() { + @Override + public void handle(Promise p) { + try { + doInvoke(now, scheduledFireTime); + } finally { + p.complete(); + } + } + }, false); + } } else { context.runOnContext(new Handler() { @Override @@ -639,6 +656,11 @@ public CompletionStage invokeBean(ScheduledExecution execution) { return CompletableFuture.failedStage(e); } } + + @Override + public boolean isRunningOnVirtualThread() { + return runOnVirtualThread; + } }; } else { invoker = new DefaultInvoker() { diff --git a/integration-tests/virtual-threads/pom.xml b/integration-tests/virtual-threads/pom.xml index 43d45f5191b9b..3b56ac4f95d08 100644 --- a/integration-tests/virtual-threads/pom.xml +++ b/integration-tests/virtual-threads/pom.xml @@ -33,6 +33,8 @@ amqp-virtual-threads jms-virtual-threads vertx-event-bus-virtual-threads + scheduler-virtual-threads + quartz-virtual-threads diff --git a/integration-tests/virtual-threads/quartz-virtual-threads/pom.xml b/integration-tests/virtual-threads/quartz-virtual-threads/pom.xml new file mode 100644 index 0000000000000..ba61edfaeb22b --- /dev/null +++ b/integration-tests/virtual-threads/quartz-virtual-threads/pom.xml @@ -0,0 +1,89 @@ + + + 4.0.0 + + + quarkus-virtual-threads-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + + quarkus-integration-test-virtual-threads-quartz + Quarkus - Integration Tests - Virtual Threads - Quartz Scheduler + + + + io.quarkus + quarkus-resteasy-reactive-jackson + + + io.quarkus + quarkus-quartz + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-quartz-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + diff --git a/integration-tests/virtual-threads/quartz-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/AssertHelper.java b/integration-tests/virtual-threads/quartz-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/AssertHelper.java new file mode 100644 index 0000000000000..c967414c5b3f8 --- /dev/null +++ b/integration-tests/virtual-threads/quartz-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/AssertHelper.java @@ -0,0 +1,71 @@ +package io.quarkus.virtual.scheduler; + +import java.lang.reflect.Method; + +import io.quarkus.arc.Arc; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Vertx; + +public class AssertHelper { + + /** + * Asserts that the current method: + * - runs on a duplicated context + * - runs on a virtual thread + * - has the request scope activated + */ + public static void assertEverything() { + assertThatTheRequestScopeIsActive(); + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + } + + public static void assertThatTheRequestScopeIsActive() { + if (!Arc.container().requestContext().isActive()) { + throw new AssertionError(("Expected the request scope to be active")); + } + } + + public static void assertThatItRunsOnADuplicatedContext() { + var context = Vertx.currentContext(); + if (context == null) { + throw new AssertionError("The method does not run on a Vert.x context"); + } + if (!VertxContext.isOnDuplicatedContext()) { + throw new AssertionError("The method does not run on a Vert.x **duplicated** context"); + } + } + + public static void assertThatItRunsOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (!virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e); + } + } + + public static void assertNotOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread"); + } + } catch (Exception e) { + // Trying using Thread name. + var name = Thread.currentThread().toString(); + if (name.toLowerCase().contains("virtual")) { + throw new AssertionError("Thread " + Thread.currentThread() + " seems to be a virtual thread"); + } + } + } +} diff --git a/integration-tests/virtual-threads/quartz-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/ScheduledResource.java b/integration-tests/virtual-threads/quartz-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/ScheduledResource.java new file mode 100644 index 0000000000000..0b4588c16dbfb --- /dev/null +++ b/integration-tests/virtual-threads/quartz-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/ScheduledResource.java @@ -0,0 +1,50 @@ +package io.quarkus.virtual.scheduler; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import jakarta.enterprise.event.Observes; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import io.quarkus.runtime.StartupEvent; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.Scheduler; +import io.smallrye.common.annotation.RunOnVirtualThread; + +@Path("/") +public class ScheduledResource { + + Set executions = new CopyOnWriteArraySet<>(); + Set programmaticExecutions = new CopyOnWriteArraySet<>(); + + public void init(@Observes StartupEvent ev, Scheduler scheduler) { + scheduler.newJob("my-programmatic-job") + .setInterval("1s") + .setTask(ex -> { + AssertHelper.assertEverything(); + // Quarkus specific - each VT has a unique name + programmaticExecutions.add(Thread.currentThread().getName()); + }, true) + .schedule(); + } + + @Scheduled(every = "1s") + @RunOnVirtualThread + void run() { + AssertHelper.assertEverything(); + // Quarkus specific - each VT has a unique name + executions.add(Thread.currentThread().getName()); + } + + @GET + public Set getExecutions() { + return executions; + } + + @GET + @Path("/programmatic") + public Set getProgrammaticExecutions() { + return programmaticExecutions; + } +} diff --git a/integration-tests/virtual-threads/quartz-virtual-threads/src/main/resources/application.properties b/integration-tests/virtual-threads/quartz-virtual-threads/src/main/resources/application.properties new file mode 100644 index 0000000000000..43b1e230c2184 --- /dev/null +++ b/integration-tests/virtual-threads/quartz-virtual-threads/src/main/resources/application.properties @@ -0,0 +1,3 @@ +quarkus.native.additional-build-args=--enable-preview + +quarkus.package.quiltflower.enabled=true \ No newline at end of file diff --git a/integration-tests/virtual-threads/quartz-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningVerify.java b/integration-tests/virtual-threads/quartz-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningVerify.java new file mode 100644 index 0000000000000..99ce0563fdbb4 --- /dev/null +++ b/integration-tests/virtual-threads/quartz-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningVerify.java @@ -0,0 +1,76 @@ +package io.quarkus.virtual.mail; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +/** + * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. + * It reads the reports generated by surefire. + */ +public class NoPinningVerify { + + @Test + void verify() throws IOException, ParserConfigurationException, SAXException { + var reports = new File("target", "surefire-reports"); + Assertions.assertTrue(reports.isDirectory(), + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + var list = reports.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("TEST") && name.endsWith("Test.xml"); + } + }); + Assertions.assertNotNull(list, + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + + for (File report : list) { + Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(report); + var suite = document.getFirstChild(); + var cases = getChildren(suite.getChildNodes(), "testcase"); + for (Node c : cases) { + verify(report, c); + } + } + + } + + private void verify(File file, Node ca) { + var fullname = ca.getAttributes().getNamedItem("classname").getTextContent() + "." + + ca.getAttributes().getNamedItem("name").getTextContent(); + var output = getChildren(ca.getChildNodes(), "system-out"); + if (output.isEmpty()) { + return; + } + var sout = output.get(0).getTextContent(); + if (sout.contains("VThreadContinuation.onPinned")) { + throw new AssertionError("The test case " + fullname + " pinned the carrier thread, check " + file.getAbsolutePath() + + " for details (or the log of the test)"); + } + + } + + private List getChildren(NodeList nodes, String name) { + List list = new ArrayList<>(); + for (int i = 0; i < nodes.getLength(); i++) { + var node = nodes.item(i); + if (node.getNodeName().equalsIgnoreCase(name)) { + list.add(node); + } + } + return list; + } + +} diff --git a/integration-tests/virtual-threads/quartz-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadIT.java b/integration-tests/virtual-threads/quartz-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadIT.java new file mode 100644 index 0000000000000..22abcdce9792e --- /dev/null +++ b/integration-tests/virtual-threads/quartz-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadIT.java @@ -0,0 +1,8 @@ +package io.quarkus.virtual.mail; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +class RunOnVirtualThreadIT extends RunOnVirtualThreadTest { + +} diff --git a/integration-tests/virtual-threads/quartz-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadTest.java b/integration-tests/virtual-threads/quartz-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadTest.java new file mode 100644 index 0000000000000..7f2bec0ae9ba7 --- /dev/null +++ b/integration-tests/virtual-threads/quartz-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadTest.java @@ -0,0 +1,43 @@ +package io.quarkus.virtual.mail; + +import java.time.Duration; +import java.util.List; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.RestAssured; +import io.restassured.common.mapper.TypeRef; + +@QuarkusTest +class RunOnVirtualThreadTest { + + @Test + void testScheduledMethods() { + Awaitility.await() + .pollDelay(Duration.ofSeconds(2)) + .untilAsserted(() -> { + var list = RestAssured.get().then() + .assertThat().statusCode(200) + .extract().as(new TypeRef>() { + }); + Assertions.assertTrue(list.size() > 3); + }); + } + + @Test + void testScheduledMethodsUsingApi() { + Awaitility.await() + .pollDelay(Duration.ofSeconds(2)) + .untilAsserted(() -> { + var list = RestAssured.get("/programmatic").then() + .assertThat().statusCode(200) + .extract().as(new TypeRef>() { + }); + Assertions.assertTrue(list.size() > 3); + }); + } + +} diff --git a/integration-tests/virtual-threads/scheduler-virtual-threads/pom.xml b/integration-tests/virtual-threads/scheduler-virtual-threads/pom.xml new file mode 100644 index 0000000000000..0811268cd06e2 --- /dev/null +++ b/integration-tests/virtual-threads/scheduler-virtual-threads/pom.xml @@ -0,0 +1,89 @@ + + + 4.0.0 + + + quarkus-virtual-threads-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + + quarkus-integration-test-virtual-threads-scheduler + Quarkus - Integration Tests - Virtual Threads - Scheduler + + + + io.quarkus + quarkus-resteasy-reactive-jackson + + + io.quarkus + quarkus-scheduler + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-scheduler-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + diff --git a/integration-tests/virtual-threads/scheduler-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/AssertHelper.java b/integration-tests/virtual-threads/scheduler-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/AssertHelper.java new file mode 100644 index 0000000000000..c967414c5b3f8 --- /dev/null +++ b/integration-tests/virtual-threads/scheduler-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/AssertHelper.java @@ -0,0 +1,71 @@ +package io.quarkus.virtual.scheduler; + +import java.lang.reflect.Method; + +import io.quarkus.arc.Arc; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Vertx; + +public class AssertHelper { + + /** + * Asserts that the current method: + * - runs on a duplicated context + * - runs on a virtual thread + * - has the request scope activated + */ + public static void assertEverything() { + assertThatTheRequestScopeIsActive(); + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + } + + public static void assertThatTheRequestScopeIsActive() { + if (!Arc.container().requestContext().isActive()) { + throw new AssertionError(("Expected the request scope to be active")); + } + } + + public static void assertThatItRunsOnADuplicatedContext() { + var context = Vertx.currentContext(); + if (context == null) { + throw new AssertionError("The method does not run on a Vert.x context"); + } + if (!VertxContext.isOnDuplicatedContext()) { + throw new AssertionError("The method does not run on a Vert.x **duplicated** context"); + } + } + + public static void assertThatItRunsOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (!virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e); + } + } + + public static void assertNotOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread"); + } + } catch (Exception e) { + // Trying using Thread name. + var name = Thread.currentThread().toString(); + if (name.toLowerCase().contains("virtual")) { + throw new AssertionError("Thread " + Thread.currentThread() + " seems to be a virtual thread"); + } + } + } +} diff --git a/integration-tests/virtual-threads/scheduler-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/ScheduledResource.java b/integration-tests/virtual-threads/scheduler-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/ScheduledResource.java new file mode 100644 index 0000000000000..0b4588c16dbfb --- /dev/null +++ b/integration-tests/virtual-threads/scheduler-virtual-threads/src/main/java/io/quarkus/virtual/scheduler/ScheduledResource.java @@ -0,0 +1,50 @@ +package io.quarkus.virtual.scheduler; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import jakarta.enterprise.event.Observes; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import io.quarkus.runtime.StartupEvent; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.Scheduler; +import io.smallrye.common.annotation.RunOnVirtualThread; + +@Path("/") +public class ScheduledResource { + + Set executions = new CopyOnWriteArraySet<>(); + Set programmaticExecutions = new CopyOnWriteArraySet<>(); + + public void init(@Observes StartupEvent ev, Scheduler scheduler) { + scheduler.newJob("my-programmatic-job") + .setInterval("1s") + .setTask(ex -> { + AssertHelper.assertEverything(); + // Quarkus specific - each VT has a unique name + programmaticExecutions.add(Thread.currentThread().getName()); + }, true) + .schedule(); + } + + @Scheduled(every = "1s") + @RunOnVirtualThread + void run() { + AssertHelper.assertEverything(); + // Quarkus specific - each VT has a unique name + executions.add(Thread.currentThread().getName()); + } + + @GET + public Set getExecutions() { + return executions; + } + + @GET + @Path("/programmatic") + public Set getProgrammaticExecutions() { + return programmaticExecutions; + } +} diff --git a/integration-tests/virtual-threads/scheduler-virtual-threads/src/main/resources/application.properties b/integration-tests/virtual-threads/scheduler-virtual-threads/src/main/resources/application.properties new file mode 100644 index 0000000000000..43b1e230c2184 --- /dev/null +++ b/integration-tests/virtual-threads/scheduler-virtual-threads/src/main/resources/application.properties @@ -0,0 +1,3 @@ +quarkus.native.additional-build-args=--enable-preview + +quarkus.package.quiltflower.enabled=true \ No newline at end of file diff --git a/integration-tests/virtual-threads/scheduler-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningVerify.java b/integration-tests/virtual-threads/scheduler-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningVerify.java new file mode 100644 index 0000000000000..99ce0563fdbb4 --- /dev/null +++ b/integration-tests/virtual-threads/scheduler-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningVerify.java @@ -0,0 +1,76 @@ +package io.quarkus.virtual.mail; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +/** + * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. + * It reads the reports generated by surefire. + */ +public class NoPinningVerify { + + @Test + void verify() throws IOException, ParserConfigurationException, SAXException { + var reports = new File("target", "surefire-reports"); + Assertions.assertTrue(reports.isDirectory(), + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + var list = reports.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("TEST") && name.endsWith("Test.xml"); + } + }); + Assertions.assertNotNull(list, + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + + for (File report : list) { + Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(report); + var suite = document.getFirstChild(); + var cases = getChildren(suite.getChildNodes(), "testcase"); + for (Node c : cases) { + verify(report, c); + } + } + + } + + private void verify(File file, Node ca) { + var fullname = ca.getAttributes().getNamedItem("classname").getTextContent() + "." + + ca.getAttributes().getNamedItem("name").getTextContent(); + var output = getChildren(ca.getChildNodes(), "system-out"); + if (output.isEmpty()) { + return; + } + var sout = output.get(0).getTextContent(); + if (sout.contains("VThreadContinuation.onPinned")) { + throw new AssertionError("The test case " + fullname + " pinned the carrier thread, check " + file.getAbsolutePath() + + " for details (or the log of the test)"); + } + + } + + private List getChildren(NodeList nodes, String name) { + List list = new ArrayList<>(); + for (int i = 0; i < nodes.getLength(); i++) { + var node = nodes.item(i); + if (node.getNodeName().equalsIgnoreCase(name)) { + list.add(node); + } + } + return list; + } + +} diff --git a/integration-tests/virtual-threads/scheduler-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadIT.java b/integration-tests/virtual-threads/scheduler-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadIT.java new file mode 100644 index 0000000000000..22abcdce9792e --- /dev/null +++ b/integration-tests/virtual-threads/scheduler-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadIT.java @@ -0,0 +1,8 @@ +package io.quarkus.virtual.mail; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +class RunOnVirtualThreadIT extends RunOnVirtualThreadTest { + +} diff --git a/integration-tests/virtual-threads/scheduler-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadTest.java b/integration-tests/virtual-threads/scheduler-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadTest.java new file mode 100644 index 0000000000000..7f2bec0ae9ba7 --- /dev/null +++ b/integration-tests/virtual-threads/scheduler-virtual-threads/src/test/java/io/quarkus/virtual/mail/RunOnVirtualThreadTest.java @@ -0,0 +1,43 @@ +package io.quarkus.virtual.mail; + +import java.time.Duration; +import java.util.List; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.RestAssured; +import io.restassured.common.mapper.TypeRef; + +@QuarkusTest +class RunOnVirtualThreadTest { + + @Test + void testScheduledMethods() { + Awaitility.await() + .pollDelay(Duration.ofSeconds(2)) + .untilAsserted(() -> { + var list = RestAssured.get().then() + .assertThat().statusCode(200) + .extract().as(new TypeRef>() { + }); + Assertions.assertTrue(list.size() > 3); + }); + } + + @Test + void testScheduledMethodsUsingApi() { + Awaitility.await() + .pollDelay(Duration.ofSeconds(2)) + .untilAsserted(() -> { + var list = RestAssured.get("/programmatic").then() + .assertThat().statusCode(200) + .extract().as(new TypeRef>() { + }); + Assertions.assertTrue(list.size() > 3); + }); + } + +}