Skip to content

Commit

Permalink
Integrate @RunOnVirtualThread with the Quarkus scheduler (quarkus-sch…
Browse files Browse the repository at this point in the history
…eduler and Quartz)

Allows @scheduled methods to run on a virtual thread.
Also extend the programmatic API to allow defining jobs that will run on virtual thread if the JVM offers this possibility.
cescoffier committed Sep 1, 2023

Verified

This commit was signed with the committer’s verified signature.
cescoffier Clement Escoffier
1 parent f034dd1 commit c951629
Showing 29 changed files with 836 additions and 28 deletions.
4 changes: 2 additions & 2 deletions .github/virtual-threads-tests.json
Original file line number Diff line number Diff line change
@@ -2,8 +2,8 @@
"include": [
{
"category": "Main",
"timeout": 45,
"test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads",
"timeout": 50,
"test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads, scheduler-virtual-threads, quartz-virtual-threads",
"os-name": "ubuntu-latest"
},
{
12 changes: 12 additions & 0 deletions docs/src/main/asciidoc/quartz.adoc
Original file line number Diff line number Diff line change
@@ -464,6 +464,18 @@ public class MyListenerManager {
}
----

[[virtual-threads]]
== Run scheduled methods on virtual threads

Methods annotated with `@Scheduled` can also be annotated with `@RunOnVirtualThread`.
In this case, the method is invoked on a virtual thread.

The method must return `void` and your Java runtime must provide support for virtual threads.
Read xref:./virtual-threads.adoc[the virtual thread guide] for more details.

WARNING: This feature cannot be combined with the `run-blocking-method-on-quartz-thread` option.
If `run-blocking-method-on-quartz-thread` is set, the scheduled method runs on a (platform) thread managed by Quartz.

[[quartz-configuration-reference]]
== Quartz Configuration Reference

8 changes: 8 additions & 0 deletions docs/src/main/asciidoc/scheduler-reference.adoc
Original file line number Diff line number Diff line change
@@ -417,6 +417,14 @@ If the xref:smallrye-metrics.adoc[SmallRye Metrics extension] is present, then a

If `quarkus.scheduler.tracing.enabled` is set to `true` and the xref:opentelemetry.adoc[OpenTelemetry extension] is present then the `@io.opentelemetry.instrumentation.annotations.WithSpan` annotation is added automatically to every `@Scheduled` method. As a result, each execution of this method has a new `io.opentelemetry.api.trace.Span` associated.

== Run @Scheduled methods on virtual threads

Methods annotated with `@Scheduled` can also be annotated with `@RunOnVirtualThread`.
In this case, the method is invoked on a virtual thread.

The method must return `void` and your Java runtime must provide support for virtual threads.
Read xref:./virtual-threads.adoc[the virtual thread guide] for more details.

== Configuration Reference

include::{generated-dir}/config/quarkus-scheduler.adoc[leveloffset=+1, opts=optional]
Original file line number Diff line number Diff line change
@@ -88,6 +88,7 @@
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode;
import io.quarkus.scheduler.runtime.SimpleScheduler;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Handler;
@@ -782,6 +783,11 @@ public CompletionStage<Void> invokeBean(ScheduledExecution execution) {
return CompletableFuture.failedStage(e);
}
}

@Override
public boolean isRunningOnVirtualThread() {
return runOnVirtualThread;
}
};
} else {
invoker = new DefaultInvoker() {
@@ -868,17 +874,38 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
} else {
Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
VertxContextSafetyToggle.setContextSafe(context, true);
context.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> p) {
try {
trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext));
p.complete();
} catch (Exception e) {
p.tryFail(e);
if (trigger.invoker.isRunningOnVirtualThread()) {
// While counter-intuitive, we switch to a safe context, so that context is captured and attached
// to the virtual thread.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void run() {
try {
trigger.invoker
.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext));
} catch (Exception ignored) {
// already logged by the StatusEmitterInvoker
}
}
});
}
}
}, false);
});
} else {
context.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> p) {
try {
trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext));
p.complete();
} catch (Exception e) {
p.tryFail(e);
}
}
}, false);
}
}
} else {
Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
Original file line number Diff line number Diff line change
@@ -168,7 +168,18 @@ interface JobDefinition {
* @param task
* @return self
*/
JobDefinition setTask(Consumer<ScheduledExecution> task);
default JobDefinition setTask(Consumer<ScheduledExecution> task) {
return setTask(task, false);
}

/**
* Configures the task to schedule.
*
* @param task the task, must not be {@code null}
* @param runOnVirtualThread whether the task must be run on a virtual thread if the JVM allows it.
* @return self the current job definition
*/
JobDefinition setTask(Consumer<ScheduledExecution> task, boolean runOnVirtualThread);

/**
*
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ public abstract class AbstractJobDefinition implements JobDefinition {
protected Function<ScheduledExecution, Uni<Void>> asyncTask;
protected boolean scheduled = false;
protected String timeZone = Scheduled.DEFAULT_TIMEZONE;
protected boolean runOnVirtualThread;

public AbstractJobDefinition(String identity) {
this.identity = identity;
@@ -78,12 +79,13 @@ public JobDefinition setTimeZone(String timeZone) {
}

@Override
public JobDefinition setTask(Consumer<ScheduledExecution> task) {
public JobDefinition setTask(Consumer<ScheduledExecution> task, boolean runOnVirtualThread) {
checkScheduled();
if (asyncTask != null) {
throw new IllegalStateException("Async task was already set");
}
this.task = task;
this.runOnVirtualThread = runOnVirtualThread;
return this;
}

Original file line number Diff line number Diff line change
@@ -13,4 +13,8 @@ public boolean isBlocking() {
return delegate.isBlocking();
}

@Override
public boolean isRunningOnVirtualThread() {
return delegate.isRunningOnVirtualThread();
}
}
Original file line number Diff line number Diff line change
@@ -10,7 +10,6 @@
public interface ScheduledInvoker {

/**
*
* @param execution
* @return the result
* @throws Exception
@@ -27,4 +26,14 @@ default boolean isBlocking() {
return true;
}

/**
* Indicates that the invoker used the virtual thread executor to execute the tasks.
* Note that the method must use a synchronous signature.
*
* @return {@code true} if the scheduled method runs on a virtual thread.
*/
default boolean isRunningOnVirtualThread() {
return false;
}

}
4 changes: 4 additions & 0 deletions extensions/scheduler/deployment/pom.xml
Original file line number Diff line number Diff line change
@@ -18,6 +18,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-deployment</artifactId>
Original file line number Diff line number Diff line change
@@ -14,18 +14,20 @@ public final class ScheduledBusinessMethodItem extends MultiBuildItem {
private final List<AnnotationInstance> schedules;
private final MethodInfo method;
private final boolean nonBlocking;
private final boolean runOnVirtualThread;

public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List<AnnotationInstance> schedules) {
this(bean, method, schedules, false);
this(bean, method, schedules, false, false);
}

public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List<AnnotationInstance> schedules,
boolean hasNonBlockingAnnotation) {
boolean hasNonBlockingAnnotation, boolean hasRunOnVirtualThreadAnnotation) {
this.bean = bean;
this.method = method;
this.schedules = schedules;
this.nonBlocking = hasNonBlockingAnnotation || SchedulerDotNames.COMPLETION_STAGE.equals(method.returnType().name())
|| SchedulerDotNames.UNI.equals(method.returnType().name()) || KotlinUtil.isSuspendMethod(method);
this.runOnVirtualThread = hasRunOnVirtualThreadAnnotation;
}

/**
@@ -48,6 +50,10 @@ public boolean isNonBlocking() {
return nonBlocking;
}

public boolean isRunOnVirtualThread() {
return runOnVirtualThread;
}

public String getMethodDescription() {
return method.declaringClass().name() + "#" + method.name() + "()";
}
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@

import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.common.annotation.RunOnVirtualThread;

class SchedulerDotNames {

@@ -23,4 +24,6 @@ class SchedulerDotNames {
static final DotName ABSTRACT_COROUTINE_INVOKER = DotName
.createSimple("io.quarkus.scheduler.kotlin.runtime.AbstractCoroutineInvoker");

static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class);

}
Original file line number Diff line number Diff line change
@@ -136,7 +136,8 @@ void collectScheduledMethods(BeanArchiveIndexBuildItem beanArchives, BeanDiscove
MethodInfo method = annotationInstance.target().asMethod();
if (Modifier.isStatic(method.flags()) && !KotlinUtil.isSuspendMethod(method)) {
scheduledBusinessMethods.produce(new ScheduledBusinessMethodItem(null, method, schedules,
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING)));
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING),
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.RUN_ON_VIRTUAL_THREAD)));
LOGGER.debugf("Found scheduled static method %s declared on %s", method, method.declaringClass().name());
}
}
@@ -176,7 +177,8 @@ private void collectScheduledMethods(IndexView index, TransformedAnnotationsBuil
}
if (schedules != null) {
scheduledBusinessMethods.produce(new ScheduledBusinessMethodItem(bean, method, schedules,
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING)));
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING),
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.RUN_ON_VIRTUAL_THREAD)));
LOGGER.debugf("Found scheduled business method %s declared on %s", method, bean);
}
}
@@ -207,6 +209,11 @@ void validateScheduledBusinessMethods(SchedulerConfig config, List<ScheduledBusi
continue;
}

if (scheduledMethod.isNonBlocking() && scheduledMethod.isRunOnVirtualThread()) {
errors.add(new IllegalStateException("@Scheduled method cannot be non-blocking and annotated " +
"with @RunOnVirtualThread: " + scheduledMethod.getMethodDescription()));
}

boolean isSuspendMethod = KotlinUtil.isSuspendMethod(method);

// Validate method params and return type
@@ -510,6 +517,13 @@ private String generateInvoker(ScheduledBusinessMethodItem scheduledMethod, Clas
if (scheduledMethod.isNonBlocking()) {
MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class);
isBlocking.returnValue(isBlocking.load(false));
isBlocking.close();
}

if (scheduledMethod.isRunOnVirtualThread()) {
MethodCreator isRunOnVirtualThread = invokerCreator.getMethodCreator("isRunningOnVirtualThread", boolean.class);
isRunOnVirtualThread.returnValue(isRunOnVirtualThread.load(true));
isRunOnVirtualThread.close();
}

invokerCreator.close();
4 changes: 4 additions & 0 deletions extensions/scheduler/runtime/pom.xml
Original file line number Diff line number Diff line change
@@ -17,6 +17,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-kotlin</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
Original file line number Diff line number Diff line change
@@ -64,6 +64,7 @@
import io.quarkus.scheduler.common.runtime.util.SchedulerUtils;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Handler;
@@ -390,16 +391,32 @@ void execute(ZonedDateTime now, Vertx vertx) {
Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
VertxContextSafetyToggle.setContextSafe(context, true);
if (invoker.isBlocking()) {
context.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> p) {
try {
doInvoke(now, scheduledFireTime);
} finally {
p.complete();
if (invoker.isRunningOnVirtualThread()) {
// While counter-intuitive, we switch to a safe context, so that context is captured and attached
// to the virtual thread.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void run() {
doInvoke(now, scheduledFireTime);
}
});
}
}
}, false);
});
} else {
context.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> p) {
try {
doInvoke(now, scheduledFireTime);
} finally {
p.complete();
}
}
}, false);
}
} else {
context.runOnContext(new Handler<Void>() {
@Override
@@ -639,6 +656,11 @@ public CompletionStage<Void> invokeBean(ScheduledExecution execution) {
return CompletableFuture.failedStage(e);
}
}

@Override
public boolean isRunningOnVirtualThread() {
return runOnVirtualThread;
}
};
} else {
invoker = new DefaultInvoker() {
2 changes: 2 additions & 0 deletions integration-tests/virtual-threads/pom.xml
Original file line number Diff line number Diff line change
@@ -33,6 +33,8 @@
<module>amqp-virtual-threads</module>
<module>jms-virtual-threads</module>
<module>vertx-event-bus-virtual-threads</module>
<module>scheduler-virtual-threads</module>
<module>quartz-virtual-threads</module>
</modules>

<build>
89 changes: 89 additions & 0 deletions integration-tests/virtual-threads/quartz-virtual-threads/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>quarkus-virtual-threads-integration-tests-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-integration-test-virtual-threads-quartz</artifactId>
<name>Quarkus - Integration Tests - Virtual Threads - Quartz Scheduler</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-quartz</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-quartz-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.quarkus.virtual.scheduler;

import java.lang.reflect.Method;

import io.quarkus.arc.Arc;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Vertx;

public class AssertHelper {

/**
* Asserts that the current method:
* - runs on a duplicated context
* - runs on a virtual thread
* - has the request scope activated
*/
public static void assertEverything() {
assertThatTheRequestScopeIsActive();
assertThatItRunsOnVirtualThread();
assertThatItRunsOnADuplicatedContext();
}

public static void assertThatTheRequestScopeIsActive() {
if (!Arc.container().requestContext().isActive()) {
throw new AssertionError(("Expected the request scope to be active"));
}
}

public static void assertThatItRunsOnADuplicatedContext() {
var context = Vertx.currentContext();
if (context == null) {
throw new AssertionError("The method does not run on a Vert.x context");
}
if (!VertxContext.isOnDuplicatedContext()) {
throw new AssertionError("The method does not run on a Vert.x **duplicated** context");
}
}

public static void assertThatItRunsOnVirtualThread() {
// We cannot depend on a Java 20.
try {
Method isVirtual = Thread.class.getMethod("isVirtual");
isVirtual.setAccessible(true);
boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
if (!virtual) {
throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread");
}
} catch (Exception e) {
throw new AssertionError(
"Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e);
}
}

public static void assertNotOnVirtualThread() {
// We cannot depend on a Java 20.
try {
Method isVirtual = Thread.class.getMethod("isVirtual");
isVirtual.setAccessible(true);
boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
if (virtual) {
throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread");
}
} catch (Exception e) {
// Trying using Thread name.
var name = Thread.currentThread().toString();
if (name.toLowerCase().contains("virtual")) {
throw new AssertionError("Thread " + Thread.currentThread() + " seems to be a virtual thread");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.virtual.scheduler;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import jakarta.enterprise.event.Observes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduler;
import io.smallrye.common.annotation.RunOnVirtualThread;

@Path("/")
public class ScheduledResource {

Set<String> executions = new CopyOnWriteArraySet<>();
Set<String> programmaticExecutions = new CopyOnWriteArraySet<>();

public void init(@Observes StartupEvent ev, Scheduler scheduler) {
scheduler.newJob("my-programmatic-job")
.setInterval("1s")
.setTask(ex -> {
AssertHelper.assertEverything();
// Quarkus specific - each VT has a unique name
programmaticExecutions.add(Thread.currentThread().getName());
}, true)
.schedule();
}

@Scheduled(every = "1s")
@RunOnVirtualThread
void run() {
AssertHelper.assertEverything();
// Quarkus specific - each VT has a unique name
executions.add(Thread.currentThread().getName());
}

@GET
public Set<String> getExecutions() {
return executions;
}

@GET
@Path("/programmatic")
public Set<String> getProgrammaticExecutions() {
return programmaticExecutions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
quarkus.native.additional-build-args=--enable-preview

quarkus.package.quiltflower.enabled=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.quarkus.virtual.mail;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

/**
* An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread.
* It reads the reports generated by surefire.
*/
public class NoPinningVerify {

@Test
void verify() throws IOException, ParserConfigurationException, SAXException {
var reports = new File("target", "surefire-reports");
Assertions.assertTrue(reports.isDirectory(),
"Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?");
var list = reports.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("TEST") && name.endsWith("Test.xml");
}
});
Assertions.assertNotNull(list,
"Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?");

for (File report : list) {
Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(report);
var suite = document.getFirstChild();
var cases = getChildren(suite.getChildNodes(), "testcase");
for (Node c : cases) {
verify(report, c);
}
}

}

private void verify(File file, Node ca) {
var fullname = ca.getAttributes().getNamedItem("classname").getTextContent() + "."
+ ca.getAttributes().getNamedItem("name").getTextContent();
var output = getChildren(ca.getChildNodes(), "system-out");
if (output.isEmpty()) {
return;
}
var sout = output.get(0).getTextContent();
if (sout.contains("VThreadContinuation.onPinned")) {
throw new AssertionError("The test case " + fullname + " pinned the carrier thread, check " + file.getAbsolutePath()
+ " for details (or the log of the test)");
}

}

private List<Node> getChildren(NodeList nodes, String name) {
List<Node> list = new ArrayList<>();
for (int i = 0; i < nodes.getLength(); i++) {
var node = nodes.item(i);
if (node.getNodeName().equalsIgnoreCase(name)) {
list.add(node);
}
}
return list;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkus.virtual.mail;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
class RunOnVirtualThreadIT extends RunOnVirtualThreadTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.virtual.mail;

import java.time.Duration;
import java.util.List;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.common.mapper.TypeRef;

@QuarkusTest
class RunOnVirtualThreadTest {

@Test
void testScheduledMethods() {
Awaitility.await()
.pollDelay(Duration.ofSeconds(2))
.untilAsserted(() -> {
var list = RestAssured.get().then()
.assertThat().statusCode(200)
.extract().as(new TypeRef<List<String>>() {
});
Assertions.assertTrue(list.size() > 3);
});
}

@Test
void testScheduledMethodsUsingApi() {
Awaitility.await()
.pollDelay(Duration.ofSeconds(2))
.untilAsserted(() -> {
var list = RestAssured.get("/programmatic").then()
.assertThat().statusCode(200)
.extract().as(new TypeRef<List<String>>() {
});
Assertions.assertTrue(list.size() > 3);
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>quarkus-virtual-threads-integration-tests-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-integration-test-virtual-threads-scheduler</artifactId>
<name>Quarkus - Integration Tests - Virtual Threads - Scheduler</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.quarkus.virtual.scheduler;

import java.lang.reflect.Method;

import io.quarkus.arc.Arc;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Vertx;

public class AssertHelper {

/**
* Asserts that the current method:
* - runs on a duplicated context
* - runs on a virtual thread
* - has the request scope activated
*/
public static void assertEverything() {
assertThatTheRequestScopeIsActive();
assertThatItRunsOnVirtualThread();
assertThatItRunsOnADuplicatedContext();
}

public static void assertThatTheRequestScopeIsActive() {
if (!Arc.container().requestContext().isActive()) {
throw new AssertionError(("Expected the request scope to be active"));
}
}

public static void assertThatItRunsOnADuplicatedContext() {
var context = Vertx.currentContext();
if (context == null) {
throw new AssertionError("The method does not run on a Vert.x context");
}
if (!VertxContext.isOnDuplicatedContext()) {
throw new AssertionError("The method does not run on a Vert.x **duplicated** context");
}
}

public static void assertThatItRunsOnVirtualThread() {
// We cannot depend on a Java 20.
try {
Method isVirtual = Thread.class.getMethod("isVirtual");
isVirtual.setAccessible(true);
boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
if (!virtual) {
throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread");
}
} catch (Exception e) {
throw new AssertionError(
"Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e);
}
}

public static void assertNotOnVirtualThread() {
// We cannot depend on a Java 20.
try {
Method isVirtual = Thread.class.getMethod("isVirtual");
isVirtual.setAccessible(true);
boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
if (virtual) {
throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread");
}
} catch (Exception e) {
// Trying using Thread name.
var name = Thread.currentThread().toString();
if (name.toLowerCase().contains("virtual")) {
throw new AssertionError("Thread " + Thread.currentThread() + " seems to be a virtual thread");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.virtual.scheduler;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import jakarta.enterprise.event.Observes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduler;
import io.smallrye.common.annotation.RunOnVirtualThread;

@Path("/")
public class ScheduledResource {

Set<String> executions = new CopyOnWriteArraySet<>();
Set<String> programmaticExecutions = new CopyOnWriteArraySet<>();

public void init(@Observes StartupEvent ev, Scheduler scheduler) {
scheduler.newJob("my-programmatic-job")
.setInterval("1s")
.setTask(ex -> {
AssertHelper.assertEverything();
// Quarkus specific - each VT has a unique name
programmaticExecutions.add(Thread.currentThread().getName());
}, true)
.schedule();
}

@Scheduled(every = "1s")
@RunOnVirtualThread
void run() {
AssertHelper.assertEverything();
// Quarkus specific - each VT has a unique name
executions.add(Thread.currentThread().getName());
}

@GET
public Set<String> getExecutions() {
return executions;
}

@GET
@Path("/programmatic")
public Set<String> getProgrammaticExecutions() {
return programmaticExecutions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
quarkus.native.additional-build-args=--enable-preview

quarkus.package.quiltflower.enabled=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.quarkus.virtual.mail;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

/**
* An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread.
* It reads the reports generated by surefire.
*/
public class NoPinningVerify {

@Test
void verify() throws IOException, ParserConfigurationException, SAXException {
var reports = new File("target", "surefire-reports");
Assertions.assertTrue(reports.isDirectory(),
"Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?");
var list = reports.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("TEST") && name.endsWith("Test.xml");
}
});
Assertions.assertNotNull(list,
"Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?");

for (File report : list) {
Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(report);
var suite = document.getFirstChild();
var cases = getChildren(suite.getChildNodes(), "testcase");
for (Node c : cases) {
verify(report, c);
}
}

}

private void verify(File file, Node ca) {
var fullname = ca.getAttributes().getNamedItem("classname").getTextContent() + "."
+ ca.getAttributes().getNamedItem("name").getTextContent();
var output = getChildren(ca.getChildNodes(), "system-out");
if (output.isEmpty()) {
return;
}
var sout = output.get(0).getTextContent();
if (sout.contains("VThreadContinuation.onPinned")) {
throw new AssertionError("The test case " + fullname + " pinned the carrier thread, check " + file.getAbsolutePath()
+ " for details (or the log of the test)");
}

}

private List<Node> getChildren(NodeList nodes, String name) {
List<Node> list = new ArrayList<>();
for (int i = 0; i < nodes.getLength(); i++) {
var node = nodes.item(i);
if (node.getNodeName().equalsIgnoreCase(name)) {
list.add(node);
}
}
return list;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkus.virtual.mail;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
class RunOnVirtualThreadIT extends RunOnVirtualThreadTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.virtual.mail;

import java.time.Duration;
import java.util.List;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.common.mapper.TypeRef;

@QuarkusTest
class RunOnVirtualThreadTest {

@Test
void testScheduledMethods() {
Awaitility.await()
.pollDelay(Duration.ofSeconds(2))
.untilAsserted(() -> {
var list = RestAssured.get().then()
.assertThat().statusCode(200)
.extract().as(new TypeRef<List<String>>() {
});
Assertions.assertTrue(list.size() > 3);
});
}

@Test
void testScheduledMethodsUsingApi() {
Awaitility.await()
.pollDelay(Duration.ofSeconds(2))
.untilAsserted(() -> {
var list = RestAssured.get("/programmatic").then()
.assertThat().statusCode(200)
.extract().as(new TypeRef<List<String>>() {
});
Assertions.assertTrue(list.size() > 3);
});
}

}

0 comments on commit c951629

Please sign in to comment.