Update to Mutiny 2.6.0 and Reactive Messaging 4.20.0 #39768

Merged · 4 commits · Mar 29, 2024
8 changes: 4 additions & 4 deletions bom/application/pom.xml
@@ -61,8 +61,8 @@
<smallrye-context-propagation.version>2.1.0</smallrye-context-propagation.version>
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
-<smallrye-mutiny-vertx-binding.version>3.11.0</smallrye-mutiny-vertx-binding.version>
-<smallrye-reactive-messaging.version>4.19.0</smallrye-reactive-messaging.version>
+<smallrye-mutiny-vertx-binding.version>3.12.0</smallrye-mutiny-vertx-binding.version>
+<smallrye-reactive-messaging.version>4.20.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.6.0</smallrye-stork.version>
<jakarta.activation.version>2.1.3</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
@@ -148,7 +148,7 @@
<brotli4j.version>1.16.0</brotli4j.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
<jboss-logging.version>3.5.3.Final</jboss-logging.version>
-<mutiny.version>2.5.8</mutiny.version>
+<mutiny.version>2.6.0</mutiny.version>
<kafka3.version>3.7.0</kafka3.version>
<lz4.version>1.8.0</lz4.version> <!-- dependency of the kafka-clients that could be overridden by other imported BOMs in the platform -->
<snappy.version>1.1.10.5</snappy.version>
@@ -224,7 +224,7 @@
<org-crac.version>0.1.3</org-crac.version>
<sshd-common.version>2.12.0</sshd-common.version>
<mime4j.version>0.8.11</mime4j.version>
-<mutiny-zero.version>1.0.0</mutiny-zero.version>
+<mutiny-zero.version>1.1.0</mutiny-zero.version>
<pulsar-client.version>3.0.0</pulsar-client.version>
<async-http-client.version>2.12.3</async-http-client.version>
<!-- Dev UI -->
25 changes: 14 additions & 11 deletions docs/src/main/asciidoc/kafka.adoc
@@ -615,9 +615,10 @@
----
package inbound;

-import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
@@ -628,7 +629,7 @@

@Incoming("rebalanced-example")
@Acknowledgment(Acknowledgment.Strategy.NONE)
-public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
+public CompletionStage<Void> consume(Message<ConsumerRecord<Integer, String>> message) {
// We don't need to ACK messages because in this example,
// we set offset during consumer rebalance
return CompletableFuture.completedFuture(null);
@@ -715,14 +716,14 @@
}
----

-The incoming method can also receive `Message<List<Payload>>`, `KafkaRecordBatch<Key, Payload>`, and `ConsumerRecords<Key, Payload>` types.
+The incoming method can also receive `Message<List<Payload>>`, `Message<ConsumerRecords<Key, Payload>>`, and `ConsumerRecords<Key, Payload>` types.
They give access to record details such as offset or timestamp:

[source, java]
----
@Incoming("prices")
-public CompletionStage<Void> consumeMessage(KafkaRecordBatch<String, Double> records) {
-    for (KafkaRecord<String, Double> record : records) {
+public CompletionStage<Void> consumeMessage(Message<ConsumerRecords<String, Double>> records) {
+    for (ConsumerRecord<String, Double> record : records.getPayload()) {
Double payload = record.value();
String topic = record.topic();
// process messages
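For comparison, here is a minimal sketch of the bare `ConsumerRecords` variant mentioned above (the channel name and key/value types are assumed from the preceding example):

[source, java]
----
@Incoming("prices")
public void consumeRecords(ConsumerRecords<String, Double> records) {
    // ConsumerRecords is iterable and exposes the raw Kafka client accessors
    for (ConsumerRecord<String, Double> record : records) {
        long offset = record.offset();
        long timestamp = record.timestamp();
        // process messages
    }
}
----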
@@ -1540,13 +1541,15 @@
----
import jakarta.enterprise.context.ApplicationScoped;

+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
-import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
@@ -1557,10 +1560,10 @@
KafkaTransactions<Integer> txProducer;

@Incoming("prices-in")
-public Uni<Void> emitInTransaction(KafkaRecordBatch<String, Integer> batch) { // <1>
+public Uni<Void> emitInTransaction(Message<ConsumerRecords<String, Integer>> batch) { // <1>
return txProducer.withTransactionAndAck(batch, emitter -> { // <2>
-for (KafkaRecord<String, Integer> record : batch) {
-    emitter.send(KafkaRecord.of(record.getKey(), record.getPayload() + 1)); // <3>
+for (ConsumerRecord<String, Integer> record : batch.getPayload()) {
+    emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); // <3>
}
return Uni.createFrom().voidItem();
});
@@ -1571,7 +1574,7 @@

<1> It is recommended to use exactly-once processing along with the batch consumption mode.
While it is possible to use it with a single Kafka message, it'll have a significant performance impact.
-<2> The consumed `KafkaRecordBatch` message is passed to the `KafkaTransactions#withTransactionAndAck` in order to handle the offset commits and message acks.
+<2> The consumed message is passed to `KafkaTransactions#withTransactionAndAck` to handle the offset commits and message acks.
<3> The `send` method writes records to Kafka inside the transaction, without waiting for send receipt from the broker.
Messages pending to be written to Kafka will be buffered, and flushed before committing the transaction.
It is therefore recommended to configure the `@OnOverflow` `bufferSize` to hold enough messages, for example `max.poll.records`, the maximum number of records returned per poll batch.
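As a sketch of that sizing advice (the values are illustrative assumptions, not connector defaults), the emitter buffer can be aligned with the consumer's `max.poll.records`:

[source, java]
----
@Inject
@Channel("tx-out-example")
// buffer at least one full poll batch; 500 matches the Kafka default max.poll.records
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 500)
KafkaTransactions<Integer> txProducer;
----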
@@ -1908,7 +1911,7 @@
[source,java]
----
@Incoming("my-kafka-records")
-public void consume(KafkaRecord<Long, byte[]> record) {
+public void consume(Record<Long, byte[]> record) {
...
}
----
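For the outbound counterpart, `Record.of` pairs a key with a payload (a sketch; the channel name is assumed):

[source,java]
----
@Outgoing("my-kafka-records")
public Record<Long, byte[]> produce() {
    // the key becomes the Kafka record key, the value its payload
    return Record.of(1L, new byte[] { 1, 2, 3 });
}
----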
56 changes: 27 additions & 29 deletions docs/src/main/asciidoc/pulsar.adoc
@@ -416,22 +416,20 @@
[source, java]
----
@Incoming("prices")
-public CompletionStage<Void> consumeMessage(PulsarIncomingBatchMessage<Double> messages) {
-    for (PulsarMessage<Double> msg : messages) {
-        msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> {
-            String key = metadata.getKey();
-            String topic = metadata.getTopicName();
-            long timestamp = metadata.getEventTime();
-            //... process messages
-        });
+public CompletionStage<Void> consumeMessage(Message<org.apache.pulsar.client.api.Messages<Double>> messages) {
+    for (org.apache.pulsar.client.api.Message<Double> msg : messages.getPayload()) {
+        String key = msg.getKey();
+        String topic = msg.getTopicName();
+        long timestamp = msg.getEventTime();
+        //... process messages
}
// ack will acknowledge all messages of the batch.
return messages.ack();
}

@Incoming("prices")
-public void consumeRecords(Messages<Double> messages) {
-    for (Message<Double> msg : messages) {
+public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
+    for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
//... process messages
}
}
@@ -457,7 +455,7 @@

== Sending messages to Pulsar

-The Pulsar Connector can write Reactive Messaging Messages as Pulsar Message.
+The Pulsar Connector can write Reactive Messaging `Message`s as Pulsar messages.
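For instance, a minimal outbound method (a sketch; the channel name is an assumption) whose emitted items the connector writes to the configured Pulsar topic:

[source, java]
----
@Outgoing("prices-out")
public Multi<Double> generate() {
    // each item becomes the value of an outgoing Pulsar message
    return Multi.createFrom().items(1.0, 2.0, 3.0);
}
----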

=== Example

@@ -739,32 +737,32 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

+import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
-import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarExactlyOnceProcessor {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<Integer> txProducer;

    @Incoming("in-channel")
-    public Uni<Void> emitInTransaction(PulsarIncomingBatchMessage<Integer> batch) {
+    public Uni<Void> emitInTransaction(Message<Messages<Integer>> batch) {
        return txProducer.withTransactionAndAck(batch, emitter -> {
-            for (PulsarMessage<Integer> record : batch) {
-                emitter.send(PulsarMessage.of(record.getPayload() + 1, record.getKey()));
+            for (org.apache.pulsar.client.api.Message<Integer> record : batch.getPayload()) {
+                emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
            }
            return Uni.createFrom().voidItem();
        });
    }

}
----

If the processing completes successfully, the message is acknowledged inside the transaction and the transaction is committed.
Expand Down
io/quarkus/mutiny/deployment/MutinyProcessor.java
@@ -1,5 +1,6 @@
package io.quarkus.mutiny.deployment;

+import java.util.List;
import java.util.Optional;

import org.jboss.threads.ContextHandler;
@@ -10,6 +11,7 @@
import io.quarkus.deployment.builditem.ContextHandlerBuildItem;
import io.quarkus.deployment.builditem.ExecutorBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.UnsafeAccessedFieldBuildItem;
import io.quarkus.mutiny.runtime.MutinyInfrastructure;

public class MutinyProcessor {
@@ -31,4 +33,12 @@ public void buildTimeInit(MutinyInfrastructure recorder) {
recorder.configureThreadBlockingChecker();
recorder.configureOperatorLogger();
}

+@BuildStep
+public List<UnsafeAccessedFieldBuildItem> jctoolsUnsafeAccessedFields() {
+    return List.of(
+            new UnsafeAccessedFieldBuildItem(
+                    "org.jctools.util.UnsafeRefArrayAccess",
+                    "REF_ELEMENT_SHIFT"));
+}
}
DotNames.java
@@ -19,6 +19,9 @@ final class DotNames {
static final DotName TARGETED_MESSAGES = DotName.createSimple(io.smallrye.reactive.messaging.TargetedMessages.class.getName());

static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName());

+static final DotName GENERIC_PAYLOAD = DotName.createSimple(io.smallrye.reactive.messaging.GenericPayload.class.getName());

static final DotName KAFKA_RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.KafkaRecord.class.getName());
static final DotName RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.Record.class.getName());
static final DotName CONSUMER_RECORD = DotName.createSimple(org.apache.kafka.clients.consumer.ConsumerRecord.class.getName());
@@ -30,6 +33,7 @@ final class DotNames {
static final DotName SUBSCRIBER = DotName.createSimple(org.reactivestreams.Subscriber.class.getName());
static final DotName SUBSCRIBER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder.class.getName());
static final DotName PUBLISHER = DotName.createSimple(org.reactivestreams.Publisher.class.getName());
+static final DotName FLOW_PUBLISHER = DotName.createSimple(java.util.concurrent.Flow.Publisher.class.getName());
static final DotName PUBLISHER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder.class.getName());
static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName());
static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName());
SmallRyeReactiveMessagingKafkaProcessor.java
@@ -420,6 +420,7 @@ private Type getIncomingTypeFromMethod(MethodInfo method) {
if ((isCompletionStage(returnType) && parametersCount >= 1)
|| (isUni(returnType) && parametersCount >= 1)
|| (isPublisher(returnType) && parametersCount == 1)
+|| (isFlowPublisher(returnType) && parametersCount == 1)
|| (isPublisherBuilder(returnType) && parametersCount == 1)
|| (isMulti(returnType) && parametersCount == 1)) {
incomingType = parameterTypes.get(0);
@@ -433,8 +434,11 @@
}

// @Incoming @Outgoing stream manipulation
-if (incomingType != null
-        && (isPublisher(incomingType) || isPublisherBuilder(incomingType) || isMulti(incomingType))) {
+if (incomingType != null &&
+        (isPublisher(incomingType)
+                || isFlowPublisher(incomingType)
+                || isPublisherBuilder(incomingType)
+                || isMulti(incomingType))) {
incomingType = incomingType.asParameterizedType().arguments().get(0);
}
}
@@ -446,7 +450,10 @@ private Type getIncomingTypeFromChannelInjectionPoint(Type injectionPointType) {
return null;
}

-if (isPublisher(injectionPointType) || isPublisherBuilder(injectionPointType) || isMulti(injectionPointType)) {
+if (isPublisher(injectionPointType)
+        || isPublisherBuilder(injectionPointType)
+        || isFlowPublisher(injectionPointType)
+        || isMulti(injectionPointType)) {
return injectionPointType.asParameterizedType().arguments().get(0);
} else {
return null;
@@ -462,6 +469,7 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {

// @Outgoing
if ((isPublisher(returnType) && parametersCount == 0)
+|| (isFlowPublisher(returnType) && parametersCount == 0)
|| (isPublisherBuilder(returnType) && parametersCount == 0)
|| (isMulti(returnType) && parametersCount == 0)
|| (isMultiSplitter(returnType) && parametersCount == 0)
@@ -476,11 +484,11 @@

// @Incoming @Outgoing
if (method.hasAnnotation(DotNames.INCOMING) || method.hasAnnotation(DotNames.INCOMINGS)) {
-if ((isCompletionStage(returnType) && parametersCount == 1)
-        || (isUni(returnType) && parametersCount == 1)
-        || (isPublisher(returnType) && parametersCount == 1)
+if (isCompletionStage(returnType) || isUni(returnType) || isMulti(returnType)) {
+    outgoingType = returnType.asParameterizedType().arguments().get(0);
+} else if ((isPublisher(returnType) && parametersCount == 1)
+        || (isFlowPublisher(returnType) && parametersCount == 1)
|| (isPublisherBuilder(returnType) && parametersCount == 1)
-|| (isMulti(returnType) && parametersCount == 1)
|| (isMultiSplitter(returnType) && parametersCount == 1)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
} else if ((isProcessor(returnType) && parametersCount == 0)
@@ -494,7 +502,10 @@

// @Incoming @Outgoing stream manipulation
if (outgoingType != null
-        && (isPublisher(outgoingType) || isPublisherBuilder(outgoingType) || isMulti(outgoingType))) {
+        && (isPublisher(outgoingType)
+                || isFlowPublisher(outgoingType)
+                || isPublisherBuilder(outgoingType)
+                || isMulti(outgoingType))) {
outgoingType = outgoingType.asParameterizedType().arguments().get(0);
}
}
@@ -548,6 +559,11 @@ private void extractKeyValueType(Type type, TriConsumer<Type, Type, Boolean> keyValueTypeAcceptor) {
return;
}

+if (isGenericPayload(type)) {
+    extractKeyValueType(type.asParameterizedType().arguments().get(0), keyValueTypeAcceptor);
+    return;
+}

if (isMessage(type)) {
List<Type> typeArguments = type.asParameterizedType().arguments();
Type messageTypeParameter = typeArguments.get(0);
@@ -627,6 +643,13 @@ private static boolean isPublisher(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

+private static boolean isFlowPublisher(Type type) {
+    // raw type Flow.Publisher is wrong, must be Flow.Publisher<Something>
+    return DotNames.FLOW_PUBLISHER.equals(type.name())
+            && type.kind() == Type.Kind.PARAMETERIZED_TYPE
+            && type.asParameterizedType().arguments().size() == 1;
+}

private static boolean isPublisherBuilder(Type type) {
// raw type PublisherBuilder is wrong, must be PublisherBuilder<Something, SomethingElse>
return DotNames.PUBLISHER_BUILDER.equals(type.name())
@@ -687,6 +710,13 @@ private static boolean isMessage(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

+private static boolean isGenericPayload(Type type) {
+    // raw type GenericPayload is wrong, must be GenericPayload<Something>
+    return DotNames.GENERIC_PAYLOAD.equals(type.name())
+            && type.kind() == Type.Kind.PARAMETERIZED_TYPE
+            && type.asParameterizedType().arguments().size() == 1;
+}

private static boolean isKafkaRecord(Type type) {
// raw type KafkaRecord is wrong, must be KafkaRecord<Something, SomethingElse>
return DotNames.KAFKA_RECORD.equals(type.name())
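To illustrate what the new `isFlowPublisher` and `isGenericPayload` checks detect, here is a hedged sketch (not code from this PR; the channel names and the `GenericPayload.of` factory are assumptions based on SmallRye Reactive Messaging 4.20):

[source, java]
----
import java.util.concurrent.Flow;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.GenericPayload;

@ApplicationScoped
public class FlowAndGenericPayloadExamples {

    // Flow.Publisher signatures are now recognized, so Kafka serdes can be
    // auto-detected for the Double payload of "prices-out"
    @Outgoing("prices-out")
    public Flow.Publisher<Double> generate() {
        // Mutiny's Multi implements java.util.concurrent.Flow.Publisher
        return Multi.createFrom().items(1.0, 2.0, 3.0);
    }

    // GenericPayload wraps a payload plus metadata in a payload-style
    // signature; extractKeyValueType unwraps it to find the value type
    @Incoming("prices-in")
    @Outgoing("prices-out")
    public GenericPayload<Double> process(Double price) {
        return GenericPayload.of(price + 1, Metadata.empty());
    }
}
----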