From dfb79f801ed2cfbc1c1ef3783d2d2db5280a54dd Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Tue, 25 Jul 2023 16:02:32 +0200 Subject: [PATCH] Provide a utility method to coordinate (negative) acknowledgement when multiple messages are created from one --- .../smallrye/reactive/messaging/Messages.java | 154 +++++++++++++++++- .../reactive/messaging/MessagesTest.java | 142 ++++++++++++++++ 2 files changed, 295 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/io/smallrye/reactive/messaging/Messages.java b/api/src/main/java/io/smallrye/reactive/messaging/Messages.java index 097039028d..a5e1d3fa32 100644 --- a/api/src/main/java/io/smallrye/reactive/messaging/Messages.java +++ b/api/src/main/java/io/smallrye/reactive/messaging/Messages.java @@ -1,11 +1,14 @@ package io.smallrye.reactive.messaging; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -13,12 +16,48 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Metadata; +import io.smallrye.common.annotation.CheckReturnValue; + +/** + * A class handling coordination between messages. + */ public class Messages { private Messages() { // Avoid direct instantiation. } + /** + * Chains the given message with some other messages. + * It coordinates the acknowledgement. When all the other messages are acknowledged successfully, the passed + * message is acknowledged. If one of the other messages is acknowledged negatively, the passed message is also + * nacked (with the same reason). Subsequent ack/nack will be ignored. + *

+ * + * @param message the message + * @return the chain builder that let you decide how the metadata are passed, and the set of messages. + */ + @CheckReturnValue + public static MessageChainBuilder chain(Message message) { + return new MessageChainBuilder(message); + } + + /** + * Merges multiple messages into a single one. + * This is an implementation of a merge pattern: n messages combined into 1. + *

+ * Whe resulting message payload is computed using the combinator function. + * When the returned message is acked/nacked, the passes messages are acked/nacked accordingly. + *

+ * Metadata are also merged. The metadata of all the messages are copied into the resulting message. If, for a given + * class, the metadata is already present in the result message, it's either ignored, or merged if the class + * implements {@link MergeableMetadata}. + * + * @param list the list of message, must not be empty, must not be null + * @param combinator the combinator method, must not be null + * @param the payload type of the produced message + * @return the resulting message + */ public static Message merge(List> list, Function, T> combinator) { if (list.isEmpty()) { return Message.of(combinator.apply(Collections.emptyList())); @@ -59,6 +98,20 @@ public static Message merge(List> list, Function, T> c .withMetadata(metadata); } + /** + * Merges multiple messages into a single one. + *

+ * Whe resulting message payload is computed using the combinator function. + * When the returned message is acked/nacked, the passes messages are acked/nacked accordingly. + *

+ * Metadata are also merged. The metadata of all the messages are copied into the resulting message. If, for a given + * class, the metadata is already present in the result message, it's either ignored, or merged if the class + * implements {@link MergeableMetadata}. + * + * @param list the list of message, must not be empty, must not be null + * @param the payload type of the passed messages + * @return the resulting message + */ public static Message> merge(List> list) { if (list.isEmpty()) { return Message.of(Collections.emptyList()); @@ -90,7 +143,7 @@ public static Message> merge(List> list) { .withMetadata(metadata); } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "rawtypes" }) private static Metadata merge(Metadata first, Metadata second) { Metadata result = first; for (Object meta : second) { @@ -121,4 +174,103 @@ private static Metadata merge(Metadata first, Metadata second) { return result; } + /** + * The message chain builder allows chaining message and configure metadata propagation. + * By default, all the metadata from the given message are copied into the chained messages. + */ + public static class MessageChainBuilder { + private final Message input; + private Metadata metadata; + + private MessageChainBuilder(Message message) { + this.input = message; + this.metadata = message.getMetadata().copy(); + } + + /** + * Do not copy any metadata from the initial message to the chained message. + * + * @return the current {@link MessageChainBuilder} + */ + @CheckReturnValue + public MessageChainBuilder withoutMetadata() { + this.metadata = Metadata.empty(); + return this; + } + + /** + * Copy the given metadata of the given classes from the initial message to the chained message, if the initial + * message does not include a metadata object of the given class. + * + * In general, this method must be used after {@link #withoutMetadata()}. + * + * @return the current {@link MessageChainBuilder} + */ + @CheckReturnValue + public MessageChainBuilder withMetadata(Class... mc) { + for (Class clazz : mc) { + Optional o = input.getMetadata().get(clazz); + o.ifPresent(value -> this.metadata = metadata.with(value)); + } + return this; + } + + /** + * Do not the given metadata of the given classes from the initial message to the chained message, if the initial + * message does not include a metadata object of the given class. + * + * @return the current {@link MessageChainBuilder} + */ + @CheckReturnValue + public MessageChainBuilder withoutMetadata(Class... mc) { + for (Class clazz : mc) { + this.metadata = this.metadata.without(clazz); + } + return this; + } + + /** + * Passed the chained messages. + * The messages are not modified, but should not be used afterward, and should be replaced by the messages contained + * in the returned list. + * This method preserve the order. So, the first message corresponds to the first message in the returned list. + * The message from the returned list have the necessary logic to chain the ack/nack signals and the copied metadata. + * + * @param messages the chained messages, must not be empty, must not be null, must not contain null + * @return the list of modified messages + */ + public List> with(Message... messages) { + AtomicBoolean done = new AtomicBoolean(); + + // Must be modifiable + List> trackers = Arrays.stream(messages).collect(Collectors.toCollection(CopyOnWriteArrayList::new)); + List> outcomes = new ArrayList<>(); + for (Message message : messages) { + Message tmp = message; + for (Object metadatum : metadata) { + tmp = tmp.addMetadata(metadatum); + } + outcomes.add(tmp + .withAck(() -> { + CompletionStage acked = message.ack(); + if (trackers.remove(message)) { + if (trackers.isEmpty() && done.compareAndSet(false, true)) { + return acked.thenCompose(x -> input.ack()); + } + } + return acked; + }) + .withNack((reason) -> { + CompletionStage nacked = message.nack(reason); + if (trackers.remove(message)) { + if (done.compareAndSet(false, true)) { + return nacked.thenCompose(x -> input.nack(reason)); + } + } + return nacked; + })); + } + return outcomes; + } + } } diff --git a/api/src/test/java/io/smallrye/reactive/messaging/MessagesTest.java b/api/src/test/java/io/smallrye/reactive/messaging/MessagesTest.java index 23bf489f75..aebf2f405e 100644 --- a/api/src/test/java/io/smallrye/reactive/messaging/MessagesTest.java +++ b/api/src/test/java/io/smallrye/reactive/messaging/MessagesTest.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.microprofile.reactive.messaging.Message; import org.junit.jupiter.api.Test; @@ -216,6 +217,147 @@ void checkWithEmptyList() { assertThat(Messages.merge(List.of()).getPayload()).isEqualTo(Collections.emptyList()); } + @Test + void checkSimpleChainAcknowledgement() { + AtomicBoolean o1Ack = new AtomicBoolean(); + AtomicBoolean o2Ack = new AtomicBoolean(); + AtomicInteger i1Ack = new AtomicInteger(); + Message o1 = Message.of("foo", () -> { + o1Ack.set(true); + return CompletableFuture.completedFuture(null); + }); + Message o2 = Message.of("bar", () -> { + o2Ack.set(true); + return CompletableFuture.completedFuture(null); + }); + + Message i = Message.of(1, () -> { + i1Ack.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }); + + List> outcomes = Messages.chain(i).with(o1, o2); + assertThat(i1Ack).hasValue(0); + assertThat(o1Ack).isFalse(); + assertThat(o2Ack).isFalse(); + + outcomes.get(0).ack(); + assertThat(i1Ack).hasValue(0); + assertThat(o1Ack).isTrue(); + assertThat(o2Ack).isFalse(); + + outcomes.get(1).ack(); + assertThat(i1Ack).hasValue(1); + assertThat(o1Ack).isTrue(); + assertThat(o1Ack).isTrue(); + + outcomes.get(1).ack(); + outcomes.get(0).ack(); + assertThat(i1Ack).hasValue(1); + + outcomes.get(1).nack(new Exception("boom")); + outcomes.get(0).nack(new Exception("boom")); + assertThat(i1Ack).hasValue(1); + } + + @Test + void checkSimpleChainNegativeAcknowledgement() { + AtomicBoolean o1Ack = new AtomicBoolean(); + AtomicBoolean o2Ack = new AtomicBoolean(); + AtomicBoolean o1Nack = new AtomicBoolean(); + AtomicBoolean o2Nack = new AtomicBoolean(); + AtomicInteger i1Ack = new AtomicInteger(); + AtomicInteger i1Nack = new AtomicInteger(); + + Message o1 = Message.of("foo", () -> { + o1Ack.set(true); + return CompletableFuture.completedFuture(null); + }, t -> { + o1Nack.set(true); + return CompletableFuture.completedFuture(null); + }); + Message o2 = Message.of("bar", () -> { + o2Ack.set(true); + return CompletableFuture.completedFuture(null); + }, t -> { + o2Nack.set(true); + return CompletableFuture.completedFuture(null); + }); + + Message i = Message.of(1, () -> { + i1Ack.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }, t -> { + i1Nack.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }); + + List> outcomes = Messages.chain(i).with(o1, o2); + assertThat(i1Ack).hasValue(0); + assertThat(o1Ack).isFalse(); + assertThat(o2Ack).isFalse(); + assertThat(i1Nack).hasValue(0); + assertThat(o1Nack).isFalse(); + assertThat(o2Nack).isFalse(); + + outcomes.get(0).ack(); + assertThat(i1Ack).hasValue(0); + assertThat(o1Ack).isTrue(); + assertThat(o2Ack).isFalse(); + assertThat(i1Nack).hasValue(0); + assertThat(o1Nack).isFalse(); + assertThat(o2Nack).isFalse(); + + outcomes.get(0).nack(new Exception("boom")); + assertThat(i1Ack).hasValue(0); + assertThat(i1Nack).hasValue(0); + + outcomes.get(1).nack(new Exception("boom")); + assertThat(i1Nack).hasValue(1); + assertThat(i1Ack).hasValue(0); + assertThat(o2Nack).isTrue(); + + outcomes.get(1).ack(); + assertThat(i1Nack).hasValue(1); + assertThat(i1Ack).hasValue(0); + } + + @Test + void testChainWithMetadataSelection() { + Message i = Message.of(1) + .withMetadata(List.of(new NonMergeableMetadata("hello"), new MergeableMetadata("hello"), + new AnotherMetadata("hello"))); + + Message m1 = Message.of("a"); + AnotherMetadata am = new AnotherMetadata("hello"); + Message m2 = Message.of("b").addMetadata(am); + + // No metadata copied from the original message + List> out = Messages.chain(i).withoutMetadata().with(m1, m2); + assertThat(out.get(0).getMetadata()).isEmpty(); + assertThat(out.get(1).getMetadata()).hasSize(1).containsOnly(am); + + // All metadata are copied from the original message + out = Messages.chain(i).with(m1, m2); + assertThat(out.get(0).getMetadata()).hasSize(3); + assertThat(out.get(1).getMetadata()).hasSize(3).doesNotContain(am); + + // All metadata but MergeableMetadata are copied from the original message + out = Messages.chain(i).withoutMetadata(MergeableMetadata.class).with(m1, m2); + assertThat(out.get(0).getMetadata()).hasSize(2); + assertThat(out.get(1).getMetadata()).hasSize(2).doesNotContain(am); + + // All metadata but AnotherMetadata are copied from the original message + out = Messages.chain(i).withoutMetadata(AnotherMetadata.class).with(m1, m2); + assertThat(out.get(0).getMetadata()).hasSize(2); + assertThat(out.get(1).getMetadata()).hasSize(3).contains(am); + + out = Messages.chain(i).withoutMetadata().withMetadata(AnotherMetadata.class).with(m1, m2); + assertThat(out.get(0).getMetadata()).hasSize(1); + assertThat(out.get(1).getMetadata()).hasSize(1); + + } + public static class NonMergeableMetadata { String value;