From ddd895d5e985ef3fd09a8d40080bc92049a83697 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 21 Mar 2024 15:59:29 +0100 Subject: [PATCH] Introduce GenericPayload primarily for sending outgoing payloads with additional metadata --- api/revapi.json | 14 +- .../reactive/messaging/GenericPayload.java | 115 +++++++++++++ .../smallrye/reactive/messaging/Messages.java | 2 +- documentation/mkdocs.yml | 1 + .../main/docs/concepts/generic-payloads.md | 34 ++++ documentation/src/main/docs/concepts/model.md | 70 +++++--- .../genericpayload/GenericPayloadExample.java | 35 ++++ .../kafka/inbound/KafkaCheckpointExample.java | 4 +- .../kafka/inbound/KafkaDeadLetterExample.java | 4 +- .../inbound/KafkaRebalancedConsumer.java | 5 +- .../outbound/KafkaExactlyOnceProcessor.java | 11 +- .../inbound/PulsarMessageBatchExample.java | 28 ++- .../outbound/PulsarExactlyOnceProcessor.java | 12 +- .../messaging/kafka/GenericPayloadTest.java | 73 ++++++++ .../messaging/providers/AbstractMediator.java | 4 + .../providers/ProcessorMediator.java | 17 +- .../providers/PublisherMediator.java | 17 +- .../providers/StreamTransformerMediator.java | 10 +- .../helpers/GenericPayloadConverter.java | 24 +++ .../messaging/GenericPayloadTest.java | 161 ++++++++++++++++++ 20 files changed, 570 insertions(+), 71 deletions(-) create mode 100644 api/src/main/java/io/smallrye/reactive/messaging/GenericPayload.java create mode 100644 documentation/src/main/docs/concepts/generic-payloads.md create mode 100644 documentation/src/main/java/genericpayload/GenericPayloadExample.java create mode 100644 smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/GenericPayloadTest.java create mode 100644 smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/GenericPayloadConverter.java create mode 100644 smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/GenericPayloadTest.java diff --git a/api/revapi.json b/api/revapi.json index 7bc3bce7fb..188a37c9bd 100644 --- a/api/revapi.json +++ b/api/revapi.json @@ -27,7 +27,17 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "ignore": true, + "code": "java.method.visibilityIncreased", + "old": "method org.eclipse.microprofile.reactive.messaging.Metadata io.smallrye.reactive.messaging.Messages::merge(org.eclipse.microprofile.reactive.messaging.Metadata, org.eclipse.microprofile.reactive.messaging.Metadata)", + "new": "method org.eclipse.microprofile.reactive.messaging.Metadata io.smallrye.reactive.messaging.Messages::merge(org.eclipse.microprofile.reactive.messaging.Metadata, org.eclipse.microprofile.reactive.messaging.Metadata)", + "oldVisibility": "private", + "newVisibility": "public", + "justification": "Metadata merge utility function exposed from the Messages API" + } + ] } }, { "extension" : "revapi.reporter.json", @@ -46,4 +56,4 @@ "minCriticality" : "documented", "output" : "out" } -} ] \ No newline at end of file +} ] diff --git a/api/src/main/java/io/smallrye/reactive/messaging/GenericPayload.java b/api/src/main/java/io/smallrye/reactive/messaging/GenericPayload.java new file mode 100644 index 0000000000..ed5a783152 --- /dev/null +++ b/api/src/main/java/io/smallrye/reactive/messaging/GenericPayload.java @@ -0,0 +1,115 @@ +package io.smallrye.reactive.messaging; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; + +/** + * A generic payload that can be used to wrap a payload with metadata. + * Allows associating a payload with metadata to be sent as a message, + * without using signatures supporting {@code Message}. + * + * @param the type of the payload + */ +public class GenericPayload { + + /** + * Creates a new payload with the given payload and empty metadata. + * + * @param payload the payload + * @param the type of the payload + * @return the payload + */ + public static GenericPayload of(T payload) { + return new GenericPayload<>(payload, Metadata.empty()); + } + + /** + * Creates a new payload with the given payload and metadata. + * + * @param payload the payload + * @param metadata the metadata + * @param the type of the payload + * @return the payload + */ + public static GenericPayload of(T payload, Metadata metadata) { + return new GenericPayload<>(payload, metadata); + } + + /** + * Creates a new payload from the given message. + * + * @param message the message + * @param the type of the payload + * @return the payload + */ + public static GenericPayload from(Message message) { + return new GenericPayload<>(message.getPayload(), message.getMetadata()); + } + + private final T payload; + private final Metadata metadata; + + public GenericPayload(T payload, Metadata metadata) { + this.payload = payload; + this.metadata = metadata; + } + + /** + * Gets the payload associated with this payload. + * + * @return the payload + */ + public T getPayload() { + return payload; + } + + /** + * Gets the metadata associated with this payload. + * + * @return the metadata + */ + public Metadata getMetadata() { + return metadata; + } + + /** + * Adds metadata to this payload. + * + * @param metadata the metadata to add + * @return a new payload with the added metadata + */ + public GenericPayload withMetadata(Metadata metadata) { + return GenericPayload.of(this.payload, metadata); + } + + /** + * Adds metadata to this payload. + * + * @param payload the payload to add + * @return a new payload with the added metadata + */ + public GenericPayload withPayload(R payload) { + return GenericPayload.of(payload, this.metadata); + } + + /** + * Converts this payload to a message. + * + * @return the message with the payload and metadata + */ + public Message toMessage() { + return Message.of(payload, metadata); + } + + /** + * Converts this payload to a message, merging the metadata with the given message. + * + * @param message the message to merge the metadata with + * @return the message with the payload and merged metadata + */ + public Message toMessage(Message message) { + Metadata merged = Messages.merge(message.getMetadata(), this.metadata); + return message.withPayload(payload).withMetadata(merged); + } + +} diff --git a/api/src/main/java/io/smallrye/reactive/messaging/Messages.java b/api/src/main/java/io/smallrye/reactive/messaging/Messages.java index aef5d64eed..3aa861fd6f 100644 --- a/api/src/main/java/io/smallrye/reactive/messaging/Messages.java +++ b/api/src/main/java/io/smallrye/reactive/messaging/Messages.java @@ -142,7 +142,7 @@ static Message> merge(List> list) { } @SuppressWarnings({ "unchecked", "rawtypes" }) - private static Metadata merge(Metadata first, Metadata second) { + static Metadata merge(Metadata first, Metadata second) { Metadata result = first; for (Object meta : second) { Class clazz = meta.getClass(); diff --git a/documentation/mkdocs.yml b/documentation/mkdocs.yml index 6fcb4bbeaa..3e57466613 100644 --- a/documentation/mkdocs.yml +++ b/documentation/mkdocs.yml @@ -33,6 +33,7 @@ nav: - 'Advanced Configuration' : concepts/advanced-config.md - 'Message Context' : concepts/message-context.md - 'Metadata Injection': concepts/incoming-metadata-injection.md + - 'Generic Payloads': concepts/generic-payloads.md - Kafka: - kafka/kafka.md diff --git a/documentation/src/main/docs/concepts/generic-payloads.md b/documentation/src/main/docs/concepts/generic-payloads.md new file mode 100644 index 0000000000..3a94d3781c --- /dev/null +++ b/documentation/src/main/docs/concepts/generic-payloads.md @@ -0,0 +1,34 @@ +# Generic Payloads + +!!!warning "Experimental" + Generic payloads are an experimental feature and the API is subject to change. + +When using reactive messaging, `Message` flow in your system +each message has a payload but can also contain _metadata_, as explained in [Messages, Payload, Metadata](concepts.md#messages-payload-metadata). +The metadata can hold information, for example in an outgoing channel, additional properties of the outgoing message to be sent to the broker. + +It is sometimes preferable to continue using the [payload signatures](model.md#messages-vs-payloads), +and also being able to attach metadata. +Using `GenericPayload` allows customizing metadata when handling payloads in SmallRye Reactive Messaging `@Incoming` and `@Outgoing` methods. +`GenericPayload` is a wrapper type, like the `Message`, containing a payload and metadata, +without requiring handling acknowledgments manually. + +``` java +{{ insert('genericpayload/GenericPayloadExample.java', 'code') }} +``` + +You can combine generic payloads with [metadata injection](incoming-metadata-injection.md) : + + +``` java +{{ insert('genericpayload/GenericPayloadExample.java', 'injection') }} +``` + +Note that the metadata provided with the outgoing generic payload is merged with the incoming message metadata. + +!!! warning "Limitations" + There are several limitations for the use of `GenericPayload`: + `GenericPayload` is not supported in emitters, as normal outgoing `Message` can be used for that purpose. + While `GenericPayload` can be used as an incoming payload type, + [message converters](converters.md) are not applied to the payload type `T`. + diff --git a/documentation/src/main/docs/concepts/model.md b/documentation/src/main/docs/concepts/model.md index 7c225dc46c..581e54f4ab 100644 --- a/documentation/src/main/docs/concepts/model.md +++ b/documentation/src/main/docs/concepts/model.md @@ -17,8 +17,8 @@ These annotations are used on *methods*: {{ insert('beans/MessageProcessingBean.java') }} ``` -!!! note -Reactive Messaging beans can either be in the *application* scope (`@ApplicationScoped`) or dependent scope (`@Dependent`). +!!!note + Reactive Messaging beans can either be in the *application* scope (`@ApplicationScoped`) or dependent scope (`@Dependent`). Manipulating messages can be cumbersome. When you are only interested in the payload, you can use the following syntax: The following code is equivalent to the snippet from above: @@ -27,10 +27,10 @@ When you are only interested in the payload, you can use the following syntax: T {{ insert('beans/PayloadProcessingBean.java') }} ``` -!!! important -You should not call methods annotated with `@Incoming` and/or -`@Outgoing` directly from your code. They are invoked by the framework. -Having user code invoking them would not have the expected outcome. +!!!important + You should not call methods annotated with `@Incoming` and/or + `@Outgoing` directly from your code. They are invoked by the framework. + Having user code invoking them would not have the expected outcome. SmallRye Reactive Messaging automatically binds matching `@Outgoing` to @@ -83,15 +83,41 @@ You can also create new instance of `Message` from an existing one: ``` java {{ insert('messages/MessageExamples.java', 'copy') }} ``` - -!!! note "Acknowledgement?" -Acknowledgement is an important part of messaging systems. This will be -covered in the [acknowledgement](acknowledgement.md) -section. +!!! warning "Acknowledgement?" + Acknowledgement is an important part of messaging systems. This will be covered in the [acknowledgement](acknowledgement.md) section. !!! note "Connector Metadata" -Most connectors are providing metadata to let you extract technical -details about the message, but also customize the outbound dispatching. + Most connectors are providing metadata to let you extract technical + details about the message, but also customize the outbound dispatching. + +## Messages vs. Payloads + +Reactive messaging offers flexibility when it comes to handling messages and their acknowledgements. +The application developer can choose to finely handle acknowledgements per-message basis, by handling the +`Message`-based signatures. Otherwise, when handling payloads, acknowledgements +(and negative-acknowledgements) are handled by the framework. +The following sections in this documentation detail both development models. + +While being the easier development model, in the past using payload-based signatures did not allow associating connector-specific metadata, +making the `Message`-based signatures the de-facto choice even for the most common scenarios. +This lead to using the connector custom-message implementation types, +such as `IncomingKafkaRecord`, `KafkaRecord` or `IncomingRabbitMQMessage`, +as a convenience for accessing connector-specific metadata. + +Not only this forces to handle acknowledgements manually, +it also doesn't allow for incoming or outgoing `Messages` to be [intercepted](decorators.md#intercepting-incoming-and-outgoing-messages) +or [observed](observability.md). + +!!! warning "Custom `Message` types & Message interception" + Custom `Message` types such as `IncomingKafkaRecord`, `KafkaRecord` or `IncomingRabbitMQMessage` + are not compatible with features intercepting messages. + Therefore, it is no longer recommended to use custom `Message` implementations in consumptions methods. + Instead, you can either use the generic `Message` type and access specific metadata, + or use the payload with [metadata injection](incoming-metadata-injection.md) + +Since [incoming metadata injection](incoming-metadata-injection.md) and [generic payloads](generic-payloads.md) +features added to SmallRye Reactive Messaging, +it is easier to use payload signatures and benefit from acknowledgement handling and still access connector-specific metadata. ## Generating Messages @@ -109,10 +135,10 @@ called for every *request* from the downstream: ``` !!! note "Requests?" -Reactive Messaging connects components to build a reactive stream. -In a reactive stream, the emissions are controlled by the consumer -(downstream) indicating to the publisher (upstream) how many items it -can consume. With this protocol, the consumers are never flooded. + Reactive Messaging connects components to build a reactive stream. + In a reactive stream, the emissions are controlled by the consumer + (downstream) indicating to the publisher (upstream) how many items it + can consume. With this protocol, the consumers are never flooded. ### Generating messages using CompletionStage @@ -300,7 +326,7 @@ directly either synchronously or asynchronously: ``` !!! note "What about metadata?" -With these methods, the metadata are automatically propagated. + With these methods, the metadata are automatically propagated. ## Processing streams @@ -322,7 +348,7 @@ You can receive either a (Reactive Streams) `Publisher`, a `Publisher` or a `Publisher` directly. !!!important -These signatures do not support metadata propagation. In the case of a -stream of `Message`, you need to propagate the metadata manually. In the -case of a stream of payload, propagation is not supported, and incoming -metadata are lost. + These signatures do not support metadata propagation. In the case of a + stream of `Message`, you need to propagate the metadata manually. In the + case of a stream of payload, propagation is not supported, and incoming + metadata are lost. diff --git a/documentation/src/main/java/genericpayload/GenericPayloadExample.java b/documentation/src/main/java/genericpayload/GenericPayloadExample.java new file mode 100644 index 0000000000..7be3be44ad --- /dev/null +++ b/documentation/src/main/java/genericpayload/GenericPayloadExample.java @@ -0,0 +1,35 @@ +package genericpayload; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Metadata; +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.GenericPayload; +import messages.MyMetadata; + +@ApplicationScoped +public class GenericPayloadExample { + + // + @Outgoing("out") + Multi> produce() { + return Multi.createFrom().range(0, 100) + .map(i -> GenericPayload.of(">> " + i, Metadata.of(new MyMetadata()))); + } + // + + // + @Incoming("in") + @Outgoing("out") + GenericPayload process(int payload, MyMetadata metadata) { + // use the injected metadata + String id = metadata.getId(); + return GenericPayload.of(">> " + payload + " " + id, + Metadata.of(metadata, new MyMetadata("Bob", "Alice"))); + } + // + +} diff --git a/documentation/src/main/java/kafka/inbound/KafkaCheckpointExample.java b/documentation/src/main/java/kafka/inbound/KafkaCheckpointExample.java index e2342f650e..ffe8db0526 100644 --- a/documentation/src/main/java/kafka/inbound/KafkaCheckpointExample.java +++ b/documentation/src/main/java/kafka/inbound/KafkaCheckpointExample.java @@ -5,8 +5,8 @@ import jakarta.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; -import io.smallrye.reactive.messaging.kafka.KafkaRecord; import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata; @ApplicationScoped @@ -14,7 +14,7 @@ public class KafkaCheckpointExample { // @Incoming("prices") - public CompletionStage consume(KafkaRecord record) { + public CompletionStage consume(Message record) { // Get the `CheckpointMetadata` from the incoming message CheckpointMetadata checkpoint = CheckpointMetadata.fromMessage(record); diff --git a/documentation/src/main/java/kafka/inbound/KafkaDeadLetterExample.java b/documentation/src/main/java/kafka/inbound/KafkaDeadLetterExample.java index 45b675674a..b83bdd08c0 100644 --- a/documentation/src/main/java/kafka/inbound/KafkaDeadLetterExample.java +++ b/documentation/src/main/java/kafka/inbound/KafkaDeadLetterExample.java @@ -7,9 +7,9 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Metadata; -import io.smallrye.reactive.messaging.kafka.KafkaRecord; import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; @ApplicationScoped @@ -17,7 +17,7 @@ public class KafkaDeadLetterExample { // @Incoming("in") - public CompletionStage consume(KafkaRecord message) { + public CompletionStage consume(Message message) { return message.nack(new Exception("Failed!"), Metadata.of( OutgoingKafkaRecordMetadata.builder() .withKey("failed-record") diff --git a/documentation/src/main/java/kafka/inbound/KafkaRebalancedConsumer.java b/documentation/src/main/java/kafka/inbound/KafkaRebalancedConsumer.java index 77c9cd4e83..814461dc26 100644 --- a/documentation/src/main/java/kafka/inbound/KafkaRebalancedConsumer.java +++ b/documentation/src/main/java/kafka/inbound/KafkaRebalancedConsumer.java @@ -7,15 +7,14 @@ import org.eclipse.microprofile.reactive.messaging.Acknowledgment; import org.eclipse.microprofile.reactive.messaging.Incoming; - -import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; +import org.eclipse.microprofile.reactive.messaging.Message; @ApplicationScoped public class KafkaRebalancedConsumer { @Incoming("rebalanced-example") @Acknowledgment(Acknowledgment.Strategy.NONE) - public CompletionStage consume(IncomingKafkaRecord message) { + public CompletionStage consume(Message message) { // We don't need to ACK messages because in this example we set offset during consumer re-balance return CompletableFuture.completedFuture(null); } diff --git a/documentation/src/main/java/kafka/outbound/KafkaExactlyOnceProcessor.java b/documentation/src/main/java/kafka/outbound/KafkaExactlyOnceProcessor.java index 254c16c7d3..49b973232e 100644 --- a/documentation/src/main/java/kafka/outbound/KafkaExactlyOnceProcessor.java +++ b/documentation/src/main/java/kafka/outbound/KafkaExactlyOnceProcessor.java @@ -1,14 +1,17 @@ package kafka.outbound; +import java.util.List; + import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.kafka.KafkaRecord; -import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch; +import io.smallrye.reactive.messaging.kafka.Record; import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions; @ApplicationScoped @@ -19,10 +22,10 @@ public class KafkaExactlyOnceProcessor { KafkaTransactions txProducer; @Incoming("in-channel") - public Uni emitInTransaction(KafkaRecordBatch batch) { + public Uni emitInTransaction(Message>> batch) { return txProducer.withTransaction(batch, emitter -> { - for (KafkaRecord record : batch) { - emitter.send(KafkaRecord.of(record.getKey(), record.getPayload() + 1)); + for (Record record : batch.getPayload()) { + emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); } return Uni.createFrom().voidItem(); }); diff --git a/documentation/src/main/java/pulsar/inbound/PulsarMessageBatchExample.java b/documentation/src/main/java/pulsar/inbound/PulsarMessageBatchExample.java index 6ca4e2985e..2265e4a465 100644 --- a/documentation/src/main/java/pulsar/inbound/PulsarMessageBatchExample.java +++ b/documentation/src/main/java/pulsar/inbound/PulsarMessageBatchExample.java @@ -1,38 +1,36 @@ package pulsar.inbound; +import java.util.List; import java.util.concurrent.CompletionStage; import jakarta.enterprise.context.ApplicationScoped; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Messages; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; -import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage; -import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessageMetadata; -import io.smallrye.reactive.messaging.pulsar.PulsarMessage; +import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessageMetadata; @ApplicationScoped public class PulsarMessageBatchExample { // @Incoming("prices") - public CompletionStage consumeMessage(PulsarIncomingBatchMessage messages) { - for (PulsarMessage msg : messages) { - msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> { - String key = metadata.getKey(); - String topic = metadata.getTopicName(); - long timestamp = metadata.getEventTime(); + public CompletionStage consumeMessage(Message> messages) { + messages.getMetadata(PulsarIncomingBatchMessageMetadata.class).ifPresent(metadata -> { + for (org.apache.pulsar.client.api.Message message : metadata.getMessages()) { + String key = message.getKey(); + String topic = message.getTopicName(); + long timestamp = message.getEventTime(); //... process messages - }); - } + } + }); // ack will commit the latest offsets (per partition) of the batch. return messages.ack(); } @Incoming("prices") - public void consumeRecords(Messages messages) { - for (Message msg : messages) { + public void consumeRecords(org.apache.pulsar.client.api.Messages messages) { + for (org.apache.pulsar.client.api.Message msg : messages) { //... process messages } } diff --git a/documentation/src/main/java/pulsar/outbound/PulsarExactlyOnceProcessor.java b/documentation/src/main/java/pulsar/outbound/PulsarExactlyOnceProcessor.java index 1d881b8fa9..c207cba704 100644 --- a/documentation/src/main/java/pulsar/outbound/PulsarExactlyOnceProcessor.java +++ b/documentation/src/main/java/pulsar/outbound/PulsarExactlyOnceProcessor.java @@ -1,13 +1,16 @@ package pulsar.outbound; +import java.util.List; + import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage; +import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessageMetadata; import io.smallrye.reactive.messaging.pulsar.PulsarMessage; import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions; @@ -19,10 +22,11 @@ public class PulsarExactlyOnceProcessor { PulsarTransactions txProducer; @Incoming("in-channel") - public Uni emitInTransaction(PulsarIncomingBatchMessage batch) { + public Uni emitInTransaction(Message> batch) { return txProducer.withTransactionAndAck(batch, emitter -> { - for (PulsarMessage record : batch) { - emitter.send(PulsarMessage.of(record.getPayload() + 1, record.getKey())); + PulsarIncomingBatchMessageMetadata metadata = batch.getMetadata(PulsarIncomingBatchMessageMetadata.class).get(); + for (org.apache.pulsar.client.api.Message message : metadata. getMessages()) { + emitter.send(PulsarMessage.of(message.getValue() + 1, message.getKey())); } return Uni.createFrom().voidItem(); }); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/GenericPayloadTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/GenericPayloadTest.java new file mode 100644 index 0000000000..2700e05b88 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/GenericPayloadTest.java @@ -0,0 +1,73 @@ +package io.smallrye.reactive.messaging.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import java.util.regex.Pattern; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.microprofile.reactive.messaging.Metadata; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.GenericPayload; +import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; +import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; +import io.smallrye.reactive.messaging.kafka.converters.ConsumerRecordConverter; +import io.smallrye.reactive.messaging.providers.helpers.GenericPayloadConverter; + +public class GenericPayloadTest extends KafkaCompanionTestBase { + private static final String TOPIC_NAME_BASE = "GenericPayloadTest-" + UUID.randomUUID() + "-"; + + @Test + public void testProduce() { + for (int i = 0; i < 10; i++) { + companion.topics().createAndWait(TOPIC_NAME_BASE + i, 1); + } + + addBeans(GenericPayloadConverter.class); + addBeans(ConsumerRecordConverter.class); + runApplication(kafkaConfig("mp.messaging.outgoing.generated-producer") + .put("topic", "nonexistent-topic") + .put("key.serializer", StringSerializer.class.getName()) + .put("value.serializer", StringSerializer.class.getName()), MyApp.class); + + assertThat(companion.consumeStrings().fromTopics(Pattern.compile(TOPIC_NAME_BASE + ".+"), 10) + .awaitCompletion()).allSatisfy(consumerRecord -> { + assertThat(consumerRecord.key()).startsWith("key-"); + assertThat(consumerRecord.value()).startsWith("value-"); + assertThat(consumerRecord.topic()).startsWith(TOPIC_NAME_BASE); + + assertThat(consumerRecord.headers()).allSatisfy(header -> { + assertThat(header.key()).startsWith("my-header-"); + assertThat(new String(header.value(), StandardCharsets.UTF_8)).startsWith("my-header-value-"); + }); + }); + + } + + @ApplicationScoped + public static class MyApp { + + @Outgoing("generated-producer") + public Multi> produce() { + return Multi.createFrom().range(0, 10).map(id -> { + Headers headersToBeUsed = new RecordHeaders() + .add("my-header-" + id, ("my-header-value-" + id).getBytes(StandardCharsets.UTF_8)); + return GenericPayload.of("value-" + id, Metadata.of( + OutgoingKafkaRecordMetadata. builder() + .withTopic(TOPIC_NAME_BASE + id) + .withKey("key-" + id) + .withHeaders(headersToBeUsed) + .build())); + }); + } + + } +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java index d947ec0ae8..0d0f7c9539 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java @@ -327,4 +327,8 @@ protected Uni> extractTargetedMessage(String outgoing, Mess return Uni.createFrom().item(message); } + protected Message payloadToMessage(Object payload) { + return (payload instanceof GenericPayload) ? ((GenericPayload) payload).toMessage() : Message.of(payload); + } + } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java index cde6e8afe1..7efa2d12a7 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java @@ -19,7 +19,9 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.GenericPayload; import io.smallrye.reactive.messaging.MediatorConfiguration; +import io.smallrye.reactive.messaging.Messages; import io.smallrye.reactive.messaging.Shape; import io.smallrye.reactive.messaging.providers.helpers.AcknowledgementCoordinator; import io.smallrye.reactive.messaging.providers.helpers.ClassUtils; @@ -224,7 +226,7 @@ private void processMethodReturningAProcessorBuilderOfPayloads() { Multi multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transform(Message::getPayload); return MultiUtils.via(multi, AdaptersToFlow.processor(returnedProcessorBuilder.buildRs())) - .onItem().transform(Message::of); + .onItem().transform(this::payloadToMessage); }; } @@ -236,7 +238,7 @@ private void processMethodReturningAReactiveStreamsProcessorOfPayloads() { Multi multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transform(Message::getPayload); return MultiUtils.via(multi, AdaptersToFlow.processor(returnedProcessor)) - .onItem().transform(Message::of); + .onItem().transform(this::payloadToMessage); }; } @@ -248,7 +250,7 @@ private void processMethodReturningAProcessorOfPayloads() { Multi multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transform(Message::getPayload); return MultiUtils.via(multi, returnedProcessor) - .onItem().transform(Message::of); + .onItem().transform(this::payloadToMessage); }; } @@ -391,6 +393,15 @@ private Uni> handlePostInvocation(Message message, throw ex.processingException(getMethodAsString(), fail); } } else if (res != null) { + if (res instanceof GenericPayload) { + GenericPayload genericPayload = (GenericPayload) res; + if (isPostAck()) { + return Uni.createFrom().item(genericPayload.toMessage(message)); + } else { + return Uni.createFrom().item(Message.of(genericPayload.getPayload(), + Messages.merge(message.getMetadata(), genericPayload.getMetadata()))); + } + } if (isPostAck()) { return Uni.createFrom().item(message.withPayload(res)); } else { diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java index 7b4d69beb7..743b64a6fa 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java @@ -112,7 +112,8 @@ private void produceAPublisherBuilderOfMessages() { private

void produceAPublisherBuilderOfPayloads() { PublisherBuilder

builder = invoke(); - this.publisher = decorate(MultiUtils.publisher(AdaptersToFlow.publisher(builder.map(Message::of).buildRs()))); + this.publisher = decorate(MultiUtils.publisher( + AdaptersToFlow.publisher(builder.map(this::payloadToMessage).buildRs()))); } private void produceAPublisherOfMessages() { @@ -127,12 +128,12 @@ private void produceAReactiveStreamsPublisherOfMessages() { private

void produceAPublisherOfPayloads() { Flow.Publisher

pub = invoke(); - this.publisher = decorate(MultiUtils.publisher(pub).map(Message::of)); + this.publisher = decorate(MultiUtils.publisher(pub).map(this::payloadToMessage)); } private

void produceAReactiveStreamsPublisherOfPayloads() { Publisher

pub = invoke(); - this.publisher = decorate(Multi.createFrom().publisher(AdaptersToFlow.publisher(pub)).map(Message::of)); + this.publisher = decorate(Multi.createFrom().publisher(AdaptersToFlow.publisher(pub)).map(this::payloadToMessage)); } private void produceIndividualMessages() { @@ -160,15 +161,15 @@ private void produceIndividualPayloads() { if (configuration.isBlockingExecutionOrdered()) { this.publisher = decorate(MultiUtils.createFromGenerator(this::invokeBlocking) .onItem().transformToUniAndConcatenate(u -> u) - .onItem().transform(Message::of)); + .onItem().transform(this::payloadToMessage)); } else { this.publisher = decorate(MultiUtils.createFromGenerator(this::invokeBlocking) .onItem().transformToUni(u -> u).merge(maxConcurrency()) - .onItem().transform(Message::of)); + .onItem().transform(this::payloadToMessage)); } } else { this.publisher = decorate(MultiUtils.createFromGenerator(this::invoke) - .onItem().transform(Message::of)); + .onItem().transform(this::payloadToMessage)); } } @@ -179,7 +180,7 @@ private void produceIndividualCompletionStageOfMessages() { private

void produceIndividualCompletionStageOfPayloads() { this.publisher = decorate(MultiUtils.> createFromGenerator(this::invoke) - .onItem().transformToUniAndConcatenate(cs -> Uni.createFrom().completionStage(cs).map(Message::of))); + .onItem().transformToUniAndConcatenate(cs -> Uni.createFrom().completionStage(cs).map(this::payloadToMessage))); } private void produceIndividualUniOfMessages() { @@ -189,6 +190,6 @@ private void produceIndividualUniOfMessages() { private void produceIndividualUniOfPayloads() { this.publisher = decorate(MultiUtils.> createFromGenerator(this::invoke) - .onItem().transformToUniAndConcatenate(u -> u.map(Message::of))); + .onItem().transformToUniAndConcatenate(u -> u.map(this::payloadToMessage))); } } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/StreamTransformerMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/StreamTransformerMediator.java index 06bc40f8ff..19c47842b5 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/StreamTransformerMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/StreamTransformerMediator.java @@ -261,7 +261,7 @@ private void processMethodConsumingAPublisherBuilderOfPayload() { PublisherBuilder result = invoke(argument); Objects.requireNonNull(result, msg.methodReturnedNull(configuration.methodAsString())); return MultiUtils.publisher(AdaptersToFlow.publisher(result.buildRs())) - .onItem().transform(Message::of); + .onItem().transform(this::payloadToMessage); }; } @@ -273,7 +273,7 @@ private void processMethodConsumingAReactiveStreamsPublisherOfPayload() { Publisher result = invoke(argument); Objects.requireNonNull(result, msg.methodReturnedNull(configuration.methodAsString())); return Multi.createFrom().publisher(AdaptersToFlow.publisher(result)) - .onItem().transform(Message::of); + .onItem().transform(this::payloadToMessage); }; } @@ -291,7 +291,7 @@ private > MultiSplitter fillSplitFunction(Flow.Publisher MultiSplitter result = invoke(argument); Map keyChannelMappings = findKeyOutgoingChannelMappings(result.keyType().getEnumConstants()); keyChannelMappings.forEach((key, outgoing) -> { - Multi> m = result.get(key).onItem().transform(Message::of) + Multi> m = result.get(key).onItem().transform(this::payloadToMessage) // concat map with prefetch handles the request starvation issue with SplitMulti and also syncs requests .onItem().transformToUni(u -> Uni.createFrom().item(u)).concatenate(true); outgoingPublisherMap.put(outgoing, m); @@ -307,7 +307,7 @@ private void processMethodConsumingAPublisherOfPayload() { Flow.Publisher result = invoke(argument); Objects.requireNonNull(result, msg.methodReturnedNull(configuration.methodAsString())); return MultiUtils.publisher(result) - .onItem().transform(Message::of); + .onItem().transform(this::payloadToMessage); }; } @@ -319,7 +319,7 @@ private void processMethodConsumingAPublisherOfKeyValue() { .flatMap(km -> { Flow.Publisher result = invoke(km); return Objects.requireNonNull(result, msg.methodReturnedNull(configuration.methodAsString())); - }).map(Message::of); + }).map(this::payloadToMessage); }; } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/GenericPayloadConverter.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/GenericPayloadConverter.java new file mode 100644 index 0000000000..7da5c5c8ea --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/GenericPayloadConverter.java @@ -0,0 +1,24 @@ +package io.smallrye.reactive.messaging.providers.helpers; + +import java.lang.reflect.Type; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.reactive.messaging.GenericPayload; +import io.smallrye.reactive.messaging.MessageConverter; + +@ApplicationScoped +public class GenericPayloadConverter implements MessageConverter { + + @Override + public boolean canConvert(Message in, Type target) { + return TypeUtils.getRawTypeIfParameterized(target).equals(GenericPayload.class); + } + + @Override + public Message convert(Message in, Type target) { + return in.withPayload(GenericPayload.from(in)); + } +} diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/GenericPayloadTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/GenericPayloadTest.java new file mode 100644 index 0000000000..47a0753d33 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/GenericPayloadTest.java @@ -0,0 +1,161 @@ +package io.smallrye.reactive.messaging; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.providers.helpers.GenericPayloadConverter; + +public class GenericPayloadTest extends WeldTestBaseWithoutTails { + + @Test + public void testIncomingGenericPayload() { + addBeanClass(MyCollector.class); + addBeanClass(GenericPayloadConverter.class); + addBeanClass(GenericPayloadConsumer.class); + + initialize(); + + GenericPayloadConsumer consumer = get(GenericPayloadConsumer.class); + await().untilAsserted(() -> assertThat(consumer.received()) + .hasSize(10) + .extracting(GenericPayload::getPayload) + .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); + } + + @ApplicationScoped + public static class GenericPayloadConsumer { + + List> payloads = new CopyOnWriteArrayList<>(); + + @Incoming("count") + void consume(GenericPayload payload) { + payloads.add(payload); + } + + public List> received() { + return payloads; + } + } + + @Test + public void testOutgoingGenericPayload() { + addBeanClass(MyCollector.class); + addBeanClass(GenericPayloadProducer.class); + + initialize(); + + GenericPayloadProducer producer = get(GenericPayloadProducer.class); + MyCollector collector = get(MyCollector.class); + await().untilAsserted(() -> assertThat(collector.messages()) + .hasSize(10) + .allSatisfy(m -> assertThat(m.getMetadata(Object.class)).hasValue(producer.metadata())) + .extracting(Message::getPayload) + .containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")); + } + + @ApplicationScoped + public static class GenericPayloadProducer { + + Object metadata = new Object(); + + @Outgoing("sink") + Multi> produce() { + return Multi.createFrom().range(0, 10) + .map(i -> GenericPayload.of("" + i, Metadata.of(metadata))); + } + + public Object metadata() { + return metadata; + } + + } + + @Test + public void testProcessorGenericPayload() { + addBeanClass(MyCollector.class); + addBeanClass(GenericPayloadProcessor.class); + addBeanClass(GenericPayloadConverter.class); + + initialize(); + + GenericPayloadProcessor processor = get(GenericPayloadProcessor.class); + MyCollector collector = get(MyCollector.class); + await().untilAsserted(() -> assertThat(collector.messages()) + .hasSize(10) + .allSatisfy(m -> assertThat(m.getMetadata(Object.class).get()).isEqualTo(processor.metadata())) + .extracting(Message::getPayload) + .containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")); + } + + @ApplicationScoped + private static class GenericPayloadProcessor { + Object metadata = new Object(); + + @Incoming("count") + @Outgoing("sink") + GenericPayload process(GenericPayload payload) { + return payload.withPayload("" + payload.getPayload()) + .withMetadata(Metadata.of(metadata)); + } + + public Object metadata() { + return metadata; + } + + } + + @Test + public void testEmitterGenericPayloadNotSupported() { + addBeanClass(MyCollector.class); + addBeanClass(GenericPayloadEmitter.class); + addBeanClass(GenericPayloadConverter.class); + + initialize(); + + GenericPayloadEmitter emitter = get(GenericPayloadEmitter.class); + MyCollector collector = get(MyCollector.class); + + emitter.send10(); + + await().untilAsserted(() -> assertThat(collector.messages()) + .hasSize(10) + .allSatisfy(m -> assertThat(m.getMetadata(Object.class)).isEmpty()) + .extracting(m -> (GenericPayload) (Object) m.getPayload()) + .allSatisfy(p -> assertThat(p).isInstanceOf(GenericPayload.class))); + } + + @ApplicationScoped + private static class GenericPayloadEmitter { + + Object metadata = new Object(); + + @Inject + @Channel("sink") + Emitter> emitter; + + void send10() { + for (int i = 0; i < 10; i++) { + emitter.send(GenericPayload.of("" + i, Metadata.of(metadata))); + } + } + + public Object metadata() { + return metadata; + } + } +}