diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index bf5e56a92afb9..a426e58b7dd06 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -52,7 +52,7 @@
1.0.13
2.7.0
2.26.0
- 3.18.0
+ 3.19.1
1.2.0
1.2.1
1.3.5
diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc
index efb06fd119ec2..a8281d1280a9a 100644
--- a/docs/src/main/asciidoc/kafka.adoc
+++ b/docs/src/main/asciidoc/kafka.adoc
@@ -282,7 +282,7 @@ Thus, you can use both.
The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order.
The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.
-Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.1/advanced/blocking.html[SmallRye Reactive Messaging – Handling blocking execution].
+Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/latest/concepts/blocking/[SmallRye Reactive Messaging – Handling blocking execution].
====
[TIP]
@@ -364,6 +364,9 @@ If high throughput is important for you, and you are not limited by the downstre
- or set `enable.auto.commit` to true and annotate the consuming method with `@Acknowledgment(Acknowledgment.Strategy.NONE)`.
====
+Smallrye Reactive Messaging enables implementing custom commit strategies.
+See https://smallrye.io/smallrye-reactive-messaging/latest/kafka/receiving-kafka-records/#acknowledgement[SmallRye Reactive Messaging documentation] for more information.
+
[[error-handling]]
=== Error Handling Strategies
@@ -389,6 +392,9 @@ The record written on the dead letter queue contains a set of additional headers
- *dead-letter-partition*: the original partition of the record (integer mapped to String)
- *dead-letter-offset*: the original offset of the record (long mapped to String)
+Smallrye Reactive Messaging enables implementing custom failure strategies.
+See https://smallrye.io/smallrye-reactive-messaging/latest/kafka/receiving-kafka-records/#acknowledgement[SmallRye Reactive Messaging documentation] for more information.
+
==== Retrying processing
You can combine Reactive Messaging with https://github.com/smallrye/smallrye-fault-tolerance[SmallRye Fault Tolerance], and retry processing if it failed:
@@ -926,7 +932,18 @@ The `io.smallrye.reactive.messaging.annotations.Emitter`, `io.smallrye.reactive.
The new `Emitter.send` method returns a `CompletionStage` completed when the produced message is acknowledged.
====
-More information on how to use `Emitter` can be found in https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.1/emitter/emitter.html#_emitter_and_channel[SmallRye Reactive Messaging – Emitters and Channels]
+[NOTE]
+.Depreciation
+====
+`MutinyEmitter#send(Message msg)` method is deprecated in favor of following methods receiving `Message` for emitting:
+
+* `> Uni sendMessage(M msg)`
+* `> void sendMessageAndAwait(M msg)`
+* `> Cancellable sendMessageAndForget(M msg)`
+
+====
+
+More information on how to use `Emitter` can be found in https://smallrye.io/smallrye-reactive-messaging/latest/concepts/emitter/[SmallRye Reactive Messaging – Emitters and Channels]
=== Write Acknowledgement
@@ -1254,11 +1271,6 @@ The `KafkaTransactions#withTransactionAndAck` method acks and nacks the message
Nacked messages will be handled by the failure strategy of the incoming channel, (see <>).
Configuring `failure-strategy=ignore` simply resets the Kafka consumer to the last committed offsets and resumes the consumption from there.
-[NOTE]
-====
-Redpanda does not yet support link:https://github.com/redpanda-data/redpanda/issues/3279[producer scalability for exactly-once processing].
-In order to use Kafka exactly-once processing with Quarkus you can configure Dev Services for Kafka to <>.
-====
[[kafka-bare-clients]]
== Accessing Kafka clients directly
@@ -1996,10 +2008,14 @@ As described in <>, you need to add the `@Blocking` annotat
See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic.
+== Channel Decorators
+
+SmallRye Reactive Messaging supports decorating incoming and outgoing channels for implementing cross-cutting concerns such as monitoring, tracing or message interception. For more information on implementing decorators and message interceptors see the http://smallrye.io/smallrye-reactive-messaging/3.19.1/concepts/decorators/[SmallRye Reactive Messaging documentation].
+
[[kafka-configuration]]
== Configuration Reference
-More details about the SmallRye Reactive Messaging configuration can be found in the https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.1/kafka/kafka.html[SmallRye Reactive Messaging - Kafka Connector Documentation].
+More details about the SmallRye Reactive Messaging configuration can be found in the https://smallrye.io/smallrye-reactive-messaging/latest/kafka/kafka/#using-the-kafka-connector[SmallRye Reactive Messaging - Kafka Connector Documentation].
[TIP]
====
@@ -2336,7 +2352,7 @@ public class FruitStore {
Mutiny.Session session; // <1>
@Incoming("in")
- public Uni consume(Fruit fruit) {
+ public Uni consume(Fruit entity) {
return session.withTransaction(t -> { // <2>
return entity.persistAndFlush() // <3>
.replaceWithVoid(); // <4>
diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application-secured.properties b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application-secured.properties
index 93bedede04be4..8f2da7b189603 100644
--- a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application-secured.properties
+++ b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application-secured.properties
@@ -3,6 +3,7 @@ amqp-username=artemis
amqp-password=artemis
mp.messaging.outgoing.source.connector=smallrye-amqp
+mp.messaging.outgoing.source.durable=true
mp.messaging.incoming.in.connector=smallrye-amqp
mp.messaging.incoming.in.address=source
diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application.properties b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application.properties
index 5506833771e38..f6e591ab27f7b 100644
--- a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application.properties
+++ b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application.properties
@@ -1,6 +1,7 @@
amqp-port=5672
mp.messaging.outgoing.source.connector=smallrye-amqp
+mp.messaging.outgoing.source.durable=true
mp.messaging.incoming.in.connector=smallrye-amqp
mp.messaging.incoming.in.address=source
diff --git a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties
index d4907bbe9f11b..a64c88f99975e 100644
--- a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties
+++ b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties
@@ -4,3 +4,5 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN
# enable health check
quarkus.kafka.health.enabled=true
+
+quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.2.5.Final
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index a408c41796ba6..99dbb209ec2b2 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -288,6 +288,7 @@
reactive-messaging-rabbitmq
reactive-messaging-rabbitmq-dyn
reactive-messaging-hibernate-reactive
+ reactive-messaging-hibernate-orm
rest-client
resteasy-reactive-kotlin
rest-client-reactive
diff --git a/integration-tests/reactive-messaging-hibernate-orm/pom.xml b/integration-tests/reactive-messaging-hibernate-orm/pom.xml
new file mode 100644
index 0000000000000..aa7cacfac2ba7
--- /dev/null
+++ b/integration-tests/reactive-messaging-hibernate-orm/pom.xml
@@ -0,0 +1,271 @@
+
+
+
+ quarkus-integration-tests-parent
+ io.quarkus
+ 999-SNAPSHOT
+
+ 4.0.0
+
+ quarkus-integration-test-reactive-messaging-hibernate-orm
+ Quarkus - Integration Tests - Reactive Messaging - Hibernate ORM
+ The Reactive Messaging with Kafka and Hibernate integration tests module
+
+
+ true
+
+
+
+
+ io.quarkus
+ quarkus-integration-test-class-transformer
+
+
+ io.quarkus
+ quarkus-integration-test-shared-library
+
+
+
+
+ io.quarkus
+ quarkus-resteasy-reactive
+
+
+ io.quarkus
+ quarkus-resteasy-reactive-jackson
+
+
+ io.vertx
+ vertx-core
+
+
+
+
+ io.quarkus
+ quarkus-smallrye-health
+
+
+
+
+ io.quarkus
+ quarkus-kafka-client
+
+
+ io.quarkus
+ quarkus-smallrye-reactive-messaging-kafka
+
+
+
+
+ io.quarkus
+ quarkus-hibernate-orm-panache
+
+
+ io.quarkus
+ quarkus-jdbc-postgresql
+
+
+
+
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ io.quarkus
+ quarkus-test-kafka-companion
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+ jakarta.xml.bind
+ jakarta.xml.bind-api
+
+
+
+
+
+
+ io.quarkus
+ quarkus-integration-test-class-transformer-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-kafka-client-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-resteasy-reactive-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-resteasy-reactive-jackson-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-smallrye-health-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-smallrye-reactive-messaging-kafka-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-hibernate-orm-panache-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-jdbc-postgresql-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.smallrye.reactive
+ smallrye-reactive-messaging-api
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+
+
+
+ io.quarkus
+ quarkus-maven-plugin
+
+
+
+ build
+ generate-code
+ generate-code-tests
+
+
+
+
+
+
+ maven-failsafe-plugin
+
+ true
+
+
+
+
+ maven-surefire-plugin
+
+ true
+
+
+
+
+
+
+
+ test-kafka
+
+
+ test-containers
+
+
+
+
+
+ maven-surefire-plugin
+
+ false
+
+
+
+ maven-failsafe-plugin
+
+ false
+
+
+
+
+
+
+
+
+
diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Fruit.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Fruit.java
new file mode 100644
index 0000000000000..da5f7a87c4829
--- /dev/null
+++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Fruit.java
@@ -0,0 +1,19 @@
+package io.quarkus.it.kafka;
+
+import javax.persistence.Entity;
+
+import io.quarkus.hibernate.orm.panache.PanacheEntity;
+
+@Entity
+public class Fruit extends PanacheEntity {
+
+ public String name;
+
+ public Fruit(String name) {
+ this.name = name;
+ }
+
+ public Fruit() {
+ // Jackson will use this constructor
+ }
+}
diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/FruitProducer.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/FruitProducer.java
new file mode 100644
index 0000000000000..9921a1e2fd469
--- /dev/null
+++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/FruitProducer.java
@@ -0,0 +1,27 @@
+package io.quarkus.it.kafka;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.MediaType;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+
+import io.smallrye.common.vertx.VertxContext;
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.MutinyEmitter;
+
+@Path("/kafka")
+public class FruitProducer {
+
+ @Channel("fruits-out")
+ MutinyEmitter emitter;
+
+ @POST
+ @Path("/fruits")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Uni post(Fruit fruit) {
+ assert VertxContext.isOnDuplicatedContext();
+ return emitter.send(fruit);
+ }
+}
diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java
new file mode 100644
index 0000000000000..35f7b90ed512d
--- /dev/null
+++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java
@@ -0,0 +1,23 @@
+package io.quarkus.it.kafka;
+
+import java.util.List;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/kafka")
+public class KafkaEndpoint {
+ @Inject
+ KafkaReceivers receivers;
+
+ @GET
+ @Path("/fruits")
+ @Produces(MediaType.APPLICATION_JSON)
+ public List getFruits() {
+ return receivers.getFruits();
+ }
+
+}
diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java
new file mode 100644
index 0000000000000..ad1ade1bff1d5
--- /dev/null
+++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java
@@ -0,0 +1,32 @@
+package io.quarkus.it.kafka;
+
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.transaction.Transactional;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
+
+import io.smallrye.reactive.messaging.MutinyEmitter;
+
+@ApplicationScoped
+public class KafkaReceivers {
+
+ @Channel("fruits-persisted")
+ MutinyEmitter emitter;
+
+ @Incoming("fruits-in")
+ @Transactional
+ public CompletionStage persist(Message fruit) {
+ fruit.getPayload().persist();
+ return emitter.sendMessage(fruit).subscribeAsCompletionStage();
+ }
+
+ public List getFruits() {
+ return Fruit.listAll();
+ }
+
+}
diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties b/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties
new file mode 100644
index 0000000000000..165aa8b197ca4
--- /dev/null
+++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties
@@ -0,0 +1,13 @@
+quarkus.log.category.kafka.level=WARN
+quarkus.log.category.\"org.apache.kafka\".level=WARN
+quarkus.log.category.\"org.apache.zookeeper\".level=WARN
+
+# enable health check
+quarkus.kafka.health.enabled=true
+
+kafka.auto.offset.reset=earliest
+
+quarkus.hibernate-orm.database.generation=drop-and-create
+
+mp.messaging.outgoing.fruits-out.topic=fruits
+mp.messaging.incoming.fruits-in.topic=fruits
diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorIT.java b/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorIT.java
new file mode 100644
index 0000000000000..21c6143667af1
--- /dev/null
+++ b/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorIT.java
@@ -0,0 +1,8 @@
+package io.quarkus.it.kafka;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+
+@QuarkusIntegrationTest
+public class KafkaConnectorIT extends KafkaConnectorTest {
+
+}
diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java b/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java
new file mode 100644
index 0000000000000..5db5944fc335e
--- /dev/null
+++ b/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java
@@ -0,0 +1,60 @@
+package io.quarkus.it.kafka;
+
+import static io.restassured.RestAssured.get;
+import static io.restassured.RestAssured.given;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.kafka.InjectKafkaCompanion;
+import io.quarkus.test.kafka.KafkaCompanionResource;
+import io.restassured.common.mapper.TypeRef;
+import io.restassured.http.ContentType;
+import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
+
+@QuarkusTest
+@QuarkusTestResource(KafkaCompanionResource.class)
+public class KafkaConnectorTest {
+
+ protected static final TypeRef> TYPE_REF = new TypeRef>() {
+ };
+
+ @InjectKafkaCompanion
+ KafkaCompanion companion;
+
+ @Test
+ public void testFruits() {
+ given().body(new Fruit("apple")).contentType(ContentType.JSON).when().post("/kafka/fruits").then()
+ .assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
+ given().body(new Fruit("banana")).contentType(ContentType.JSON).when().post("/kafka/fruits").then()
+ .assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
+ given().body(new Fruit("peach")).contentType(ContentType.JSON).when().post("/kafka/fruits").then()
+ .assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
+ given().body(new Fruit("orange")).contentType(ContentType.JSON).when().post("/kafka/fruits").then()
+ .assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
+ given().body(new Fruit("cherry")).contentType(ContentType.JSON).when().post("/kafka/fruits").then()
+ .assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
+ given().body(new Fruit("pear")).contentType(ContentType.JSON).when().post("/kafka/fruits").then()
+ .assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
+
+ await().untilAsserted(() -> Assertions.assertEquals(6, get("/kafka/fruits").as(TYPE_REF).size()));
+
+ for (ConsumerRecord record : companion
+ .consumeWithDeserializers(new StringDeserializer(), new StringDeserializer())
+ .fromTopics("fruits-persisted", 6)
+ .awaitCompletion()) {
+ System.out.println(record);
+ }
+ }
+
+}
diff --git a/integration-tests/reactive-messaging-hibernate-reactive/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java b/integration-tests/reactive-messaging-hibernate-reactive/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java
index 2d5a5cd56f546..a52491f4ca07a 100644
--- a/integration-tests/reactive-messaging-hibernate-reactive/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java
+++ b/integration-tests/reactive-messaging-hibernate-reactive/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java
@@ -4,6 +4,7 @@
import java.util.Objects;
import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.context.control.ActivateRequestContext;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
@@ -11,6 +12,7 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.hibernate.reactive.mutiny.Mutiny;
+import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.common.vertx.ContextLocals;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
@@ -38,10 +40,15 @@ public Uni> persistFruit(Message fruit) {
@Blocking
@Incoming("fruits-persisted")
+ @ActivateRequestContext
public Uni consumeFruit(Message fruit) {
assert VertxContext.isOnDuplicatedContext();
- assert Objects.equals(ContextLocals.get("fruit-id").get(), fruit.getPayload().id);
- return Uni.createFrom().completionStage(fruit.ack());
+ Fruit payload = fruit.getPayload();
+ assert Objects.equals(ContextLocals.get("fruit-id").get(), payload.id);
+ return Panache.withTransaction(() -> {
+ payload.name = "fruit-" + payload.name;
+ return payload.persist().chain(() -> Uni.createFrom().completionStage(fruit.ack()));
+ });
}
public Uni> getFruits() {