diff --git a/docs/src/main/asciidoc/amqp-reference.adoc b/docs/src/main/asciidoc/amqp-reference.adoc index 0b870346c580d..7da796b9f759c 100644 --- a/docs/src/main/asciidoc/amqp-reference.adoc +++ b/docs/src/main/asciidoc/amqp-reference.adoc @@ -411,6 +411,12 @@ The first one provides more fine-grained tuning such as the worker pool to use a The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order. ==== +[TIP] +.@RunOnVirtualThread +==== +For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation]. +==== + [TIP] .@Transactional ==== diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 7dda7c2f445c5..27c695fd9426b 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -284,6 +284,12 @@ The second one, used also with other reactive features of Quarkus, uses the defa Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/latest/concepts/blocking/[SmallRye Reactive Messaging – Handling blocking execution]. ==== +[TIP] +.@RunOnVirtualThread +==== +For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation]. +==== + [TIP] .@Transactional ==== diff --git a/docs/src/main/asciidoc/messaging-virtual-threads.adoc b/docs/src/main/asciidoc/messaging-virtual-threads.adoc new file mode 100644 index 0000000000000..f07e9cff31c56 --- /dev/null +++ b/docs/src/main/asciidoc/messaging-virtual-threads.adoc @@ -0,0 +1,167 @@ += Quarkus Virtual Thread support with Reactive Messaging + +include::_attributes.adoc[] +:runonvthread: https://javadoc.io/doc/io.smallrye.common/smallrye-common-annotation/latest/io/smallrye/common/annotation/RunOnVirtualThread.html +:rm_blocking_annotation: https://javadoc.io/doc/io.smallrye.reactive/smallrye-reactive-messaging-api/latest/io/smallrye/reactive/messaging/annotations/Blocking.html +:rm_blocking_docs: http://smallrye.io/smallrye-reactive-messaging/4.8.0/concepts/blocking/ + +This guide explains how to benefit from Java virtual threads when writing message processing applications in Quarkus. + +[TIP] +==== +This guide focuses on using virtual threads with Reactive Messaging extensions. +Please refer to xref:virtual-threads.adoc[Writing simpler reactive REST services with Quarkus Virtual Thread support] +to read more about Java virtual threads in general and the Quarkus Virtual Thread support for REST services. + +For reference guides of specific Reactive Messaging extensions see xref:kafka.adoc[Apache Kafka Reference Guide], +xref:amqp-reference.adoc[Reactive Messaging AMQP 1.0 Connector], xref:rabbitmq-reference.adoc[Reactive Messaging RabbitMQ Connector] or xref:pulsar.adoc[Apache Pulsar Reference Guide]. +==== + +By default, Reactive Messaging invokes message processing methods on an event-loop thread. +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. +But, you sometimes need to combine Reactive Messaging with blocking processing such as calling external services or database operations. +For this, you can use the link:{rm_blocking_annotation}[@Blocking] annotation indicating that the processing is _blocking_ and should be run on a worker thread. +You can read more on the blocking processing in link:{rm_blocking_docs}[SmallRye Reactive Messaging documentation]. + +The idea behind Quarkus Virtual Thread support for Reactive Messaging is to offload the message processing on virtual threads, +instead of running it on an event-loop thread or a worker thread. + +To enable virtual thread support on a message consumer method, simply add the link:{runonvthread}[@RunOnVirtualThread] annotation to the method. +If the JDK is compatible (Java 19 or later versions) then each incoming message will be offloaded to a new virtual thread. +It will then be possible to perform blocking operations without blocking the platform thread upon which the virtual thread is mounted. + +== Example using the Reactive Messaging Kafka extension + +Let's see an example of how to process Kafka records on virtual threads. +First, make sure to have a reactive messaging extension dependency to your build file: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + io.quarkus + quarkus-smallrye-reactive-messaging-kafka + +---- + +[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"] +.build.gradle +---- +implementation("io.quarkus:quarkus-smallrye-reactive-messaging-kafka") +---- + +You also need to make sure that you are using Java 19 or later, this can be enforced in your `pom.xml` file with the following: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + 19 + 19 + +---- + +Virtual threads are still a preview feature, so you need to start your application with the `--enable-preview` flag: + +[source, bash] +---- +java --enable-preview -jar target/quarkus-app/quarkus-run.jar +---- + +or to use the Quarkus Dev mode, insert the following to the `quarkus-maven-plugin` configuration: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + io.quarkus + quarkus-maven-plugin + ${quarkus.version} + + + + build + + + + + 19 + 19 + + --enable-preview + + --enable-preview --add-opens java.base/java.lang=ALL-UNNAMED + + +---- + +Then you can start using the annotation `@RunOnVirtualThread` on your consumer methods also annotated with `@Incoming`. +In the following example we'll use the xref:rest-client-reactive.adoc[RESTEasy Reactive REST Client] to make a blocking call to a REST endpoint: + +[source, java] +---- +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.rest.client.inject.RestClient; + +import io.smallrye.common.annotation.RunOnVirtualThread; + +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class PriceConsumer { + + @RestClient // <2> + PriceAlertService alertService; + + + @Incoming("prices") + @RunOnVirtualThread // <1> + public void consume(double price) { + if (price > 90.0) { + alertService.alert(price); // <3> + } + } + + @Outgoing("prices-out") // <4> + public Multi randomPriceGenerator() { + return Multi.createFrom().generator(Random::new, (r, e) -> { + e.emit(r.nextDouble(100)); + return r; + }); + } + + +} +---- + +<1> `@RunOnVirtualThread` annotation on the `@Incoming` method ensures that the method will be called on a virtual thread. +<2> the REST client stub is injected with the `@RestClient` annotation. +<3> `alert` method blocks the virtual thread until the REST call returns. +<4> This `@Outgoing` method generates random prices and writes them a Kafka topic to be consumed back by the application. + +Note that by default Reactive Messaging message processing happens sequentially, preserving the order of messages. +In the same way, `@Blocking(ordered = false)` annotation changes this behaviour, +using `@RunOnVirtualThread` enforces concurrent message processing without preserving the order. + +In order to leverage the lightweight nature of virtual threads, the default maximum concurrency for methods annotated with `@RunOnVirtualThread` is 1024. +As opposed to platform threads, virtual threads are not pooled and created per message. Therefore the maximum concurrency applies separately to all `@RunOnVirtualThread` methods. + +There are two ways to customize the concurrency level: + +1. The `@RunOnVirtualThread` annotation can be used together with the link:{rm_blocking_annotation}[@Blocking] annotation to specify a worker name. ++ +[source, java] +---- +@Incoming("prices") +@RunOnVirtualThread +@Blocking("my-worker") +public void consume(double price) { + //... +} +---- ++ +Then, for example, to set the maximum concurrency of this method down to 30, set using the config property `smallrye.messaging.worker.my-worker.max-concurrency=30`. ++ +2. For every `@RunOnVirtualThread` method that is not configured with a worker name, you can use the config property `smallrye.messaging.worker..max-concurrency`. + diff --git a/docs/src/main/asciidoc/pulsar.adoc b/docs/src/main/asciidoc/pulsar.adoc index e1be2db74bc35..529684251e09d 100644 --- a/docs/src/main/asciidoc/pulsar.adoc +++ b/docs/src/main/asciidoc/pulsar.adoc @@ -254,6 +254,65 @@ Following types can be injected as channels: As with the previous `Message` example, if your injected channel receives payloads (`Multi`), it acknowledges the message automatically, and support multiple subscribers. If your injected channel receives Message (`Multi>`), you will be responsible for the acknowledgment and broadcasting. +[[blocking-processing]] +=== Blocking processing + +Reactive Messaging invokes your method on an I/O thread. +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. +But, you often need to combine Reactive Messaging with blocking processing such as database interactions. +For this, you need to use the `@Blocking` annotation indicating that the processing is _blocking_ and should not be run on the caller thread. + +For example, The following code illustrates how you can store incoming payloads to a database using Hibernate with Panache: + +[source,java] +---- +import io.smallrye.reactive.messaging.annotations.Blocking; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.transaction.Transactional; + +@ApplicationScoped +public class PriceStorage { + + @Incoming("prices") + @Transactional + public void store(int priceInUsd) { + Price price = new Price(); + price.value = priceInUsd; + price.persist(); + } + +} +---- + +[NOTE] +==== +There are 2 `@Blocking` annotations: + +1. `io.smallrye.reactive.messaging.annotations.Blocking` +2. `io.smallrye.common.annotation.Blocking` + +They have the same effect. +Thus, you can use both. +The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order. +The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order. + +Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/latest/concepts/blocking/[SmallRye Reactive Messaging – Handling blocking execution]. +==== + +[TIP] +.@RunOnVirtualThread +==== +For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation]. +==== + +[TIP] +.@Transactional +==== +If your method is annotated with `@Transactional`, it will be considered _blocking_ automatically, even if the method is not annotated with `@Blocking`. +==== + === Pulsar Subscription Types Pulsar *subscriptionType* consumer configuration can be used flexibly to achieve different messaging scenarios, such as publish-subscribe or queuing. diff --git a/docs/src/main/asciidoc/rabbitmq-reference.adoc b/docs/src/main/asciidoc/rabbitmq-reference.adoc index e7bb380a417c4..105a674eb4940 100644 --- a/docs/src/main/asciidoc/rabbitmq-reference.adoc +++ b/docs/src/main/asciidoc/rabbitmq-reference.adoc @@ -335,6 +335,12 @@ The first one provides more fine-grained tuning such as the worker pool to use a The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order. ==== +[TIP] +.@RunOnVirtualThread +==== +For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation]. +==== + == Customizing the underlying RabbitMQ client The connector uses the Vert.x RabbitMQ client underneath.