Skip to content

Commit

Permalink
Pulsar nack release message only when the message is not used later
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Apr 15, 2024
1 parent 02538ac commit 14672b9
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public PulsarIgnore(String channel) {
public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Metadata metadata) {
log.messageFailureIgnored(channel, reason.getMessage());
log.messageFailureFullCause(reason);
message.unwrap().release();
return Uni.createFrom().completionStage(message.ack())
.emitOn(message::runOnMessageContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Meta
consumer.negativeAcknowledge(message.getMessageId());
log.messageFailureNacked(channel, reason.getMessage());
log.messageFailureFullCause(reason);
message.unwrap().release();
return Uni.createFrom().voidItem()
.emitOn(message::runOnMessageContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Meta

log.messageFailureDelayed(channel, delay.toSeconds(), reason.getMessage());
log.messageFailureFullCause(reason);
message.unwrap().release();
return Uni.createFrom()
.completionStage(
() -> consumer.reconsumeLaterAsync(message.unwrap(), customProperties, delay.toSeconds(), SECONDS))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.smallrye.reactive.messaging.pulsar.ack;

import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class PulsarAckPoolMessagesTest extends PulsarAckTest {

MapBasedConfig config() {
return super.config()
.with("mp.messaging.incoming.data.poolMessages", true);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.smallrye.reactive.messaging.pulsar.fault;

import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class PulsarNackPoolMessagesTest extends PulsarNackTest {

MapBasedConfig config() {
return super.config()
.with("mp.messaging.incoming.data.poolMessages", true);
}

}

0 comments on commit 14672b9

Please sign in to comment.