Skip to content

Commit

Permalink
Merge pull request quarkusio#35539 from cescoffier/vitural-threads-sc…
Browse files Browse the repository at this point in the history
…heduled

Integrate @RunOnVirtualThread with the Quarkus scheduler
  • Loading branch information
mkouba authored Sep 1, 2023
2 parents 632f571 + c951629 commit f2d6cc1
Show file tree
Hide file tree
Showing 29 changed files with 836 additions and 28 deletions.
4 changes: 2 additions & 2 deletions .github/virtual-threads-tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"include": [
{
"category": "Main",
"timeout": 45,
"test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads",
"timeout": 50,
"test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads, scheduler-virtual-threads, quartz-virtual-threads",
"os-name": "ubuntu-latest"
},
{
Expand Down
12 changes: 12 additions & 0 deletions docs/src/main/asciidoc/quartz.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,18 @@ public class MyListenerManager {
}
----

[[virtual-threads]]
== Run scheduled methods on virtual threads

Methods annotated with `@Scheduled` can also be annotated with `@RunOnVirtualThread`.
In this case, the method is invoked on a virtual thread.

The method must return `void` and your Java runtime must provide support for virtual threads.
Read xref:./virtual-threads.adoc[the virtual thread guide] for more details.

WARNING: This feature cannot be combined with the `run-blocking-method-on-quartz-thread` option.
If `run-blocking-method-on-quartz-thread` is set, the scheduled method runs on a (platform) thread managed by Quartz.

[[quartz-configuration-reference]]
== Quartz Configuration Reference

Expand Down
8 changes: 8 additions & 0 deletions docs/src/main/asciidoc/scheduler-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,14 @@ If the xref:smallrye-metrics.adoc[SmallRye Metrics extension] is present, then a

If `quarkus.scheduler.tracing.enabled` is set to `true` and the xref:opentelemetry.adoc[OpenTelemetry extension] is present then the `@io.opentelemetry.instrumentation.annotations.WithSpan` annotation is added automatically to every `@Scheduled` method. As a result, each execution of this method has a new `io.opentelemetry.api.trace.Span` associated.

== Run @Scheduled methods on virtual threads

Methods annotated with `@Scheduled` can also be annotated with `@RunOnVirtualThread`.
In this case, the method is invoked on a virtual thread.

The method must return `void` and your Java runtime must provide support for virtual threads.
Read xref:./virtual-threads.adoc[the virtual thread guide] for more details.

== Configuration Reference

include::{generated-dir}/config/quarkus-scheduler.adoc[leveloffset=+1, opts=optional]
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode;
import io.quarkus.scheduler.runtime.SimpleScheduler;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -782,6 +783,11 @@ public CompletionStage<Void> invokeBean(ScheduledExecution execution) {
return CompletableFuture.failedStage(e);
}
}

@Override
public boolean isRunningOnVirtualThread() {
return runOnVirtualThread;
}
};
} else {
invoker = new DefaultInvoker() {
Expand Down Expand Up @@ -868,17 +874,38 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
} else {
Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
VertxContextSafetyToggle.setContextSafe(context, true);
context.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> p) {
try {
trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext));
p.complete();
} catch (Exception e) {
p.tryFail(e);
if (trigger.invoker.isRunningOnVirtualThread()) {
// While counter-intuitive, we switch to a safe context, so that context is captured and attached
// to the virtual thread.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void run() {
try {
trigger.invoker
.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext));
} catch (Exception ignored) {
// already logged by the StatusEmitterInvoker
}
}
});
}
}
}, false);
});
} else {
context.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> p) {
try {
trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext));
p.complete();
} catch (Exception e) {
p.tryFail(e);
}
}
}, false);
}
}
} else {
Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,18 @@ interface JobDefinition {
* @param task
* @return self
*/
JobDefinition setTask(Consumer<ScheduledExecution> task);
default JobDefinition setTask(Consumer<ScheduledExecution> task) {
return setTask(task, false);
}

/**
* Configures the task to schedule.
*
* @param task the task, must not be {@code null}
* @param runOnVirtualThread whether the task must be run on a virtual thread if the JVM allows it.
* @return self the current job definition
*/
JobDefinition setTask(Consumer<ScheduledExecution> task, boolean runOnVirtualThread);

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public abstract class AbstractJobDefinition implements JobDefinition {
protected Function<ScheduledExecution, Uni<Void>> asyncTask;
protected boolean scheduled = false;
protected String timeZone = Scheduled.DEFAULT_TIMEZONE;
protected boolean runOnVirtualThread;

public AbstractJobDefinition(String identity) {
this.identity = identity;
Expand Down Expand Up @@ -78,12 +79,13 @@ public JobDefinition setTimeZone(String timeZone) {
}

@Override
public JobDefinition setTask(Consumer<ScheduledExecution> task) {
public JobDefinition setTask(Consumer<ScheduledExecution> task, boolean runOnVirtualThread) {
checkScheduled();
if (asyncTask != null) {
throw new IllegalStateException("Async task was already set");
}
this.task = task;
this.runOnVirtualThread = runOnVirtualThread;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ public boolean isBlocking() {
return delegate.isBlocking();
}

@Override
public boolean isRunningOnVirtualThread() {
return delegate.isRunningOnVirtualThread();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
public interface ScheduledInvoker {

/**
*
* @param execution
* @return the result
* @throws Exception
Expand All @@ -27,4 +26,14 @@ default boolean isBlocking() {
return true;
}

/**
* Indicates that the invoker used the virtual thread executor to execute the tasks.
* Note that the method must use a synchronous signature.
*
* @return {@code true} if the scheduled method runs on a virtual thread.
*/
default boolean isRunningOnVirtualThread() {
return false;
}

}
4 changes: 4 additions & 0 deletions extensions/scheduler/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@ public final class ScheduledBusinessMethodItem extends MultiBuildItem {
private final List<AnnotationInstance> schedules;
private final MethodInfo method;
private final boolean nonBlocking;
private final boolean runOnVirtualThread;

public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List<AnnotationInstance> schedules) {
this(bean, method, schedules, false);
this(bean, method, schedules, false, false);
}

public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List<AnnotationInstance> schedules,
boolean hasNonBlockingAnnotation) {
boolean hasNonBlockingAnnotation, boolean hasRunOnVirtualThreadAnnotation) {
this.bean = bean;
this.method = method;
this.schedules = schedules;
this.nonBlocking = hasNonBlockingAnnotation || SchedulerDotNames.COMPLETION_STAGE.equals(method.returnType().name())
|| SchedulerDotNames.UNI.equals(method.returnType().name()) || KotlinUtil.isSuspendMethod(method);
this.runOnVirtualThread = hasRunOnVirtualThreadAnnotation;
}

/**
Expand All @@ -48,6 +50,10 @@ public boolean isNonBlocking() {
return nonBlocking;
}

public boolean isRunOnVirtualThread() {
return runOnVirtualThread;
}

public String getMethodDescription() {
return method.declaringClass().name() + "#" + method.name() + "()";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.common.annotation.RunOnVirtualThread;

class SchedulerDotNames {

Expand All @@ -23,4 +24,6 @@ class SchedulerDotNames {
static final DotName ABSTRACT_COROUTINE_INVOKER = DotName
.createSimple("io.quarkus.scheduler.kotlin.runtime.AbstractCoroutineInvoker");

static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ void collectScheduledMethods(BeanArchiveIndexBuildItem beanArchives, BeanDiscove
MethodInfo method = annotationInstance.target().asMethod();
if (Modifier.isStatic(method.flags()) && !KotlinUtil.isSuspendMethod(method)) {
scheduledBusinessMethods.produce(new ScheduledBusinessMethodItem(null, method, schedules,
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING)));
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING),
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.RUN_ON_VIRTUAL_THREAD)));
LOGGER.debugf("Found scheduled static method %s declared on %s", method, method.declaringClass().name());
}
}
Expand Down Expand Up @@ -176,7 +177,8 @@ private void collectScheduledMethods(IndexView index, TransformedAnnotationsBuil
}
if (schedules != null) {
scheduledBusinessMethods.produce(new ScheduledBusinessMethodItem(bean, method, schedules,
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING)));
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.NON_BLOCKING),
transformedAnnotations.hasAnnotation(method, SchedulerDotNames.RUN_ON_VIRTUAL_THREAD)));
LOGGER.debugf("Found scheduled business method %s declared on %s", method, bean);
}
}
Expand Down Expand Up @@ -207,6 +209,11 @@ void validateScheduledBusinessMethods(SchedulerConfig config, List<ScheduledBusi
continue;
}

if (scheduledMethod.isNonBlocking() && scheduledMethod.isRunOnVirtualThread()) {
errors.add(new IllegalStateException("@Scheduled method cannot be non-blocking and annotated " +
"with @RunOnVirtualThread: " + scheduledMethod.getMethodDescription()));
}

boolean isSuspendMethod = KotlinUtil.isSuspendMethod(method);

// Validate method params and return type
Expand Down Expand Up @@ -510,6 +517,13 @@ private String generateInvoker(ScheduledBusinessMethodItem scheduledMethod, Clas
if (scheduledMethod.isNonBlocking()) {
MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class);
isBlocking.returnValue(isBlocking.load(false));
isBlocking.close();
}

if (scheduledMethod.isRunOnVirtualThread()) {
MethodCreator isRunOnVirtualThread = invokerCreator.getMethodCreator("isRunningOnVirtualThread", boolean.class);
isRunOnVirtualThread.returnValue(isRunOnVirtualThread.load(true));
isRunOnVirtualThread.close();
}

invokerCreator.close();
Expand Down
4 changes: 4 additions & 0 deletions extensions/scheduler/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-kotlin</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import io.quarkus.scheduler.common.runtime.util.SchedulerUtils;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -390,16 +391,32 @@ void execute(ZonedDateTime now, Vertx vertx) {
Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
VertxContextSafetyToggle.setContextSafe(context, true);
if (invoker.isBlocking()) {
context.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> p) {
try {
doInvoke(now, scheduledFireTime);
} finally {
p.complete();
if (invoker.isRunningOnVirtualThread()) {
// While counter-intuitive, we switch to a safe context, so that context is captured and attached
// to the virtual thread.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void run() {
doInvoke(now, scheduledFireTime);
}
});
}
}
}, false);
});
} else {
context.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> p) {
try {
doInvoke(now, scheduledFireTime);
} finally {
p.complete();
}
}
}, false);
}
} else {
context.runOnContext(new Handler<Void>() {
@Override
Expand Down Expand Up @@ -639,6 +656,11 @@ public CompletionStage<Void> invokeBean(ScheduledExecution execution) {
return CompletableFuture.failedStage(e);
}
}

@Override
public boolean isRunningOnVirtualThread() {
return runOnVirtualThread;
}
};
} else {
invoker = new DefaultInvoker() {
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/virtual-threads/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
<module>amqp-virtual-threads</module>
<module>jms-virtual-threads</module>
<module>vertx-event-bus-virtual-threads</module>
<module>scheduler-virtual-threads</module>
<module>quartz-virtual-threads</module>
</modules>

<build>
Expand Down
Loading

0 comments on commit f2d6cc1

Please sign in to comment.