Skip to content

Commit

Permalink
[fix][txn]fix receive duplicated messages due to pendingAcks in Pendi…
Browse files Browse the repository at this point in the history
…ngAckHandle (#19581)

Co-authored-by: mayozhang <[email protected]>
  • Loading branch information
aloyszhang and mayozhang authored Feb 23, 2023
1 parent d4be954 commit e6bc499
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ public static long[] andAckSet(long[] firstAckSet, long[] secondAckSet) {
return ackSet;
}

public static boolean isAckSetEmpty(long[] ackSet) {
BitSetRecyclable bitSet = BitSetRecyclable.create().resetWords(ackSet);
boolean isEmpty = bitSet.isEmpty();
bitSet.recycle();
return isEmpty;
}

//This method is compare two position which position is bigger than another one.
//When the ledgerId and entryId in this position is same to another one and two position all have ack set, it will
//compare the ack set next bit index is bigger than another one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetEmpty;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.Gauge;
import java.util.ArrayList;
Expand Down Expand Up @@ -239,6 +240,18 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
// if actSet is null, use pendingAck ackSet
ackSet = positionInPendingAck.getAckSet();
}
// if the result of pendingAckSet(in pendingAckHandle) AND the ackSet(in cursor) is empty
// filter this entry
if (isAckSetEmpty(ackSet)) {
entries.set(i, null);
entry.release();
continue;
}
} else {
// filter non-batch message in pendingAck state
entries.set(i, null);
entry.release();
continue;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,84 @@ private void testIndividualAckAbortFilterAckSetInPendingAckState() throws Except
assertNull(consumer.receive(2, TimeUnit.SECONDS));
}


@Test(dataProvider="enableBatch")
private void testFilterMsgsInPendingAckStateWhenConsumerDisconnect(boolean enableBatch) throws Exception {
final String topicName = NAMESPACE1 + "/testFilterMsgsInPendingAckStateWhenConsumerDisconnect-" + enableBatch;
final int count = 10;

@Cleanup
Producer<Integer> producer = null;
if (enableBatch) {
producer = pulsarClient
.newProducer(Schema.INT32)
.topic(topicName)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.batchingMaxMessages(count).create();
} else {
producer = pulsarClient
.newProducer(Schema.INT32)
.topic(topicName)
.enableBatching(false).create();
}

@Cleanup
Consumer<Integer> consumer = pulsarClient
.newConsumer(Schema.INT32)
.topic(topicName)
.isAckReceiptEnabled(true)
.subscriptionName("test")
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.subscribe();

for (int i = 0; i < count; i++) {
producer.sendAsync(i);
}

Transaction txn1 = getTxn();

Transaction txn2 = getTxn();


// txn1 ack half of messages and don't end the txn1
for (int i = 0; i < count / 2; i++) {
consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn1).get();
}

// txn2 ack the rest half of messages and commit tnx2
for (int i = count / 2; i < count; i++) {
consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn2).get();
}
// commit txn2
txn2.commit().get();

// close and re-create consumer
consumer.close();
consumer = pulsarClient
.newConsumer(Schema.INT32)
.topic(topicName)
.isAckReceiptEnabled(true)
.subscriptionName("test")
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.subscribe();

Message<Integer> message = consumer.receive(3, TimeUnit.SECONDS);
Assert.assertNull(message);

// abort txn1
txn1.abort().get();
// after txn1 aborted, consumer will receive messages txn1 contains
int receiveCounter = 0;
while((message = consumer.receive(3, TimeUnit.SECONDS)) != null) {
Assert.assertEquals(message.getValue().intValue(), receiveCounter);
receiveCounter ++;
}
Assert.assertEquals(receiveCounter, count / 2);
}

@Test(dataProvider="enableBatch")
private void produceCommitTest(boolean enableBatch) throws Exception {
@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,21 @@ public void testResetWords() {
Assert.assertTrue(bitset1.get(128));
Assert.assertFalse(bitset1.get(256));
}

@Test
public void testBitSetEmpty() {
BitSetRecyclable bitSet = BitSetRecyclable.create();
bitSet.set(0, 5);
bitSet.clear(1);
bitSet.clear(2);
bitSet.clear(3);
long[] array = bitSet.toLongArray();
Assert.assertFalse(bitSet.isEmpty());
Assert.assertFalse(BitSetRecyclable.create().resetWords(array).isEmpty());
bitSet.clear(0);
bitSet.clear(4);
Assert.assertTrue(bitSet.isEmpty());
long[] array1 = bitSet.toLongArray();
Assert.assertTrue(BitSetRecyclable.create().resetWords(array1).isEmpty());
}
}

0 comments on commit e6bc499

Please sign in to comment.