Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for virtual threads in smallrye-reactive-messaging #34557

Merged
merged 3 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -56,7 +56,7 @@ public class AeshConsole extends QuarkusConsole {
* Because Aesh can log deadlocks are possible on Windows if a write fails, unless care
* is taken.
*/
private final LinkedBlockingDeque<String> writeQueue = new LinkedBlockingDeque<>();
private final ConcurrentLinkedQueue<String> writeQueue = new ConcurrentLinkedQueue<>();
private final Lock connectionLock = new ReentrantLock();
private static final ThreadLocal<Boolean> IN_WRITE = new ThreadLocal<>() {
@Override
Expand Down
6 changes: 6 additions & 0 deletions docs/src/main/asciidoc/amqp-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,12 @@ The first one provides more fine-grained tuning such as the worker pool to use a
The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.
====

[TIP]
.@RunOnVirtualThread
====
For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation].
====

[TIP]
.@Transactional
====
Expand Down
6 changes: 6 additions & 0 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ The second one, used also with other reactive features of Quarkus, uses the defa
Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/latest/concepts/blocking/[SmallRye Reactive Messaging – Handling blocking execution].
====

[TIP]
.@RunOnVirtualThread
====
For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation].
====

[TIP]
.@Transactional
====
Expand Down
167 changes: 167 additions & 0 deletions docs/src/main/asciidoc/messaging-virtual-threads.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
= Quarkus Virtual Thread support with Reactive Messaging

include::_attributes.adoc[]
:runonvthread: https://javadoc.io/doc/io.smallrye.common/smallrye-common-annotation/latest/io/smallrye/common/annotation/RunOnVirtualThread.html
:rm_blocking_annotation: https://javadoc.io/doc/io.smallrye.reactive/smallrye-reactive-messaging-api/latest/io/smallrye/reactive/messaging/annotations/Blocking.html
:rm_blocking_docs: http://smallrye.io/smallrye-reactive-messaging/4.8.0/concepts/blocking/

This guide explains how to benefit from Java virtual threads when writing message processing applications in Quarkus.

[TIP]
====
This guide focuses on using virtual threads with Reactive Messaging extensions.
Please refer to xref:virtual-threads.adoc[Writing simpler reactive REST services with Quarkus Virtual Thread support]
to read more about Java virtual threads in general and the Quarkus Virtual Thread support for REST services.
For reference guides of specific Reactive Messaging extensions see xref:kafka.adoc[Apache Kafka Reference Guide],
xref:amqp-reference.adoc[Reactive Messaging AMQP 1.0 Connector], xref:rabbitmq-reference.adoc[Reactive Messaging RabbitMQ Connector] or xref:pulsar.adoc[Apache Pulsar Reference Guide].
====

By default, Reactive Messaging invokes message processing methods on an event-loop thread.
See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic.
But, you sometimes need to combine Reactive Messaging with blocking processing such as calling external services or database operations.
For this, you can use the link:{rm_blocking_annotation}[@Blocking] annotation indicating that the processing is _blocking_ and should be run on a worker thread.
You can read more on the blocking processing in link:{rm_blocking_docs}[SmallRye Reactive Messaging documentation].

The idea behind Quarkus Virtual Thread support for Reactive Messaging is to offload the message processing on virtual threads,
instead of running it on an event-loop thread or a worker thread.

To enable virtual thread support on a message consumer method, simply add the link:{runonvthread}[@RunOnVirtualThread] annotation to the method.
If the JDK is compatible (Java 19 or later versions) then each incoming message will be offloaded to a new virtual thread.
It will then be possible to perform blocking operations without blocking the platform thread upon which the virtual thread is mounted.

== Example using the Reactive Messaging Kafka extension

Let's see an example of how to process Kafka records on virtual threads.
First, make sure to have a reactive messaging extension dependency to your build file:

[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"]
.pom.xml
----
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
----

[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"]
.build.gradle
----
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-kafka")
----

You also need to make sure that you are using Java 19 or later, this can be enforced in your `pom.xml` file with the following:

[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"]
.pom.xml
----
<properties>
<maven.compiler.source>19</maven.compiler.source>
<maven.compiler.target>19</maven.compiler.target>
</properties>
----

Virtual threads are still a preview feature, so you need to start your application with the `--enable-preview` flag:

[source, bash]
----
java --enable-preview -jar target/quarkus-app/quarkus-run.jar
----

or to use the Quarkus Dev mode, insert the following to the `quarkus-maven-plugin` configuration:

[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"]
.pom.xml
----
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
<configuration>
<source>19</source>
<target>19</target>
<compilerArgs>
<arg>--enable-preview</arg>
</compilerArgs>
<jvmArgs>--enable-preview --add-opens java.base/java.lang=ALL-UNNAMED</jvmArgs>
</configuration>
</plugin>
----

Then you can start using the annotation `@RunOnVirtualThread` on your consumer methods also annotated with `@Incoming`.
In the following example we'll use the xref:rest-client-reactive.adoc[RESTEasy Reactive REST Client] to make a blocking call to a REST endpoint:

[source, java]
----
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PriceConsumer {
@RestClient // <2>
PriceAlertService alertService;
@Incoming("prices")
@RunOnVirtualThread // <1>
public void consume(double price) {
if (price > 90.0) {
alertService.alert(price); // <3>
}
}
@Outgoing("prices-out") // <4>
public Multi<Double> randomPriceGenerator() {
return Multi.createFrom().<Random, Double>generator(Random::new, (r, e) -> {
e.emit(r.nextDouble(100));
return r;
});
}
}
----

<1> `@RunOnVirtualThread` annotation on the `@Incoming` method ensures that the method will be called on a virtual thread.
<2> the REST client stub is injected with the `@RestClient` annotation.
<3> `alert` method blocks the virtual thread until the REST call returns.
<4> This `@Outgoing` method generates random prices and writes them a Kafka topic to be consumed back by the application.

Note that by default Reactive Messaging message processing happens sequentially, preserving the order of messages.
In the same way, `@Blocking(ordered = false)` annotation changes this behaviour,
using `@RunOnVirtualThread` enforces concurrent message processing without preserving the order.

In order to leverage the lightweight nature of virtual threads, the default maximum concurrency for methods annotated with `@RunOnVirtualThread` is 1024.
As opposed to platform threads, virtual threads are not pooled and created per message. Therefore the maximum concurrency applies separately to all `@RunOnVirtualThread` methods.

There are two ways to customize the concurrency level:

1. The `@RunOnVirtualThread` annotation can be used together with the link:{rm_blocking_annotation}[@Blocking] annotation to specify a worker name.
+
[source, java]
----
@Incoming("prices")
@RunOnVirtualThread
@Blocking("my-worker")
public void consume(double price) {
//...
}
----
+
Then, for example, to set the maximum concurrency of this method down to 30, set using the config property `smallrye.messaging.worker.my-worker.max-concurrency=30`.
+
2. For every `@RunOnVirtualThread` method that is not configured with a worker name, you can use the config property `smallrye.messaging.worker.<virtual-thread>.max-concurrency`.

59 changes: 59 additions & 0 deletions docs/src/main/asciidoc/pulsar.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,65 @@ Following types can be injected as channels:
As with the previous `Message` example, if your injected channel receives payloads (`Multi<T>`), it acknowledges the message automatically, and support multiple subscribers.
If your injected channel receives Message (`Multi<Message<T>>`), you will be responsible for the acknowledgment and broadcasting.

[[blocking-processing]]
=== Blocking processing

Reactive Messaging invokes your method on an I/O thread.
See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic.
But, you often need to combine Reactive Messaging with blocking processing such as database interactions.
For this, you need to use the `@Blocking` annotation indicating that the processing is _blocking_ and should not be run on the caller thread.

For example, The following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:

[source,java]
----
import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
@ApplicationScoped
public class PriceStorage {
@Incoming("prices")
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
price.value = priceInUsd;
price.persist();
}
}
----

[NOTE]
====
There are 2 `@Blocking` annotations:
1. `io.smallrye.reactive.messaging.annotations.Blocking`
2. `io.smallrye.common.annotation.Blocking`
They have the same effect.
Thus, you can use both.
The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order.
The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.
Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/latest/concepts/blocking/[SmallRye Reactive Messaging – Handling blocking execution].
====

[TIP]
.@RunOnVirtualThread
====
For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation].
====

[TIP]
.@Transactional
====
If your method is annotated with `@Transactional`, it will be considered _blocking_ automatically, even if the method is not annotated with `@Blocking`.
====

=== Pulsar Subscription Types

Pulsar *subscriptionType* consumer configuration can be used flexibly to achieve different messaging scenarios, such as publish-subscribe or queuing.
Expand Down
6 changes: 6 additions & 0 deletions docs/src/main/asciidoc/rabbitmq-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@ The first one provides more fine-grained tuning such as the worker pool to use a
The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.
====

[TIP]
.@RunOnVirtualThread
====
For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation].
====

== Customizing the underlying RabbitMQ client

The connector uses the Vert.x RabbitMQ client underneath.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.VOID_CLASS;
Expand All @@ -33,6 +34,7 @@
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusParameterDescriptor;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry;
import io.quarkus.smallrye.reactivemessaging.runtime.TypeInfo;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.annotations.Blocking;
Expand Down Expand Up @@ -178,17 +180,32 @@ public Integer get() {
AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
AnnotationInstance transactionalAnnotation = methodInfo.annotation(TRANSACTIONAL);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null) {
AnnotationInstance runOnVirtualThreadAnnotation = methodInfo.annotation(RUN_ON_VIRTUAL_THREAD);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null
|| runOnVirtualThreadAnnotation != null) {
mediatorConfigurationSupport.validateBlocking(validationOutput);
configuration.setBlocking(true);
if (blockingAnnotation != null) {
AnnotationValue ordered = blockingAnnotation.value("ordered");
configuration.setBlockingExecutionOrdered(ordered == null || ordered.asBoolean());
if (runOnVirtualThreadAnnotation != null) {
if (ordered != null && ordered.asBoolean()) {
throw new ConfigurationException(
"The method `" + methodInfo.name()
+ "` is using `@RunOnVirtualThread` but explicitly set as `@Blocking(ordered = true)`");
}
configuration.setBlockingExecutionOrdered(false);
configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER);
} else {
configuration.setBlockingExecutionOrdered(ordered == null || ordered.asBoolean());
}
String poolName;
if (blockingAnnotation.value() != null &&
!(poolName = blockingAnnotation.value().asString()).equals(Blocking.DEFAULT_WORKER_POOL)) {
configuration.setWorkerPoolName(poolName);
}
} else if (runOnVirtualThreadAnnotation != null) {
configuration.setBlockingExecutionOrdered(false);
configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER);
} else {
configuration.setBlockingExecutionOrdered(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.jboss.jandex.DotName;

import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.annotations.Blocking;
Expand Down Expand Up @@ -89,6 +90,7 @@ public final class ReactiveMessagingDotNames {
.createSimple("io.quarkus.smallrye.reactivemessaging.runtime.kotlin.AbstractSubscribingCoroutineInvoker");

static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional");
static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName());

private ReactiveMessagingDotNames() {
}
Expand Down
Loading