From 50c64fa55c61083e7dbcfd41ecc26d7924a1825b Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Wed, 25 Aug 2021 09:14:51 +0200 Subject: [PATCH] Add a bit more detail about the execution model of some reactive extensions --- docs/src/main/asciidoc/amqp-reference.adoc | 46 +++++++++++++++++++ docs/src/main/asciidoc/kafka.adoc | 13 +++++- docs/src/main/asciidoc/mailer-reference.adoc | 8 ++++ .../src/main/asciidoc/reactive-event-bus.adoc | 1 + docs/src/main/asciidoc/reactive-routes.adoc | 5 +- 5 files changed, 71 insertions(+), 2 deletions(-) diff --git a/docs/src/main/asciidoc/amqp-reference.adoc b/docs/src/main/asciidoc/amqp-reference.adoc index 8b9e811e79377..6a6789ca77080 100644 --- a/docs/src/main/asciidoc/amqp-reference.adoc +++ b/docs/src/main/asciidoc/amqp-reference.adoc @@ -347,6 +347,52 @@ mp.messaging.incoming.people-out.link-name=people More details about the AMQP Address model can be found in the https://activemq.apache.org/components/artemis/documentation/2.0.0/address-model.html[Artemis documentation]. +[#blocking-processing] +=== Execution model and Blocking processing + +Reactive Messaging invokes your method on an I/O thread. +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. +But, you often need to combine Reactive Messaging with blocking processing such as database interactions. +For this, you need to use the `@Blocking` annotation indicating that the processing is _blocking_ and should not be run on the caller thread. + +For example, The following code illustrates how you can store incoming payloads to a database using Hibernate with Panache: + +[source,java] +---- +import io.smallrye.reactive.messaging.annotations.Blocking; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import javax.enterprise.context.ApplicationScoped; +import javax.transaction.Transactional; + +@ApplicationScoped +public class PriceStorage { + + @Incoming("prices") + @Blocking + @Transactional + public void store(int priceInUsd) { + Price price = new Price(); + price.value = priceInUsd; + price.persist(); + } + +} +---- + +[NOTE] +==== +There are 2 `@Blocking` annotations: + +1. `io.smallrye.reactive.messaging.annotations.Blocking` +2. `io.smallrye.common.annotation.Blocking` + +They have the same effect. +Thus, you can use both. +The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order. +The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order. +==== + == Customizing the underlying AMQP client The connector uses the Vert.x AMQP client underneath. diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 1dc076d579896..7cd1c46900127 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -213,9 +213,12 @@ Injecting `@Channel("prices")` or having `@Incoming("prices")` does not automati You need to configure an inbound connector with `mp.messaging.incoming.prices\...` or have an `@Outgoing("prices")` method somewhere in your application (in which case, `prices` will be an in-memory channel). ==== +[#blocking-processing] === Blocking processing -You often need to combine Reactive Messaging with blocking processing such as database interactions. +Reactive Messaging invokes your method on an I/O thread. +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. +But, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the `@Blocking` annotation indicating that the processing is _blocking_ and should not be run on the caller thread. For example, The following code illustrates how you can store incoming payloads to a database using Hibernate with Panache: @@ -1550,6 +1553,14 @@ You can enable this by adding the `quarkus-kubernetes-service-binding` extension When running in appropriately configured Kubernetes clusters, Kafka extension will pull its Kafka broker connection configuration from the service binding available inside the cluster, without the need for user configuration. +== Execution model + +Reactive Messaging invokes user's methods on an I/O thread. +Thus, by default, the methods must not block. +As described in <>, you need to add the `@Blocking` annotation on the method if this method will block the caller thread. + +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. + [[kafka-configuration]] == Configuration Reference diff --git a/docs/src/main/asciidoc/mailer-reference.adoc b/docs/src/main/asciidoc/mailer-reference.adoc index 756886cbea1f4..d6e9af7836621 100644 --- a/docs/src/main/asciidoc/mailer-reference.adoc +++ b/docs/src/main/asciidoc/mailer-reference.adoc @@ -208,6 +208,14 @@ Otherwise, search for the template as the specified location. In this particular <3> Set the data used in the template. <4> `MailTemplate.send()` triggers the rendering and, once finished, sends the e-mail via a `Mailer` instance. +[[execution-model]] +== Execution model + +The reactive mailer is non-blocking, and the results are provided on an I/O thread. +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. + +The non-reactive mailer blocks until the messages are sent to the SMTP server. +Note that does not mean that the message is delivered, just that it's been sent successfully to the SMTP server, which will be responsible for the delivery. [[testing]] == Testing email sending diff --git a/docs/src/main/asciidoc/reactive-event-bus.adoc b/docs/src/main/asciidoc/reactive-event-bus.adoc index dccf8de557465..b0e96e7f9f37b 100644 --- a/docs/src/main/asciidoc/reactive-event-bus.adoc +++ b/docs/src/main/asciidoc/reactive-event-bus.adoc @@ -101,6 +101,7 @@ void consumeBlocking(String message) { ---- When using `@Blocking`, it ignores the value of the `blocking` attribute of `@ConsumeEvent`. +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. ==== Asynchronous processing is also possible by returning either an `io.smallrye.mutiny.Uni` or a `java.util.concurrent.CompletionStage`: diff --git a/docs/src/main/asciidoc/reactive-routes.adoc b/docs/src/main/asciidoc/reactive-routes.adoc index 8a3fb55c23474..d3af01c7f3350 100644 --- a/docs/src/main/asciidoc/reactive-routes.adoc +++ b/docs/src/main/asciidoc/reactive-routes.adoc @@ -18,12 +18,15 @@ The code presented in this guide is available in this {quickstarts-base-url}[Git Before going further, let's have a look at the HTTP layer of Quarkus. Quarkus HTTP support is based on a non-blocking and reactive engine (Eclipse Vert.x and Netty). -All the HTTP requests your application receive are handled by _event loops_ (IO Thread) and then are routed towards the code that manages the request. +All the HTTP requests your application receive are handled by _event loops_ (I/O Thread) and then are routed towards the code that manages the request. Depending on the destination, it can invoke the code managing the request on a worker thread (Servlet, Jax-RS) or use the IO Thread (reactive route). Note that because of this, a reactive route must be non-blocking or explicitly declare its blocking nature (which would result by being called on a worker thread). image:http-architecture.png[alt=Quarkus HTTP Architecture] +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. + + == Declaring reactive routes The first way to use reactive routes is to use the `@Route` annotation.