Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add @RunOnVirtualThread support on classes in reactive messaging #34876

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 79 additions & 15 deletions docs/src/main/asciidoc/messaging-virtual-threads.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ You also need to make sure that you are using Java 19 or later, this can be enfo
.pom.xml
----
<properties>
<maven.compiler.source>19</maven.compiler.source>
<maven.compiler.target>19</maven.compiler.target>
<maven.compiler.source>20</maven.compiler.source>
<maven.compiler.target>20</maven.compiler.target>
</properties>
----

Expand All @@ -73,24 +73,16 @@ or to use the Quarkus Dev mode, insert the following to the `quarkus-maven-plugi
[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"]
.pom.xml
----
<maven.compiler.release>20</maven.compiler.release>
<!-- ... -->
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
<artifactId>maven-compiler-plugin</artifactId>
<version>${compiler-plugin.version}</version>
<configuration>
<source>19</source>
<target>19</target>
<compilerArgs>
<arg>-parameters</arg>
<arg>--enable-preview</arg>
</compilerArgs>
<jvmArgs>--enable-preview --add-opens java.base/java.lang=ALL-UNNAMED</jvmArgs>
</configuration>
</plugin>
----
Expand Down Expand Up @@ -144,6 +136,78 @@ Note that by default Reactive Messaging message processing happens sequentially,
In the same way, `@Blocking(ordered = false)` annotation changes this behaviour,
using `@RunOnVirtualThread` enforces concurrent message processing without preserving the order.

== Use the @RunOnVirtualThread annotation

=== Methods signatures eligible to @RunOnVirtualThread

Only method can be annotated with `@Blocking` can use `@RunOnVirtualThreads`.
The eligible method signatures are:

- `@Outgoing("channel-out") O generator()`
- `@Outgoing("channel-out") Message<O> generator()`
- `@Incoming("channel-in") @Outgoing("channel-out") O process(I in)`
- `@Incoming("channel-in") @Outgoing("channel-out") Message<O> process(I in)`
- `@Incoming("channel-in") void consume(I in)`
- `@Incoming("channel-in") Uni<Void> consume(I in)`
- `@Incoming("channel-in") Uni<Void> consume(Message<I> msg)`
- `@Incoming("channel-in") CompletionStage<Void> consume(I in)`
- `@Incoming("channel-in") CompletionStage<Void> consume(Message<I> msg)`

=== Use of @RunOnVirtualThread annotation on methods and classes

You can use the `@RunOnVirtualThread` annotation:

1. directly on a reactive messaging method - this method will be considered _blocking_ and executed on a virtual thread
2. on the class containing reactive messaging methods - the methods from this class annotation with `@Blocking` will be executed on virtual thread, except if the annotation defines a pool name configured to use regular worker threads

For example, you can use `@RunOnVirtualThread` directly on the method:

[source, java]
----
@ApplicationScoped
public class MyBean {

@Incoming("in")
@Outgoing("out")
@RunOnVirtualThread
public String process(String s) {
// Called on a new virtual thread for every incoming message
}
}
----

Alternatively, you can use `@RunOnVirtualThread` on the class itself:

[source, java]
----
@ApplicationScoped
@RunOnVirtualThread
public class MyBean {

@Incoming("in1")
@Outgoing("out1")
public String process(String s) {
// Called on the event loop - no @Blocking annotation
}

@Incoming("in2")
@Outgoing("out2")
@Blocking
public String process(String s) {
// Call on a new virtual thread for every incoming message
}

@Incoming("in3")
@Outgoing("out3")
@Blocking("my-worker-pool")
public String process(String s) {
// Called on a regular worker thread from the pool named "my-worker-pool"
}
}
----

== Control the maximum concurrency

In order to leverage the lightweight nature of virtual threads, the default maximum concurrency for methods annotated with `@RunOnVirtualThread` is 1024.
As opposed to platform threads, virtual threads are not pooled and created per message. Therefore the maximum concurrency applies separately to all `@RunOnVirtualThread` methods.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,15 @@ public Integer get() {
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
AnnotationInstance transactionalAnnotation = methodInfo.annotation(TRANSACTIONAL);
AnnotationInstance runOnVirtualThreadAnnotation = methodInfo.annotation(RUN_ON_VIRTUAL_THREAD);
// IF @RunOnVirtualThread is used on the declaring class, it forces all @Blocking method to be run on virtual threads.
AnnotationInstance runOnVirtualThreadClassAnnotation = methodInfo.declaringClass().annotation(RUN_ON_VIRTUAL_THREAD);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null
|| runOnVirtualThreadAnnotation != null) {
mediatorConfigurationSupport.validateBlocking(validationOutput);
configuration.setBlocking(true);
if (blockingAnnotation != null) {
AnnotationValue ordered = blockingAnnotation.value("ordered");
if (runOnVirtualThreadAnnotation != null) {
if (runOnVirtualThreadAnnotation != null || runOnVirtualThreadClassAnnotation != null) {
if (ordered != null && ordered.asBoolean()) {
throw new ConfigurationException(
"The method `" + methodInfo.name()
Expand All @@ -203,7 +205,7 @@ public Integer get() {
!(poolName = blockingAnnotation.value().asString()).equals(Blocking.DEFAULT_WORKER_POOL)) {
configuration.setWorkerPoolName(poolName);
}
} else if (runOnVirtualThreadAnnotation != null) {
} else if (runOnVirtualThreadAnnotation != null || runOnVirtualThreadClassAnnotation != null) {
configuration.setBlockingExecutionOrdered(false);
configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,19 @@ private <T> Uni<T> runOnVirtualThread(Context currentContext, Uni<T> uni) {
return uni.runSubscriptionOn(VirtualExecutorSupplier.Instance.get())
.onItemOrFailure().transformToUni((item, failure) -> {
return Uni.createFrom().emitter(emitter -> {
if (failure != null) {
currentContext.runOnContext(() -> emitter.fail(failure));
if (currentContext != null) {
if (failure != null) {
currentContext.runOnContext(() -> emitter.fail(failure));
} else {
currentContext.runOnContext(() -> emitter.complete(item));
}
} else {
currentContext.runOnContext(() -> emitter.complete(item));
// Some method do not have a context (generator methods)
if (failure != null) {
emitter.fail(failure);
} else {
emitter.complete(item);
}
}
});
});
Expand Down