diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index dca533ccc6524..2b011810cc358 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -61,8 +61,8 @@
2.1.0
1.0.13
3.0.1
- 3.11.0
- 4.19.0
+ 3.12.0
+ 4.20.0
2.6.0
2.1.3
2.1.1
@@ -148,7 +148,7 @@
1.16.0
1.0.4
3.5.3.Final
- 2.5.8
+ 2.6.0
3.7.0
1.8.0
1.1.10.5
@@ -224,7 +224,7 @@
0.1.3
2.12.0
0.8.11
- 1.0.0
+ 1.1.0
3.0.0
2.12.3
diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc
index 60509c7c861b3..a9d105aa244ab 100644
--- a/docs/src/main/asciidoc/kafka.adoc
+++ b/docs/src/main/asciidoc/kafka.adoc
@@ -615,9 +615,10 @@ public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRe
----
package inbound;
-import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
@@ -628,7 +629,7 @@ public class KafkaRebalancedConsumer {
@Incoming("rebalanced-example")
@Acknowledgment(Acknowledgment.Strategy.NONE)
- public CompletionStage consume(IncomingKafkaRecord message) {
+ public CompletionStage consume(Message> message) {
// We don't need to ACK messages because in this example,
// we set offset during consumer rebalance
return CompletableFuture.completedFuture(null);
@@ -715,14 +716,14 @@ public void consume(List prices) {
}
----
-The incoming method can also receive `Message>`, `KafkaRecordBatch`, and `ConsumerRecords` types.
+The incoming method can also receive `Message>`, `Message>`, and `ConsumerRecords` types.
They give access to record details such as offset or timestamp:
[source, java]
----
@Incoming("prices")
-public CompletionStage consumeMessage(KafkaRecordBatch records) {
- for (KafkaRecord record : records) {
+public CompletionStage consumeMessage(Message> records) {
+ for (ConsumerRecord record : records.getPayload()) {
String payload = record.getPayload();
String topic = record.getTopic();
// process messages
@@ -1540,13 +1541,15 @@ The following example includes a batch of Kafka records inside a transaction.
----
import jakarta.enterprise.context.ApplicationScoped;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
-import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
@ApplicationScoped
@@ -1557,10 +1560,10 @@ public class KafkaExactlyOnceProcessor {
KafkaTransactions txProducer;
@Incoming("prices-in")
- public Uni emitInTransaction(KafkaRecordBatch batch) { // <1>
+ public Uni emitInTransaction(Message> batch) { // <1>
return txProducer.withTransactionAndAck(batch, emitter -> { // <2>
- for (KafkaRecord record : batch) {
- emitter.send(KafkaRecord.of(record.getKey(), record.getPayload() + 1)); // <3>
+ for (ConsumerRecord record : batch.getPayload()) {
+ emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); // <3>
}
return Uni.createFrom().voidItem();
});
@@ -1571,7 +1574,7 @@ public class KafkaExactlyOnceProcessor {
<1> It is recommended to use exactly-once processing along with the batch consumption mode.
While it is possible to use it with a single Kafka message, it'll have a significant performance impact.
-<2> The consumed `KafkaRecordBatch` message is passed to the `KafkaTransactions#withTransactionAndAck` in order to handle the offset commits and message acks.
+<2> The consumed message is passed to the `KafkaTransactions#withTransactionAndAck` in order to handle the offset commits and message acks.
<3> The `send` method writes records to Kafka inside the transaction, without waiting for send receipt from the broker.
Messages pending to be written to Kafka will be buffered, and flushed before committing the transaction.
It is therefore recommended configuring the `@OnOverflow` `bufferSize` in order to fit enough messages, for example the `max.poll.records`, maximum amount of records returned in a batch.
@@ -1908,7 +1911,7 @@ Similarly, if you declare
[source,java]
----
@Incoming("my-kafka-records")
-public void consume(KafkaRecord record) {
+public void consume(Record record) {
...
}
----
diff --git a/docs/src/main/asciidoc/pulsar.adoc b/docs/src/main/asciidoc/pulsar.adoc
index 5ebbe6c71f993..eb2e86d2ad85a 100644
--- a/docs/src/main/asciidoc/pulsar.adoc
+++ b/docs/src/main/asciidoc/pulsar.adoc
@@ -416,22 +416,20 @@ You can enable batch mode using `batchReceive=true` property, or setting a `batc
[source, java]
----
@Incoming("prices")
-public CompletionStage consumeMessage(PulsarIncomingBatchMessage messages) {
- for (PulsarMessage msg : messages) {
- msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> {
- String key = metadata.getKey();
- String topic = metadata.getTopicName();
- long timestamp = metadata.getEventTime();
- //... process messages
- });
+public CompletionStage consumeMessage(Message> messages) {
+ for (org.apache.pulsar.client.api.Message msg : messages.getPayload()) {
+ String key = msg.getKey();
+ String topic = msg.getTopicName();
+ long timestamp = msg.getEventTime();
+ //... process messages
}
// ack will commit the latest offsets (per partition) of the batch.
return messages.ack();
}
@Incoming("prices")
-public void consumeRecords(Messages messages) {
- for (Message msg : messages) {
+public void consumeRecords(org.apache.pulsar.client.api.Messages messages) {
+ for (org.apache.pulsar.client.api.Message msg : messages) {
//... process messages
}
}
@@ -457,7 +455,7 @@ You can configure batch mode explicitly with `mp.messaging.incoming.$channel.bat
== Sending messages to Pulsar
-The Pulsar Connector can write Reactive Messaging Messages as Pulsar Message.
+The Pulsar Connector can write Reactive Messaging `Message`s as Pulsar Message.
=== Example
@@ -739,32 +737,32 @@ package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
+import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
-import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
-import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
-@ApplicationScoped
-public class PulsarExactlyOnceProcessor {
+ @ApplicationScoped
+ public class PulsarExactlyOnceProcessor {
- @Inject
- @Channel("tx-out-example")
- PulsarTransactions txProducer;
-
- @Incoming("in-channel")
- public Uni emitInTransaction(PulsarIncomingBatchMessage batch) {
- return txProducer.withTransactionAndAck(batch, emitter -> {
- for (PulsarMessage record : batch) {
- emitter.send(PulsarMessage.of(record.getPayload() + 1, record.getKey()));
- }
- return Uni.createFrom().voidItem();
- });
- }
+ @Inject
+ @Channel("tx-out-example")
+ PulsarTransactions txProducer;
-}
+ @Incoming("in-channel")
+ public Uni emitInTransaction(Message> batch) {
+ return txProducer.withTransactionAndAck(batch, emitter -> {
+ for (org.apache.pulsar.client.api.Message record : batch.getPayload()) {
+ emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
+ }
+ return Uni.createFrom().voidItem();
+ });
+ }
+
+ }
----
If the processing completes successfully, the message is acknowledged inside the transaction and the transaction is committed.
diff --git a/extensions/mutiny/deployment/src/main/java/io/quarkus/mutiny/deployment/MutinyProcessor.java b/extensions/mutiny/deployment/src/main/java/io/quarkus/mutiny/deployment/MutinyProcessor.java
index 0802ea01dc861..b31d195875253 100644
--- a/extensions/mutiny/deployment/src/main/java/io/quarkus/mutiny/deployment/MutinyProcessor.java
+++ b/extensions/mutiny/deployment/src/main/java/io/quarkus/mutiny/deployment/MutinyProcessor.java
@@ -1,5 +1,6 @@
package io.quarkus.mutiny.deployment;
+import java.util.List;
import java.util.Optional;
import org.jboss.threads.ContextHandler;
@@ -10,6 +11,7 @@
import io.quarkus.deployment.builditem.ContextHandlerBuildItem;
import io.quarkus.deployment.builditem.ExecutorBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.UnsafeAccessedFieldBuildItem;
import io.quarkus.mutiny.runtime.MutinyInfrastructure;
public class MutinyProcessor {
@@ -31,4 +33,12 @@ public void buildTimeInit(MutinyInfrastructure recorder) {
recorder.configureThreadBlockingChecker();
recorder.configureOperatorLogger();
}
+
+ @BuildStep
+ public List jctoolsUnsafeAccessedFields() {
+ return List.of(
+ new UnsafeAccessedFieldBuildItem(
+ "org.jctools.util.UnsafeRefArrayAccess",
+ "REF_ELEMENT_SHIFT"));
+ }
}
diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java
index 3153a56e7e55b..5dd492dbbcb1f 100644
--- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java
+++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java
@@ -19,6 +19,9 @@ final class DotNames {
static final DotName TARGETED_MESSAGES = DotName.createSimple(io.smallrye.reactive.messaging.TargetedMessages.class.getName());
static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName());
+
+ static final DotName GENERIC_PAYLOAD = DotName.createSimple(io.smallrye.reactive.messaging.GenericPayload.class.getName());
+
static final DotName KAFKA_RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.KafkaRecord.class.getName());
static final DotName RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.Record.class.getName());
static final DotName CONSUMER_RECORD = DotName.createSimple(org.apache.kafka.clients.consumer.ConsumerRecord.class.getName());
@@ -30,6 +33,7 @@ final class DotNames {
static final DotName SUBSCRIBER = DotName.createSimple(org.reactivestreams.Subscriber.class.getName());
static final DotName SUBSCRIBER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder.class.getName());
static final DotName PUBLISHER = DotName.createSimple(org.reactivestreams.Publisher.class.getName());
+ static final DotName FLOW_PUBLISHER = DotName.createSimple(java.util.concurrent.Flow.Publisher.class.getName());
static final DotName PUBLISHER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder.class.getName());
static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName());
static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName());
diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java
index df4f15d53c438..16aefe55c77b4 100644
--- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java
+++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java
@@ -420,6 +420,7 @@ private Type getIncomingTypeFromMethod(MethodInfo method) {
if ((isCompletionStage(returnType) && parametersCount >= 1)
|| (isUni(returnType) && parametersCount >= 1)
|| (isPublisher(returnType) && parametersCount == 1)
+ || (isFlowPublisher(returnType) && parametersCount == 1)
|| (isPublisherBuilder(returnType) && parametersCount == 1)
|| (isMulti(returnType) && parametersCount == 1)) {
incomingType = parameterTypes.get(0);
@@ -433,8 +434,11 @@ private Type getIncomingTypeFromMethod(MethodInfo method) {
}
// @Incoming @Outgoing stream manipulation
- if (incomingType != null
- && (isPublisher(incomingType) || isPublisherBuilder(incomingType) || isMulti(incomingType))) {
+ if (incomingType != null &&
+ (isPublisher(incomingType)
+ || isFlowPublisher(incomingType)
+ || isPublisherBuilder(incomingType)
+ || isMulti(incomingType))) {
incomingType = incomingType.asParameterizedType().arguments().get(0);
}
}
@@ -446,7 +450,10 @@ private Type getIncomingTypeFromChannelInjectionPoint(Type injectionPointType) {
return null;
}
- if (isPublisher(injectionPointType) || isPublisherBuilder(injectionPointType) || isMulti(injectionPointType)) {
+ if (isPublisher(injectionPointType)
+ || isPublisherBuilder(injectionPointType)
+ || isFlowPublisher(injectionPointType)
+ || isMulti(injectionPointType)) {
return injectionPointType.asParameterizedType().arguments().get(0);
} else {
return null;
@@ -462,6 +469,7 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {
// @Outgoing
if ((isPublisher(returnType) && parametersCount == 0)
+ || (isFlowPublisher(returnType) && parametersCount == 0)
|| (isPublisherBuilder(returnType) && parametersCount == 0)
|| (isMulti(returnType) && parametersCount == 0)
|| (isMultiSplitter(returnType) && parametersCount == 0)
@@ -476,11 +484,11 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {
// @Incoming @Outgoing
if (method.hasAnnotation(DotNames.INCOMING) || method.hasAnnotation(DotNames.INCOMINGS)) {
- if ((isCompletionStage(returnType) && parametersCount == 1)
- || (isUni(returnType) && parametersCount == 1)
- || (isPublisher(returnType) && parametersCount == 1)
+ if (isCompletionStage(returnType) || isUni(returnType) || isMulti(returnType)) {
+ outgoingType = returnType.asParameterizedType().arguments().get(0);
+ } else if ((isPublisher(returnType) && parametersCount == 1)
+ || (isFlowPublisher(returnType) && parametersCount == 1)
|| (isPublisherBuilder(returnType) && parametersCount == 1)
- || (isMulti(returnType) && parametersCount == 1)
|| (isMultiSplitter(returnType) && parametersCount == 1)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
} else if ((isProcessor(returnType) && parametersCount == 0)
@@ -494,7 +502,10 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {
// @Incoming @Outgoing stream manipulation
if (outgoingType != null
- && (isPublisher(outgoingType) || isPublisherBuilder(outgoingType) || isMulti(outgoingType))) {
+ && (isPublisher(outgoingType)
+ || isFlowPublisher(outgoingType)
+ || isPublisherBuilder(outgoingType)
+ || isMulti(outgoingType))) {
outgoingType = outgoingType.asParameterizedType().arguments().get(0);
}
}
@@ -548,6 +559,11 @@ private void extractKeyValueType(Type type, TriConsumer key
return;
}
+ if (isGenericPayload(type)) {
+ extractKeyValueType(type.asParameterizedType().arguments().get(0), keyValueTypeAcceptor);
+ return;
+ }
+
if (isMessage(type)) {
List typeArguments = type.asParameterizedType().arguments();
Type messageTypeParameter = typeArguments.get(0);
@@ -627,6 +643,13 @@ private static boolean isPublisher(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}
+ private static boolean isFlowPublisher(Type type) {
+ // raw type Flow.Publisher is wrong, must be Flow.Publisher
+ return DotNames.FLOW_PUBLISHER.equals(type.name())
+ && type.kind() == Type.Kind.PARAMETERIZED_TYPE
+ && type.asParameterizedType().arguments().size() == 1;
+ }
+
private static boolean isPublisherBuilder(Type type) {
// raw type PublisherBuilder is wrong, must be PublisherBuilder
return DotNames.PUBLISHER_BUILDER.equals(type.name())
@@ -687,6 +710,13 @@ private static boolean isMessage(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}
+ private static boolean isGenericPayload(Type type) {
+ // raw type Message is wrong, must be Message
+ return DotNames.GENERIC_PAYLOAD.equals(type.name())
+ && type.kind() == Type.Kind.PARAMETERIZED_TYPE
+ && type.asParameterizedType().arguments().size() == 1;
+ }
+
private static boolean isKafkaRecord(Type type) {
// raw type KafkaRecord is wrong, must be KafkaRecord
return DotNames.KAFKA_RECORD.equals(type.name())
diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java
index 7cebd95da8802..c7cf0b045844b 100644
--- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java
+++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java
@@ -53,6 +53,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.split.MultiSplitter;
+import io.smallrye.reactive.messaging.GenericPayload;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.Targeted;
import io.smallrye.reactive.messaging.TargetedMessages;
@@ -2960,5 +2961,43 @@ private static class RequestReply {
}
+ @Test
+ void kafkaGenericPayload() {
+ Tuple[] expectations = {
+ tuple("mp.messaging.incoming.channel1.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"),
+ tuple("mp.messaging.outgoing.out1.key.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
+ tuple("mp.messaging.outgoing.out1.value.serializer", "io.quarkus.kafka.client.serialization.JsonObjectSerializer"),
+ tuple("mp.messaging.incoming.channel2.key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"),
+ tuple("mp.messaging.incoming.channel2.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"),
+ tuple("mp.messaging.outgoing.channel3.value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"),
+ tuple("mp.messaging.outgoing.channel4.key.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
+ tuple("mp.messaging.outgoing.channel4.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
+ };
+ doTest(expectations, GenericPayloadProducer.class);
+ }
+
+ private static class GenericPayloadProducer {
+ @Incoming("channel1")
+ @Outgoing("out1")
+ GenericPayload> method1(String msg) {
+ return null;
+ }
+
+ @Incoming("channel2")
+ void method2(GenericPayload> msg) {
+ }
+
+ @Outgoing("channel3")
+ GenericPayload method3() {
+ return null;
+ }
+
+
+ @Outgoing("channel4")
+ Multi>> method4() {
+ return null;
+ }
+ }
+
}
diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java
index 6c0cbde2047e9..dc321cb954ec5 100644
--- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java
+++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java
@@ -18,6 +18,8 @@ final class DotNames {
static final DotName TARGETED_MESSAGES = DotName.createSimple(io.smallrye.reactive.messaging.TargetedMessages.class.getName());
static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName());
+
+ static final DotName GENERIC_PAYLOAD = DotName.createSimple(io.smallrye.reactive.messaging.GenericPayload.class.getName());
static final DotName PULSAR_MESSAGE = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.PulsarMessage.class.getName());
static final DotName PULSAR_BATCH_MESSAGE = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.PulsarBatchMessage.class.getName());
static final DotName PULSAR_API_MESSAGE = DotName.createSimple(org.apache.pulsar.client.api.Message.class.getName());
@@ -35,6 +37,8 @@ final class DotNames {
static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName());
static final DotName FLOW_PUBLISHER = DotName.createSimple(java.util.concurrent.Flow.Publisher.class.getName());
static final DotName MULTI = DotName.createSimple(io.smallrye.mutiny.Multi.class.getName());
+ static final DotName MULTI_SPLITTER = DotName.createSimple(io.smallrye.mutiny.operators.multi.split.MultiSplitter.class.getName());
+
static final DotName PULSAR_GENERIC_RECORD = DotName.createSimple(org.apache.pulsar.client.api.schema.GenericRecord.class.getName());
static final DotName AVRO_GENERATED = DotName.createSimple("org.apache.avro.specific.AvroGenerated");
diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java
index 78ada8473793a..3f88ab082a283 100644
--- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java
+++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java
@@ -252,12 +252,12 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {
// @Incoming @Outgoing
if (method.hasAnnotation(DotNames.INCOMING) || method.hasAnnotation(DotNames.INCOMINGS)) {
- if ((isCompletionStage(returnType) && parametersCount == 1)
- || (isUni(returnType) && parametersCount == 1)
- || (isPublisher(returnType) && parametersCount == 1)
+ if (isCompletionStage(returnType) || isUni(returnType) || isMulti(returnType)) {
+ outgoingType = returnType.asParameterizedType().arguments().get(0);
+ } else if ((isPublisher(returnType) && parametersCount == 1)
|| (isPublisherBuilder(returnType) && parametersCount == 1)
|| (isFlowPublisher(returnType) && parametersCount == 1)
- || (isMulti(returnType) && parametersCount == 1)) {
+ || (isMultiSplitter(returnType) && parametersCount == 1)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
} else if ((isProcessor(returnType) && parametersCount == 0)
|| (isProcessorBuilder(returnType) && parametersCount == 0)) {
@@ -328,6 +328,11 @@ private void extractValueType(Type type, BiConsumer schemaAccepto
return;
}
+ if (isGenericPayload(type)) {
+ extractValueType(type.asParameterizedType().arguments().get(0), schemaAcceptor);
+ return;
+ }
+
if (isMessage(type)) {
List typeArguments = type.asParameterizedType().arguments();
Type messageTypeParameter = typeArguments.get(0);
@@ -381,6 +386,13 @@ private static boolean isMulti(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}
+ private static boolean isMultiSplitter(Type type) {
+ // raw type MultiSplitter is wrong, must be MultiSplitter
+ return DotNames.MULTI_SPLITTER.equals(type.name())
+ && type.kind() == Type.Kind.PARAMETERIZED_TYPE
+ && type.asParameterizedType().arguments().size() == 2;
+ }
+
private static boolean isTargeted(Type type) {
return DotNames.TARGETED.equals(type.name())
|| DotNames.TARGETED_MESSAGES.equals(type.name());
@@ -467,6 +479,13 @@ private static boolean isMessage(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}
+ private static boolean isGenericPayload(Type type) {
+ // raw type Message is wrong, must be Message
+ return DotNames.GENERIC_PAYLOAD.equals(type.name())
+ && type.kind() == Type.Kind.PARAMETERIZED_TYPE
+ && type.asParameterizedType().arguments().size() == 1;
+ }
+
private static boolean isPulsarMessage(Type type) {
// raw type PulsarMessage is wrong, must be PulsarMessage
return DotNames.PULSAR_MESSAGE.equals(type.name())
diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java
index 5f8fea4d404e5..0910e90ddd70b 100644
--- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java
+++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java
@@ -53,6 +53,7 @@
import io.smallrye.config.common.MapBackedConfigSource;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.GenericPayload;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.Targeted;
import io.smallrye.reactive.messaging.TargetedMessages;
@@ -2070,4 +2071,40 @@ TargetedMessages method2(String msg) {
}
+ @Test
+ void pulsarGenericPayload() {
+ Tuple[] expectations = {
+ tuple("mp.messaging.incoming.channel1.schema", "STRING"),
+ tuple("mp.messaging.outgoing.out1.schema", "JsonObjectJSON_OBJECTSchema"),
+ tuple("mp.messaging.incoming.channel2.schema", "STRING"),
+ tuple("mp.messaging.outgoing.channel3.schema", "INT32"),
+ tuple("mp.messaging.outgoing.channel4.schema", "INT64"),
+ };
+ var generatedSchemas = Map.of("io.vertx.core.json.JsonObject", "JsonObjectJSON_OBJECTSchema");
+ doTest(expectations, generatedSchemas, GenericPayloadProducer.class);
+ }
+
+ private static class GenericPayloadProducer {
+ @Incoming("channel1")
+ @Outgoing("out1")
+ GenericPayload method1(String msg) {
+ return null;
+ }
+
+ @Incoming("channel2")
+ void method2(GenericPayload msg) {
+ }
+
+ @Outgoing("channel3")
+ GenericPayload method3() {
+ return null;
+ }
+
+ @Outgoing("channel4")
+ Multi> method4() {
+ return null;
+ }
+ }
+
+
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java
index 53834d1195f70..fa8858dca6380 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java
@@ -8,6 +8,7 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
+import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.NON_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
@@ -36,6 +37,7 @@
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusParameterDescriptor;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry;
+import io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.TypeInfo;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.annotations.Blocking;
@@ -50,7 +52,7 @@ private QuarkusMediatorConfigurationUtil() {
public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean isSuspendMethod, BeanInfo bean,
RecorderContext recorderContext,
- ClassLoader cl, boolean strict) {
+ ClassLoader cl, boolean strict, ReactiveMessagingConfiguration.ExecutionMode executionMode) {
Class[] parameterTypeClasses;
Class> returnTypeClass;
@@ -180,6 +182,22 @@ public Integer get() {
}
}));
configuration.setHasTargetedOutput(mediatorConfigurationSupport.processTargetedOutput());
+ if (!hasBlockingAnnotation(methodInfo)
+ && !hasNonBlockingAnnotation(methodInfo)
+ && hasBlockingPayloadSignature(methodInfo)) {
+ switch (executionMode) {
+ case WORKER:
+ configuration.setBlocking(true);
+ configuration.setBlockingExecutionOrdered(true);
+ break;
+ case VIRTUAL_THREAD:
+ configuration.setBlocking(true);
+ configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER);
+ break;
+ default:
+ break;
+ }
+ }
AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
@@ -452,4 +470,21 @@ private static Type getGenericParameterType(MethodInfo method, int paramIndex) {
return parameters.get(paramIndex);
}
}
+
+ private static boolean hasNonBlockingAnnotation(MethodInfo method) {
+ return method.hasAnnotation(NON_BLOCKING);
+ }
+
+ public static boolean hasBlockingAnnotation(MethodInfo method) {
+ return method.hasAnnotation(BLOCKING)
+ || method.hasAnnotation(SMALLRYE_BLOCKING)
+ || method.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
+ || method.hasAnnotation(TRANSACTIONAL);
+ }
+
+ private static boolean hasBlockingPayloadSignature(MethodInfo methodInfo) {
+ return !ReactiveMessagingDotNames.UNI.equals(methodInfo.returnType().name())
+ && !ReactiveMessagingDotNames.MULTI.equals(methodInfo.returnType().name())
+ && !ReactiveMessagingDotNames.COMPLETION_STAGE.equals(methodInfo.returnType().name());
+ }
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
index e78713e715f9b..1871c2a0575c4 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
@@ -10,7 +10,10 @@
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.jboss.jandex.DotName;
+import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.annotations.Blocking;
@@ -46,6 +49,7 @@ public final class ReactiveMessagingDotNames {
static final DotName CONNECTOR_ATTRIBUTE = DotName.createSimple(ConnectorAttribute.class.getName());
static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName());
+ static final DotName NON_BLOCKING = DotName.createSimple(NonBlocking.class.getName());
public static final DotName CHANNEL = DotName
.createSimple(org.eclipse.microprofile.reactive.messaging.Channel.class.getName());
public static final DotName LEGACY_CHANNEL = DotName.createSimple(Channel.class.getName());
@@ -94,6 +98,9 @@ public final class ReactiveMessagingDotNames {
static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional");
static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName());
+ static final DotName UNI = DotName.createSimple(Uni.class.getName());
+ static final DotName MULTI = DotName.createSimple(Multi.class.getName());
+
private ReactiveMessagingDotNames() {
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
index 44e65ce0f8ef8..eb99ec0a2f816 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
@@ -1,10 +1,9 @@
package io.quarkus.smallrye.reactivemessaging.deployment;
import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;
-import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BLOCKING;
+import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMING;
+import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
-import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
-import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
@@ -13,6 +12,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Vetoed;
@@ -64,6 +65,8 @@
import io.quarkus.runtime.metrics.MetricsFactory;
import io.quarkus.runtime.util.HashUtil;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
+import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelBuildItem;
+import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelDirection;
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedChannelBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedEmitterBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.MediatorBuildItem;
@@ -230,6 +233,7 @@ public void enableHealth(ReactiveMessagingBuildTimeConfig buildTimeConfig,
public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext recorderContext,
BuildProducer syntheticBeans,
List mediatorMethods,
+ List channelBuildItems,
List emitterFields,
List channelFields,
BuildProducer generatedClass,
@@ -239,6 +243,11 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);
+ Set innerIncomingChannelNames = channelBuildItems.stream()
+ .filter(c -> !c.isManagedByAConnector() && c.getDirection() == ChannelDirection.INCOMING)
+ .map(ChannelBuildItem::getName)
+ .collect(Collectors.toSet());
+
List mediatorConfigurations = new ArrayList<>(mediatorMethods.size());
List workerConfigurations = new ArrayList<>();
Map emittersConfigurations = new HashMap<>();
@@ -253,9 +262,7 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
MethodInfo methodInfo = mediatorMethod.getMethod();
BeanInfo bean = mediatorMethod.getBean();
- if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING)
- || methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
- || methodInfo.hasAnnotation(TRANSACTIONAL)) {
+ if (QuarkusMediatorConfigurationUtil.hasBlockingAnnotation(methodInfo)) {
// Just in case both annotation are used, use @Blocking value.
String poolName = methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
? QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER
@@ -280,7 +287,10 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
QuarkusMediatorConfiguration mediatorConfiguration = QuarkusMediatorConfigurationUtil
.create(methodInfo, isSuspendMethod, bean, recorderContext,
- Thread.currentThread().getContextClassLoader(), conf.strict);
+ Thread.currentThread().getContextClassLoader(), conf.strict,
+ consumesFromInnerChannel(methodInfo, innerIncomingChannelNames)
+ ? ReactiveMessagingConfiguration.ExecutionMode.EVENT_LOOP // disable execution mode setting for inner channels
+ : conf.blockingSignaturesExecutionMode);
mediatorConfigurations.add(mediatorConfiguration);
String generatedInvokerName = generateInvoker(bean, methodInfo, isSuspendMethod, mediatorConfiguration,
@@ -567,4 +577,18 @@ private boolean doesImplement(ClassInfo clazz, DotName iface, IndexView index) {
}));
}
+ boolean consumesFromInnerChannel(MethodInfo methodInfo, Set innerChannelNames) {
+ AnnotationInstance incoming = methodInfo.annotation(INCOMING);
+ if (incoming != null) {
+ return innerChannelNames.contains(incoming.value().asString());
+ }
+ AnnotationInstance incomings = methodInfo.annotation(INCOMINGS);
+ if (incomings != null) {
+ return innerChannelNames.containsAll(
+ Arrays.stream(incomings.value().asNestedArray())
+ .map(i -> i.value().asString()).collect(Collectors.toSet()));
+ }
+ return false;
+ }
+
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/BlockingSignatureExecutionModeTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/BlockingSignatureExecutionModeTest.java
new file mode 100644
index 0000000000000..c44f00fd86571
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/BlockingSignatureExecutionModeTest.java
@@ -0,0 +1,177 @@
+package io.quarkus.smallrye.reactivemessaging.signatures;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Flow;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.smallrye.reactivemessaging.config.DumbConnector;
+import io.quarkus.test.QuarkusUnitTest;
+import io.smallrye.common.annotation.NonBlocking;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+public class BlockingSignatureExecutionModeTest {
+
+ @RegisterExtension
+ static final QuarkusUnitTest config = new QuarkusUnitTest()
+ .withApplicationRoot((jar) -> jar
+ .addClasses(DumbConnector.class,
+ ProducerOnC.class,
+ BlockingConsumerFromConnector.class,
+ ConsumerFromConnector.class,
+ ConsumerFromInnerChannel.class))
+ .overrideConfigKey("mp.messaging.incoming.a.connector", "dummy")
+ .overrideConfigKey("mp.messaging.incoming.a.values", "bonjour")
+ .overrideConfigKey("mp.messaging.incoming.b.connector", "dummy")
+ .overrideConfigKey("mp.messaging.incoming.b.values", "bonjour")
+ .overrideConfigKey("mp.messaging.incoming.c.connector", "dummy")
+ .overrideConfigKey("mp.messaging.incoming.c.values", "bonjour");
+
+ @Inject
+ BlockingConsumerFromConnector blockingConsumerFromConnector;
+
+ @Test
+ public void testBlockingSignatureFromConnector() {
+ await().until(() -> blockingConsumerFromConnector.list().size() == 2);
+ List threadNames = blockingConsumerFromConnector.threads().stream().distinct().toList();
+ assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
+ for (String name : threadNames) {
+ assertThat(name.startsWith("executor-thread-")).isTrue();
+ }
+ }
+
+ @ApplicationScoped
+ public static class BlockingConsumerFromConnector {
+ private final List list = new CopyOnWriteArrayList<>();
+ private final List threads = new CopyOnWriteArrayList<>();
+
+ @Incoming("a")
+ public void produce(String s) {
+ threads.add(Thread.currentThread().getName());
+ list.add(s);
+ }
+
+ public List threads() {
+ return threads;
+ }
+
+ public List list() {
+ return list;
+ }
+ }
+
+ @Inject
+ ConsumerFromConnector consumerFromConnector;
+
+ @Test
+ public void testNonBlockingSignatureFromConnector() {
+ await().until(() -> consumerFromConnector.list().size() == 2);
+ List threadNames = consumerFromConnector.threads().stream().distinct().toList();
+ assertThat(threadNames).containsOnly(Thread.currentThread().getName());
+ }
+
+ @ApplicationScoped
+ public static class ConsumerFromConnector {
+ private final List list = new CopyOnWriteArrayList<>();
+ private final List threads = new CopyOnWriteArrayList<>();
+
+ @Incoming("b")
+ public Uni produce(String s) {
+ threads.add(Thread.currentThread().getName());
+ list.add(s);
+ return Uni.createFrom().voidItem();
+ }
+
+ public List threads() {
+ return threads;
+ }
+
+ public List list() {
+ return list;
+ }
+ }
+
+ @Inject
+ NonBlockingConsumerFromConnector nonBlockingConsumerFromConnector;
+
+ @Test
+ public void testNonBlockingAnnotationFromConnector() {
+ await().until(() -> nonBlockingConsumerFromConnector.list().size() == 2);
+ List threadNames = nonBlockingConsumerFromConnector.threads().stream().distinct().toList();
+ assertThat(threadNames).containsOnly(Thread.currentThread().getName());
+ }
+
+ @ApplicationScoped
+ public static class NonBlockingConsumerFromConnector {
+ private final List list = new CopyOnWriteArrayList<>();
+ private final List threads = new CopyOnWriteArrayList<>();
+
+ @Incoming("c")
+ @NonBlocking
+ public void produce(String s) {
+ threads.add(Thread.currentThread().getName());
+ list.add(s);
+ }
+
+ public List threads() {
+ return threads;
+ }
+
+ public List list() {
+ return list;
+ }
+ }
+
+ @Inject
+ ConsumerFromInnerChannel consumerFromInnerChannel;
+
+ @Test
+ public void testBlockingSignatureFromInnerChannel() {
+ await().until(() -> consumerFromInnerChannel.list().size() == 3);
+ assertThat(consumerFromInnerChannel.list()).containsExactly("d", "e", "f");
+ List threadNames = consumerFromInnerChannel.threads().stream().distinct().toList();
+ assertThat(threadNames).containsOnly(Thread.currentThread().getName());
+ }
+
+ @ApplicationScoped
+ public static class ConsumerFromInnerChannel {
+
+ private final List list = new CopyOnWriteArrayList<>();
+ private final List threads = new CopyOnWriteArrayList<>();
+
+ @Incoming("d")
+ public void produce(String s) {
+ threads.add(Thread.currentThread().getName());
+ list.add(s);
+ }
+
+ public List threads() {
+ return threads;
+ }
+
+ public List list() {
+ return list;
+ }
+ }
+
+ @ApplicationScoped
+ private static class ProducerOnC {
+
+ @Outgoing("d")
+ public Flow.Publisher produce() {
+ return Multi.createFrom().items("d", "e", "f");
+ }
+
+ }
+}
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ReactiveMessagingConfiguration.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ReactiveMessagingConfiguration.java
index d621e14a685f7..94555611afbe4 100644
--- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ReactiveMessagingConfiguration.java
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ReactiveMessagingConfiguration.java
@@ -19,4 +19,17 @@ public class ReactiveMessagingConfiguration {
*/
@ConfigItem(name = "strict", defaultValue = "false")
public boolean strict;
+
+ /**
+ * Execution mode for the Messaging signatures considered "blocking", defaults to "worker".
+ * For the previous behaviour set to "event-loop".
+ */
+ @ConfigItem(name = "blocking.signatures.execution.mode", defaultValue = "worker")
+ public ExecutionMode blockingSignaturesExecutionMode;
+
+ public enum ExecutionMode {
+ EVENT_LOOP,
+ WORKER,
+ VIRTUAL_THREAD
+ }
}
diff --git a/independent-projects/arc/pom.xml b/independent-projects/arc/pom.xml
index 764a87f2432b2..0663b2d56d865 100644
--- a/independent-projects/arc/pom.xml
+++ b/independent-projects/arc/pom.xml
@@ -47,7 +47,7 @@
1.8.0
3.1.7
3.5.3.Final
- 2.5.8
+ 2.6.0
1.6.Final
3.25.3
diff --git a/independent-projects/qute/pom.xml b/independent-projects/qute/pom.xml
index 8feb8695e9c4b..d6e4d6940c809 100644
--- a/independent-projects/qute/pom.xml
+++ b/independent-projects/qute/pom.xml
@@ -46,7 +46,7 @@
3.12.1
3.2.1
3.2.5
- 2.5.8
+ 2.6.0
diff --git a/independent-projects/resteasy-reactive/pom.xml b/independent-projects/resteasy-reactive/pom.xml
index 6efff46a3069f..a603a3973b377 100644
--- a/independent-projects/resteasy-reactive/pom.xml
+++ b/independent-projects/resteasy-reactive/pom.xml
@@ -59,7 +59,7 @@
3.12.1
3.2.1
3.2.5
- 2.5.8
+ 2.6.0
2.3.0
4.5.5
5.4.0
@@ -70,10 +70,10 @@
3.0.3
3.0.0
4.2.1
- 3.10.0
+ 3.12.0
1.0.4
5.11.0
- 1.0.0
+ 1.1.0
3.4
diff --git a/integration-tests/mutiny-native-jctools/README.md b/integration-tests/mutiny-native-jctools/README.md
new file mode 100644
index 0000000000000..9f32ccdfe6c3d
--- /dev/null
+++ b/integration-tests/mutiny-native-jctools/README.md
@@ -0,0 +1,10 @@
+# Quarkus - Integration Tests - Mutiny native JCTools support
+
+This integration test checks that the Mutiny extension provides support for the native compilation of JCTools, which is now used internally in Mutiny instead of old custom data structures.
+
+This is important as JCTools makes use of `sun.misc.Unsafe` in some places.
+
+The tests do the following:
+
+- create all kinds of queues behind the factory `io.smallrye.mutiny.helpers.queues.Queues` interface, and
+- expose a few Mutiny pipelines where queues may be needed: overflow, custom emitters, etc.
diff --git a/integration-tests/mutiny-native-jctools/pom.xml b/integration-tests/mutiny-native-jctools/pom.xml
new file mode 100644
index 0000000000000..63745714cff8d
--- /dev/null
+++ b/integration-tests/mutiny-native-jctools/pom.xml
@@ -0,0 +1,104 @@
+
+
+ 4.0.0
+
+
+ quarkus-integration-tests-parent
+ io.quarkus
+ 999-SNAPSHOT
+
+
+ quarkus-integration-test-mutiny-native-jctools
+
+ Quarkus - Integration Tests - Mutiny native JCTools support
+
+
+
+ io.quarkus
+ quarkus-rest
+
+
+ io.quarkus
+ quarkus-rest-client
+
+
+ io.quarkus
+ quarkus-mutiny
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+
+ io.quarkus
+ quarkus-rest-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-rest-client-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-mutiny-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+
+
+
+
+ io.quarkus
+ quarkus-maven-plugin
+
+
+
+ build
+
+
+
+
+
+
+
+
diff --git a/integration-tests/mutiny-native-jctools/src/main/java/io/quarkus/it/mutiny/nativejctools/MyResource.java b/integration-tests/mutiny-native-jctools/src/main/java/io/quarkus/it/mutiny/nativejctools/MyResource.java
new file mode 100644
index 0000000000000..1fd34decdc286
--- /dev/null
+++ b/integration-tests/mutiny-native-jctools/src/main/java/io/quarkus/it/mutiny/nativejctools/MyResource.java
@@ -0,0 +1,85 @@
+package io.quarkus.it.mutiny.nativejctools;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.core.MediaType;
+
+import org.jboss.resteasy.reactive.RestStreamElementType;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.helpers.queues.Queues;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.smallrye.mutiny.subscription.MultiEmitter;
+
+@Path("/tests")
+public class MyResource {
+
+ @GET
+ @Path("create-queues")
+ public String createQueues() {
+ ArrayList sums = new ArrayList<>();
+ List.of(
+ Queues.createMpscQueue(),
+ Queues.createMpscArrayQueue(2048),
+ Queues.createSpscArrayQueue(2048),
+ Queues.createSpscUnboundedQueue(2048),
+ Queues.createSpscUnboundedArrayQueue(2048),
+ Queues.createSpscChunkedArrayQueue(2048)).forEach(queue -> {
+ for (int i = 0; i < 1024; i++) {
+ queue.offer(i);
+ }
+ long sum = 0L;
+ for (Integer n = (Integer) queue.poll(); n != null; n = (Integer) queue.poll()) {
+ sum = sum + n;
+ }
+ sums.add(sum);
+ });
+ return "Ok :: " + sums.stream()
+ .map(Objects::toString)
+ .collect(Collectors.joining("/"));
+ }
+
+ @GET
+ @Path("ticks-overflow")
+ public String ticksWithOverflow() {
+ List ticks = Multi.createFrom().ticks().every(Duration.ofMillis(10))
+ .onOverflow().bufferUnconditionally()
+ .onItem().transform(tick -> ":")
+ .select().first(10)
+ .collect().asList().await().atMost(Duration.ofSeconds(5));
+ return String.join("", ticks);
+ }
+
+ @GET
+ @RestStreamElementType(MediaType.TEXT_PLAIN)
+ @Path("emitter")
+ public Multi emitter() {
+ AtomicInteger counter = new AtomicInteger();
+ return Multi.createFrom().emitter(emitter -> {
+ ScheduledExecutorService scheduler = Infrastructure.getDefaultWorkerPool();
+ scheduler.schedule(() -> emitAndSchedule(counter, emitter), 100, TimeUnit.MILLISECONDS);
+ });
+ }
+
+ private void emitAndSchedule(AtomicInteger counter, MultiEmitter super String> emitter) {
+ ScheduledExecutorService scheduler = Infrastructure.getDefaultWorkerPool();
+ int n = counter.getAndIncrement();
+ if (n < 5) {
+ for (int i = 0; i < n * 10; i++) {
+ emitter.emit(String.valueOf(i));
+ }
+ scheduler.schedule(() -> emitAndSchedule(counter, emitter), 125, TimeUnit.MILLISECONDS);
+ } else {
+ emitter.complete();
+ }
+ }
+}
diff --git a/integration-tests/mutiny-native-jctools/src/main/resources/application.properties b/integration-tests/mutiny-native-jctools/src/main/resources/application.properties
new file mode 100644
index 0000000000000..8b137891791fe
--- /dev/null
+++ b/integration-tests/mutiny-native-jctools/src/main/resources/application.properties
@@ -0,0 +1 @@
+
diff --git a/integration-tests/mutiny-native-jctools/src/test/java/io/quarkus/it/mutiny/nativejctools/MyResourceIT.java b/integration-tests/mutiny-native-jctools/src/test/java/io/quarkus/it/mutiny/nativejctools/MyResourceIT.java
new file mode 100644
index 0000000000000..96798185b47da
--- /dev/null
+++ b/integration-tests/mutiny-native-jctools/src/test/java/io/quarkus/it/mutiny/nativejctools/MyResourceIT.java
@@ -0,0 +1,7 @@
+package io.quarkus.it.mutiny.nativejctools;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+
+@QuarkusIntegrationTest
+public class MyResourceIT extends MyResourceTest {
+}
diff --git a/integration-tests/mutiny-native-jctools/src/test/java/io/quarkus/it/mutiny/nativejctools/MyResourceTest.java b/integration-tests/mutiny-native-jctools/src/test/java/io/quarkus/it/mutiny/nativejctools/MyResourceTest.java
new file mode 100644
index 0000000000000..7569dea6c5c35
--- /dev/null
+++ b/integration-tests/mutiny-native-jctools/src/test/java/io/quarkus/it/mutiny/nativejctools/MyResourceTest.java
@@ -0,0 +1,73 @@
+package io.quarkus.it.mutiny.nativejctools;
+
+import static io.restassured.RestAssured.get;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import jakarta.ws.rs.client.Client;
+import jakarta.ws.rs.client.ClientBuilder;
+import jakarta.ws.rs.client.WebTarget;
+import jakarta.ws.rs.sse.SseEventSource;
+
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.RestAssured;
+
+@QuarkusTest
+public class MyResourceTest {
+
+ @Test
+ public void testCreateQueues() {
+ get("/tests/create-queues")
+ .then()
+ .body(is("Ok :: 523776/523776/523776/523776/523776/523776"))
+ .statusCode(200);
+ }
+
+ @Test
+ public void testTicksWithOverflow() {
+ get("/tests/ticks-overflow")
+ .then()
+ .body(is("::::::::::"))
+ .statusCode(200);
+ }
+
+ @Test
+ public void testEmitter() throws Throwable {
+ Client client = ClientBuilder.newClient();
+ WebTarget target = client.target("http://localhost:" + RestAssured.port + "/tests/emitter");
+ SseEventSource eventSource = SseEventSource.target(target).build();
+
+ AtomicBoolean done = new AtomicBoolean();
+ AtomicReference failure = new AtomicReference<>();
+ ArrayList events = new ArrayList<>();
+
+ eventSource.register(
+ event -> events.add(event.readData()),
+ failure::set,
+ () -> done.set(true));
+ eventSource.open();
+
+ await().atMost(Duration.ofSeconds(10))
+ .until(() -> done.get() || failure.get() != null);
+
+ if (failure.get() != null) {
+ throw failure.get();
+ }
+
+ ArrayList expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < i * 10; j++) {
+ expected.add(String.valueOf(j));
+ }
+ }
+ assertIterableEquals(expected, events);
+ }
+}
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index bd23f41c363e2..4e391c144cb94 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -396,6 +396,7 @@
mtls-certificates
virtual-threads
+ mutiny-native-jctools