diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 7f343617ecafe..3d3f3db970ca6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -183,7 +183,9 @@ public int filterEntriesForConsumer(Optional entryWrapper, int e entry.release(); continue; } - } else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) { + } + + if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) { PositionImpl pos = (PositionImpl) entry.getPosition(); // Message metadata was corrupted or the messages was a server-only marker diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 7ff26ddd30953..b90037bdbdcf9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -32,6 +32,8 @@ import java.lang.reflect.Field; import java.util.Collection; import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.ArrayList; import java.util.List; @@ -1276,4 +1278,63 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception { assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue())); assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue())); } + + @Test + public void testDelayedTransactionMessages() throws Exception { + String topic = NAMESPACE1 + "/testDelayedTransactionMessages"; + + @Cleanup + Consumer failoverConsumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("failover-sub") + .subscriptionType(SubscriptionType.Failover) + .subscribe(); + + @Cleanup + Consumer sharedConsumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("shared-sub") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .create(); + + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + for (int i = 0; i < 10; i++) { + producer.newMessage(transaction) + .value("msg-" + i) + .deliverAfter(5, TimeUnit.SECONDS) + .sendAsync(); + } + + producer.flush(); + + transaction.commit().get(); + + // Failover consumer will receive the messages immediately while + // the shared consumer will get them after the delay + Message msg = sharedConsumer.receive(1, TimeUnit.SECONDS); + assertNull(msg); + + for (int i = 0; i < 10; i++) { + msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS); + assertEquals(msg.getValue(), "msg-" + i); + } + + Set receivedMsgs = new TreeSet<>(); + for (int i = 0; i < 10; i++) { + msg = sharedConsumer.receive(10, TimeUnit.SECONDS); + receivedMsgs.add(msg.getValue()); + } + + assertEquals(receivedMsgs.size(), 10); + for (int i = 0; i < 10; i++) { + assertTrue(receivedMsgs.contains("msg-" + i)); + } + } }