From 14672b94ead84646b2caaac68a1b9ffadc7c1249 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Mon, 15 Apr 2024 17:21:58 +0200 Subject: [PATCH] Pulsar nack release message only when the message is not used later --- .../messaging/pulsar/fault/PulsarIgnore.java | 1 - .../reactive/messaging/pulsar/fault/PulsarNack.java | 1 - .../messaging/pulsar/fault/PulsarReconsumeLater.java | 1 - .../pulsar/ack/PulsarAckPoolMessagesTest.java | 12 ++++++++++++ .../pulsar/fault/PulsarNackPoolMessagesTest.java | 12 ++++++++++++ 5 files changed, 24 insertions(+), 3 deletions(-) create mode 100644 smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/ack/PulsarAckPoolMessagesTest.java create mode 100644 smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackPoolMessagesTest.java diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarIgnore.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarIgnore.java index 1d39e48aea..8914c95f5d 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarIgnore.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarIgnore.java @@ -42,7 +42,6 @@ public PulsarIgnore(String channel) { public Uni handle(PulsarIncomingMessage message, Throwable reason, Metadata metadata) { log.messageFailureIgnored(channel, reason.getMessage()); log.messageFailureFullCause(reason); - message.unwrap().release(); return Uni.createFrom().completionStage(message.ack()) .emitOn(message::runOnMessageContext); } diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNack.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNack.java index 97a0171849..8eff893313 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNack.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNack.java @@ -45,7 +45,6 @@ public Uni handle(PulsarIncomingMessage message, Throwable reason, Meta consumer.negativeAcknowledge(message.getMessageId()); log.messageFailureNacked(channel, reason.getMessage()); log.messageFailureFullCause(reason); - message.unwrap().release(); return Uni.createFrom().voidItem() .emitOn(message::runOnMessageContext); } diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarReconsumeLater.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarReconsumeLater.java index cce3988654..c9dd82e8a0 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarReconsumeLater.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarReconsumeLater.java @@ -59,7 +59,6 @@ public Uni handle(PulsarIncomingMessage message, Throwable reason, Meta log.messageFailureDelayed(channel, delay.toSeconds(), reason.getMessage()); log.messageFailureFullCause(reason); - message.unwrap().release(); return Uni.createFrom() .completionStage( () -> consumer.reconsumeLaterAsync(message.unwrap(), customProperties, delay.toSeconds(), SECONDS)) diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/ack/PulsarAckPoolMessagesTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/ack/PulsarAckPoolMessagesTest.java new file mode 100644 index 0000000000..74db1ae12e --- /dev/null +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/ack/PulsarAckPoolMessagesTest.java @@ -0,0 +1,12 @@ +package io.smallrye.reactive.messaging.pulsar.ack; + +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class PulsarAckPoolMessagesTest extends PulsarAckTest { + + MapBasedConfig config() { + return super.config() + .with("mp.messaging.incoming.data.poolMessages", true); + } + +} diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackPoolMessagesTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackPoolMessagesTest.java new file mode 100644 index 0000000000..38ac29c759 --- /dev/null +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackPoolMessagesTest.java @@ -0,0 +1,12 @@ +package io.smallrye.reactive.messaging.pulsar.fault; + +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class PulsarNackPoolMessagesTest extends PulsarNackTest { + + MapBasedConfig config() { + return super.config() + .with("mp.messaging.incoming.data.poolMessages", true); + } + +}