From be1d07e16119fafbe489315ab68c4751bf03c31d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 21 Nov 2022 17:43:01 +0800 Subject: [PATCH] [fix] Avoid redelivering duplicated messages when batching is enabled (#18486) ### Motivation https://github.com/apache/pulsar/pull/18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. ### Modifications Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. ### TODO The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: https://github.com/BewareMyPower/pulsar/pull/8 --- .../client/api/ConsumerRedeliveryTest.java | 69 +++++++++++++++++++ .../impl/ConsumerDedupPermitsUpdateTest.java | 21 ++---- ...sistentAcknowledgmentsGroupingTracker.java | 32 +++++++-- .../client/impl/LastCumulativeAckTest.java | 20 ++++++ 4 files changed, 121 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index 41e451f9ed18f..7e3b68a5445be 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -30,9 +30,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import lombok.Cleanup; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +68,19 @@ public Object[][] ackReceiptEnabled() { return new Object[][] { { true }, { false } }; } + @DataProvider(name = "batchedMessageAck") + public Object[][] batchedMessageAck() { + // When batch index ack is disabled (by default), only after all single messages were sent would the pending + // ACK be added into the ACK tracker. + return new Object[][] { + // numAcked, batchSize, ack type + { 3, 5, CommandAck.AckType.Individual }, + { 5, 5, CommandAck.AckType.Individual }, + { 3, 5, CommandAck.AckType.Cumulative }, + { 5, 5, CommandAck.AckType.Cumulative } + }; + } + /** * It verifies that redelivered messages are sorted based on the ledger-ids. *
@@ -297,4 +313,57 @@ public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exce
         consumer.close();
         producer.close();
     }
+
+    @Test(timeOut = 30000, dataProvider = "batchedMessageAck")
+    public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackType) throws Exception {
+        String topic = "persistent://my-property/my-ns/test-ack-not-sent-"
+                + numAcked + "-" + batchSize + "-" + ackType.getValue();
+        @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .enableBatchIndexAcknowledgment(false)
+                .acknowledgmentGroupTime(1, TimeUnit.HOURS) // ACK won't be sent
+                .subscribe();
+        @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxMessages(batchSize)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .create();
+        for (int i = 0; i < batchSize; i++) {
+            String value = "msg-" + i;
+            producer.sendAsync(value).thenAccept(id -> log.info("{} was sent to {}", value, id));
+        }
+        List> messages = new ArrayList<>();
+        for (int i = 0; i < batchSize; i++) {
+            messages.add(consumer.receive());
+        }
+        if (ackType == CommandAck.AckType.Individual) {
+            for (int i = 0; i < numAcked; i++) {
+                consumer.acknowledge(messages.get(i));
+            }
+        } else {
+            consumer.acknowledgeCumulative(messages.get(numAcked - 1));
+        }
+
+        consumer.redeliverUnacknowledgedMessages();
+
+        messages.clear();
+        for (int i = 0; i < batchSize; i++) {
+            Message msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            log.info("Received {} from {}", msg.getValue(), msg.getMessageId());
+            messages.add(msg);
+        }
+        List values = messages.stream().map(Message::getValue).collect(Collectors.toList());
+        // All messages are redelivered because only if the whole batch are acknowledged would the message ID be
+        // added into the ACK tracker.
+        if (numAcked < batchSize) {
+            assertEquals(values, IntStream.range(0, batchSize).mapToObj(i -> "msg-" + i).collect(Collectors.toList()));
+        } else {
+            assertTrue(values.isEmpty());
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
index 5ef3054d16516..6fc8268ae2165 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
@@ -116,23 +116,10 @@ public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) th
         }
         producer.flush();
 
-        if (batchingEnabled) {
-            for (int i = 0; i < 30; i++) {
-                Message msg = consumer.receive();
-                assertEquals(msg.getValue(), "hello-" + i);
-                consumer.acknowledge(msg);
-            }
-            for (int i = 0; i < 30; i++) {
-                Message msg = consumer.receive();
-                assertEquals(msg.getValue(), "new-message-" + i);
-                consumer.acknowledge(msg);
-            }
-        } else {
-            for (int i = 0; i < 30; i++) {
-                Message msg = consumer.receive();
-                assertEquals(msg.getValue(), "new-message-" + i);
-                consumer.acknowledge(msg);
-            }
+        for (int i = 0; i < 30; i++) {
+            Message msg = consumer.receive();
+            assertEquals(msg.getValue(), "new-message-" + i);
+            consumer.acknowledge(msg);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index b44670c14631f..fef0bcb8906f1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -113,12 +113,18 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum
      */
     @Override
     public boolean isDuplicate(MessageId messageId) {
-        final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
-        if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
+        if (!(messageId instanceof MessageIdImpl)) {
+            throw new IllegalArgumentException("isDuplicated cannot accept "
+                    + messageId.getClass().getName() + ": " + messageId);
+        }
+        if (lastCumulativeAck.compareTo(messageId) >= 0) {
             // Already included in a cumulative ack
             return true;
         } else {
-            return pendingIndividualAcks.contains((MessageIdImpl) messageId);
+            final MessageIdImpl messageIdImpl = (messageId instanceof BatchMessageIdImpl)
+                    ? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
+                    : (MessageIdImpl) messageId;
+            return pendingIndividualAcks.contains(messageIdImpl);
         }
     }
 
@@ -622,7 +628,7 @@ protected LastCumulativeAck initialValue() {
     private boolean flushRequired = false;
 
     public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
-        if (messageId.compareTo(this.messageId) > 0) {
+        if (compareTo(messageId) < 0) {
             if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
                 this.bitSetRecyclable.recycle();
             }
@@ -656,6 +662,24 @@ public synchronized void reset() {
         flushRequired = false;
     }
 
+    public synchronized int compareTo(MessageId messageId) {
+        if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) {
+            final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
+            final MessageIdImpl rhs = (MessageIdImpl) messageId;
+            return MessageIdImpl.messageIdCompare(
+                    lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(),
+                    rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE);
+        } else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){
+            final MessageIdImpl lhs = this.messageId;
+            final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
+            return MessageIdImpl.messageIdCompare(
+                    lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE,
+                    rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex());
+        } else {
+            return this.messageId.compareTo(messageId);
+        }
+    }
+
     private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
         this.messageId = messageId;
         this.bitSetRecyclable = bitSetRecyclable;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
index 245866749e944..27ee9d1dc3648 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
@@ -83,4 +83,24 @@ public void testFlush() {
         assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
     }
 
+    @Test
+    public void testCompareTo() {
+        LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+        lastCumulativeAck.update(new MessageIdImpl(0L, 1L, -1), null);
+
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
+        assertEquals(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)), 0);
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
+        assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);
+
+        lastCumulativeAck = new LastCumulativeAck();
+        lastCumulativeAck.update(new BatchMessageIdImpl(0L, 1L, -1, 1), null);
+
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)) < 0);
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
+        assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);
+        assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 2)) < 0);
+        assertEquals(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 1)), 0);
+    }
 }