-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Guide for using virtual threads with reactive messaging extensions
- Loading branch information
1 parent
1b7d001
commit 1c0b57c
Showing
5 changed files
with
244 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
= Quarkus Virtual Thread support with Reactive Messaging | ||
|
||
include::_attributes.adoc[] | ||
:runonvthread: https://javadoc.io/doc/io.smallrye.common/smallrye-common-annotation/latest/io/smallrye/common/annotation/RunOnVirtualThread.html | ||
:rm_blocking_annotation: https://javadoc.io/doc/io.smallrye.reactive/smallrye-reactive-messaging-api/latest/io/smallrye/reactive/messaging/annotations/Blocking.html | ||
:rm_blocking_docs: http://smallrye.io/smallrye-reactive-messaging/4.8.0/concepts/blocking/ | ||
|
||
This guide explains how to benefit from Java virtual threads when writing message processing applications in Quarkus. | ||
|
||
[TIP] | ||
==== | ||
This guide focuses on using virtual threads with Reactive Messaging extensions. | ||
Please refer to xref:virtual-threads.adoc[Writing simpler reactive REST services with Quarkus Virtual Thread support] | ||
to read more about Java virtual threads in general and the Quarkus Virtual Thread support for REST services. | ||
For reference guides of specific Reactive Messaging extensions see xref:kafka.adoc[Apache Kafka Reference Guide], | ||
xref:amqp-reference.adoc[Reactive Messaging AMQP 1.0 Connector], xref:rabbitmq-reference.adoc[Reactive Messaging RabbitMQ Connector] or xref:pulsar.adoc[Apache Pulsar Reference Guide]. | ||
==== | ||
|
||
By default, Reactive Messaging invokes message processing methods on an event-loop thread. | ||
See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. | ||
But, you sometimes need to combine Reactive Messaging with blocking processing such as calling external services or database operations. | ||
For this, you can use the link:{rm_blocking_annotation}[@Blocking] annotation indicating that the processing is _blocking_ and should be run on a worker thread. | ||
You can read more on the blocking processing in link:{rm_blocking_docs}[SmallRye Reactive Messaging documentation]. | ||
|
||
The idea behind Quarkus Virtual Thread support for Reactive Messaging is to offload the message processing on virtual threads, | ||
instead of running it on an event-loop thread or a worker thread. | ||
|
||
To enable virtual thread support on a message consumer method, simply add the link:{runonvthread}[@RunOnVirtualThread] annotation to the method. | ||
If the JDK is compatible (Java 19 or later versions) then each incoming message will be offloaded to a new virtual thread. | ||
It will then be possible to perform blocking operations without blocking the platform thread upon which the virtual thread is mounted. | ||
|
||
== Example using the Reactive Messaging Kafka extension | ||
|
||
Let's see an example of how to process Kafka records on virtual threads. | ||
First, make sure to have a reactive messaging extension dependency to your build file: | ||
|
||
[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] | ||
.pom.xml | ||
---- | ||
<dependency> | ||
<groupId>io.quarkus</groupId> | ||
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId> | ||
</dependency> | ||
---- | ||
|
||
[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"] | ||
.build.gradle | ||
---- | ||
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-kafka") | ||
---- | ||
|
||
You also need to make sure that you are using Java 19 or later, this can be enforced in your `pom.xml` file with the following: | ||
|
||
[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] | ||
.pom.xml | ||
---- | ||
<properties> | ||
<maven.compiler.source>19</maven.compiler.source> | ||
<maven.compiler.target>19</maven.compiler.target> | ||
</properties> | ||
---- | ||
|
||
Virtual threads are still a preview feature, so you need to start your application with the `--enable-preview` flag: | ||
|
||
[source, bash] | ||
---- | ||
java --enable-preview -jar target/quarkus-app/quarkus-run.jar | ||
---- | ||
|
||
or to use the Quarkus Dev mode, insert the following to the `quarkus-maven-plugin` configuration: | ||
|
||
[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] | ||
.pom.xml | ||
---- | ||
<plugin> | ||
<groupId>io.quarkus</groupId> | ||
<artifactId>quarkus-maven-plugin</artifactId> | ||
<version>${quarkus.version}</version> | ||
<executions> | ||
<execution> | ||
<goals> | ||
<goal>build</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
<configuration> | ||
<source>19</source> | ||
<target>19</target> | ||
<compilerArgs> | ||
<arg>--enable-preview</arg> | ||
</compilerArgs> | ||
<jvmArgs>--enable-preview --add-opens java.base/java.lang=ALL-UNNAMED</jvmArgs> | ||
</configuration> | ||
</plugin> | ||
---- | ||
|
||
Then you can start using the annotation `@RunOnVirtualThread` on your consumer methods also annotated with `@Incoming`. | ||
In the following example we'll use the xref:rest-client-reactive.adoc[RESTEasy Reactive REST Client] to make a blocking call to a REST endpoint: | ||
|
||
[source, java] | ||
---- | ||
import org.eclipse.microprofile.reactive.messaging.Incoming; | ||
import org.eclipse.microprofile.reactive.messaging.Outgoing; | ||
import org.eclipse.microprofile.rest.client.inject.RestClient; | ||
import io.smallrye.common.annotation.RunOnVirtualThread; | ||
import jakarta.enterprise.context.ApplicationScoped; | ||
@ApplicationScoped | ||
public class PriceConsumer { | ||
@RestClient // <2> | ||
PriceAlertService alertService; | ||
@Incoming("prices") | ||
@RunOnVirtualThread // <1> | ||
public void consume(double price) { | ||
if (price > 90.0) { | ||
alertService.alert(price); // <3> | ||
} | ||
} | ||
@Outgoing("prices-out") // <4> | ||
public Multi<Double> randomPriceGenerator() { | ||
return Multi.createFrom().<Random, Double>generator(Random::new, (r, e) -> { | ||
e.emit(r.nextDouble(100)); | ||
return r; | ||
}); | ||
} | ||
} | ||
---- | ||
|
||
<1> `@RunOnVirtualThread` annotation on the `@Incoming` method ensures that the method will be called on a virtual thread. | ||
<2> the REST client stub is injected with the `@RestClient` annotation. | ||
<3> `alert` method blocks the virtual thread until the REST call returns. | ||
<4> This `@Outgoing` method generates random prices and writes them a Kafka topic to be consumed back by the application. | ||
|
||
Note that by default Reactive Messaging message processing happens sequentially, preserving the order of messages. | ||
In the same way, `@Blocking(ordered = false)` annotation changes this behaviour, | ||
using `@RunOnVirtualThread` enforces concurrent message processing without preserving the order. | ||
|
||
In order to leverage the lightweight nature of virtual threads, the default maximum concurrency for methods annotated with `@RunOnVirtualThread` is 1024. | ||
As opposed to platform threads, virtual threads are not pooled and created per message. Therefore the maximum concurrency applies separately to all `@RunOnVirtualThread` methods. | ||
|
||
There are two ways to customize the concurrency level: | ||
|
||
1. The `@RunOnVirtualThread` annotation can be used together with the link:{rm_blocking_annotation}[@Blocking] annotation to specify a worker name. | ||
+ | ||
[source, java] | ||
---- | ||
@Incoming("prices") | ||
@RunOnVirtualThread | ||
@Blocking("my-worker") | ||
public void consume(double price) { | ||
//... | ||
} | ||
---- | ||
+ | ||
Then, for example, to set the maximum concurrency of this method down to 30, set using the config property `smallrye.messaging.worker.my-worker.max-concurrency=30`. | ||
+ | ||
2. For every `@RunOnVirtualThread` method that is not configured with a worker name, you can use the config property `smallrye.messaging.worker.<virtual-thread>.max-concurrency`. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters