Skip to content

Commit

Permalink
Introduce GenericPayload primarily for sending outgoing payloads with…
Browse files Browse the repository at this point in the history
… additional metadata
  • Loading branch information
ozangunalp committed Mar 22, 2024
1 parent d2b5012 commit ddd895d
Show file tree
Hide file tree
Showing 20 changed files with 570 additions and 71 deletions.
14 changes: 12 additions & 2 deletions api/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,17 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.method.visibilityIncreased",
"old": "method org.eclipse.microprofile.reactive.messaging.Metadata io.smallrye.reactive.messaging.Messages::merge(org.eclipse.microprofile.reactive.messaging.Metadata, org.eclipse.microprofile.reactive.messaging.Metadata)",
"new": "method org.eclipse.microprofile.reactive.messaging.Metadata io.smallrye.reactive.messaging.Messages::merge(org.eclipse.microprofile.reactive.messaging.Metadata, org.eclipse.microprofile.reactive.messaging.Metadata)",
"oldVisibility": "private",
"newVisibility": "public",
"justification": "Metadata merge utility function exposed from the Messages API"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand All @@ -46,4 +56,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
} ]
115 changes: 115 additions & 0 deletions api/src/main/java/io/smallrye/reactive/messaging/GenericPayload.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package io.smallrye.reactive.messaging;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

/**
* A generic payload that can be used to wrap a payload with metadata.
* Allows associating a payload with metadata to be sent as a message,
* without using signatures supporting {@code Message<T>}.
*
* @param <T> the type of the payload
*/
public class GenericPayload<T> {

/**
* Creates a new payload with the given payload and empty metadata.
*
* @param payload the payload
* @param <T> the type of the payload
* @return the payload
*/
public static <T> GenericPayload<T> of(T payload) {
return new GenericPayload<>(payload, Metadata.empty());
}

/**
* Creates a new payload with the given payload and metadata.
*
* @param payload the payload
* @param metadata the metadata
* @param <T> the type of the payload
* @return the payload
*/
public static <T> GenericPayload<T> of(T payload, Metadata metadata) {
return new GenericPayload<>(payload, metadata);
}

/**
* Creates a new payload from the given message.
*
* @param message the message
* @param <T> the type of the payload
* @return the payload
*/
public static <T> GenericPayload<T> from(Message<T> message) {
return new GenericPayload<>(message.getPayload(), message.getMetadata());
}

private final T payload;
private final Metadata metadata;

public GenericPayload(T payload, Metadata metadata) {
this.payload = payload;
this.metadata = metadata;
}

/**
* Gets the payload associated with this payload.
*
* @return the payload
*/
public T getPayload() {
return payload;
}

/**
* Gets the metadata associated with this payload.
*
* @return the metadata
*/
public Metadata getMetadata() {
return metadata;
}

/**
* Adds metadata to this payload.
*
* @param metadata the metadata to add
* @return a new payload with the added metadata
*/
public GenericPayload<T> withMetadata(Metadata metadata) {
return GenericPayload.of(this.payload, metadata);
}

/**
* Adds metadata to this payload.
*
* @param payload the payload to add
* @return a new payload with the added metadata
*/
public <R> GenericPayload<R> withPayload(R payload) {
return GenericPayload.of(payload, this.metadata);
}

/**
* Converts this payload to a message.
*
* @return the message with the payload and metadata
*/
public Message<T> toMessage() {
return Message.of(payload, metadata);
}

/**
* Converts this payload to a message, merging the metadata with the given message.
*
* @param message the message to merge the metadata with
* @return the message with the payload and merged metadata
*/
public Message<T> toMessage(Message<?> message) {
Metadata merged = Messages.merge(message.getMetadata(), this.metadata);
return message.withPayload(payload).withMetadata(merged);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ static <T> Message<List<T>> merge(List<Message<T>> list) {
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private static Metadata merge(Metadata first, Metadata second) {
static Metadata merge(Metadata first, Metadata second) {
Metadata result = first;
for (Object meta : second) {
Class<?> clazz = meta.getClass();
Expand Down
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ nav:
- 'Advanced Configuration' : concepts/advanced-config.md
- 'Message Context' : concepts/message-context.md
- 'Metadata Injection': concepts/incoming-metadata-injection.md
- 'Generic Payloads': concepts/generic-payloads.md

- Kafka:
- kafka/kafka.md
Expand Down
34 changes: 34 additions & 0 deletions documentation/src/main/docs/concepts/generic-payloads.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Generic Payloads

!!!warning "Experimental"
Generic payloads are an experimental feature and the API is subject to change.

When using reactive messaging, `Message` flow in your system
each message has a payload but can also contain _metadata_, as explained in [Messages, Payload, Metadata](concepts.md#messages-payload-metadata).
The metadata can hold information, for example in an outgoing channel, additional properties of the outgoing message to be sent to the broker.

It is sometimes preferable to continue using the [payload signatures](model.md#messages-vs-payloads),
and also being able to attach metadata.
Using `GenericPayload` allows customizing metadata when handling payloads in SmallRye Reactive Messaging `@Incoming` and `@Outgoing` methods.
`GenericPayload` is a wrapper type, like the `Message`, containing a payload and metadata,
without requiring handling acknowledgments manually.

``` java
{{ insert('genericpayload/GenericPayloadExample.java', 'code') }}
```

You can combine generic payloads with [metadata injection](incoming-metadata-injection.md) :


``` java
{{ insert('genericpayload/GenericPayloadExample.java', 'injection') }}
```

Note that the metadata provided with the outgoing generic payload is merged with the incoming message metadata.

!!! warning "Limitations"
There are several limitations for the use of `GenericPayload`:
`GenericPayload` is not supported in emitters, as normal outgoing `Message` can be used for that purpose.
While `GenericPayload<T>` can be used as an incoming payload type,
[message converters](converters.md) are not applied to the payload type `T`.

70 changes: 48 additions & 22 deletions documentation/src/main/docs/concepts/model.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ These annotations are used on *methods*:
{{ insert('beans/MessageProcessingBean.java') }}
```

!!! note
Reactive Messaging beans can either be in the *application* scope (`@ApplicationScoped`) or dependent scope (`@Dependent`).
!!!note
Reactive Messaging beans can either be in the *application* scope (`@ApplicationScoped`) or dependent scope (`@Dependent`).

Manipulating messages can be cumbersome.
When you are only interested in the payload, you can use the following syntax: The following code is equivalent to the snippet from above:
Expand All @@ -27,10 +27,10 @@ When you are only interested in the payload, you can use the following syntax: T
{{ insert('beans/PayloadProcessingBean.java') }}
```

!!! important
You should not call methods annotated with `@Incoming` and/or
`@Outgoing` directly from your code. They are invoked by the framework.
Having user code invoking them would not have the expected outcome.
!!!important
You should not call methods annotated with `@Incoming` and/or
`@Outgoing` directly from your code. They are invoked by the framework.
Having user code invoking them would not have the expected outcome.


SmallRye Reactive Messaging automatically binds matching `@Outgoing` to
Expand Down Expand Up @@ -83,15 +83,41 @@ You can also create new instance of `Message` from an existing one:
``` java
{{ insert('messages/MessageExamples.java', 'copy') }}
```

!!! note "Acknowledgement?"
Acknowledgement is an important part of messaging systems. This will be
covered in the [acknowledgement](acknowledgement.md)
section.
!!! warning "Acknowledgement?"
Acknowledgement is an important part of messaging systems. This will be covered in the [acknowledgement](acknowledgement.md) section.

!!! note "Connector Metadata"
Most connectors are providing metadata to let you extract technical
details about the message, but also customize the outbound dispatching.
Most connectors are providing metadata to let you extract technical
details about the message, but also customize the outbound dispatching.

## Messages vs. Payloads

Reactive messaging offers flexibility when it comes to handling messages and their acknowledgements.
The application developer can choose to finely handle acknowledgements per-message basis, by handling the
`Message`-based signatures. Otherwise, when handling payloads, acknowledgements
(and negative-acknowledgements) are handled by the framework.
The following sections in this documentation detail both development models.

While being the easier development model, in the past using payload-based signatures did not allow associating connector-specific metadata,
making the `Message`-based signatures the de-facto choice even for the most common scenarios.
This lead to using the connector custom-message implementation types,
such as `IncomingKafkaRecord`, `KafkaRecord` or `IncomingRabbitMQMessage`,
as a convenience for accessing connector-specific metadata.

Not only this forces to handle acknowledgements manually,
it also doesn't allow for incoming or outgoing `Messages` to be [intercepted](decorators.md#intercepting-incoming-and-outgoing-messages)
or [observed](observability.md).

!!! warning "Custom `Message` types & Message interception"
Custom `Message` types such as `IncomingKafkaRecord`, `KafkaRecord` or `IncomingRabbitMQMessage`
are not compatible with features intercepting messages.
Therefore, it is no longer recommended to use custom `Message` implementations in consumptions methods.
Instead, you can either use the generic `Message` type and access specific metadata,
or use the payload with [metadata injection](incoming-metadata-injection.md)

Since [incoming metadata injection](incoming-metadata-injection.md) and [generic payloads](generic-payloads.md)
features added to SmallRye Reactive Messaging,
it is easier to use payload signatures and benefit from acknowledgement handling and still access connector-specific metadata.

## Generating Messages

Expand All @@ -109,10 +135,10 @@ called for every *request* from the downstream:
```

!!! note "Requests?"
Reactive Messaging connects components to build a reactive stream.
In a reactive stream, the emissions are controlled by the consumer
(downstream) indicating to the publisher (upstream) how many items it
can consume. With this protocol, the consumers are never flooded.
Reactive Messaging connects components to build a reactive stream.
In a reactive stream, the emissions are controlled by the consumer
(downstream) indicating to the publisher (upstream) how many items it
can consume. With this protocol, the consumers are never flooded.


### Generating messages using CompletionStage
Expand Down Expand Up @@ -300,7 +326,7 @@ directly either synchronously or asynchronously:
```

!!! note "What about metadata?"
With these methods, the metadata are automatically propagated.
With these methods, the metadata are automatically propagated.

## Processing streams

Expand All @@ -322,7 +348,7 @@ You can receive either a (Reactive Streams) `Publisher`, a
`Publisher` or a `Publisher` directly.

!!!important
These signatures do not support metadata propagation. In the case of a
stream of `Message`, you need to propagate the metadata manually. In the
case of a stream of payload, propagation is not supported, and incoming
metadata are lost.
These signatures do not support metadata propagation. In the case of a
stream of `Message`, you need to propagate the metadata manually. In the
case of a stream of payload, propagation is not supported, and incoming
metadata are lost.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package genericpayload;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.GenericPayload;
import messages.MyMetadata;

@ApplicationScoped
public class GenericPayloadExample {

// <code>
@Outgoing("out")
Multi<GenericPayload<String>> produce() {
return Multi.createFrom().range(0, 100)
.map(i -> GenericPayload.of(">> " + i, Metadata.of(new MyMetadata())));
}
// </code>

// <injection>
@Incoming("in")
@Outgoing("out")
GenericPayload<String> process(int payload, MyMetadata metadata) {
// use the injected metadata
String id = metadata.getId();
return GenericPayload.of(">> " + payload + " " + id,
Metadata.of(metadata, new MyMetadata("Bob", "Alice")));
}
// </injection>

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

@ApplicationScoped
public class KafkaCheckpointExample {

// <code>
@Incoming("prices")
public CompletionStage<Void> consume(KafkaRecord<String, Double> record) {
public CompletionStage<Void> consume(Message<Double> record) {
// Get the `CheckpointMetadata` from the incoming message
CheckpointMetadata<Double> checkpoint = CheckpointMetadata.fromMessage(record);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

@ApplicationScoped
public class KafkaDeadLetterExample {

// <code>
@Incoming("in")
public CompletionStage<Void> consume(KafkaRecord<String, String> message) {
public CompletionStage<Void> consume(Message<String> message) {
return message.nack(new Exception("Failed!"), Metadata.of(
OutgoingKafkaRecordMetadata.builder()
.withKey("failed-record")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class KafkaRebalancedConsumer {

@Incoming("rebalanced-example")
@Acknowledgment(Acknowledgment.Strategy.NONE)
public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
public CompletionStage<Void> consume(Message<String> message) {
// We don't need to ACK messages because in this example we set offset during consumer re-balance
return CompletableFuture.completedFuture(null);
}
Expand Down
Loading

0 comments on commit ddd895d

Please sign in to comment.