Skip to content

Commit

Permalink
Fix the cumulative ack
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie committed Mar 24, 2018
1 parent 46266ec commit ceb7b45
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -433,7 +434,7 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception {
Message<byte[]> lastunackedMsg = null;
for (int i = 0; i < numMsgs; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
LOG.info("received message {}", String.valueOf(msg.getData()));
LOG.info("received message {}", new String(msg.getData(), UTF_8));
assertNotNull(msg);
if (i == 8) {
consumer.acknowledgeCumulative(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static BatchMessageAcker newAcker(int batchSize) {

// bitset shared across messages in the same batch.
private final BitSet bitSet;
private boolean prevBatchCumulativelyAcked = false;

BatchMessageAcker(BitSet bitSet) {
this.bitSet = bitSet;
Expand Down Expand Up @@ -61,4 +62,12 @@ public synchronized int getOutstandingAcks() {
return bitSet.cardinality();
}

public void setPrevBatchCumulativelyAcked(boolean acked) {
this.prevBatchCumulativelyAcked = acked;
}

public boolean isPrevBatchCumulativelyAcked() {
return prevBatchCumulativelyAcked;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,8 @@ public MessageIdImpl prevBatchMessageId() {
ledgerId, entryId - 1, partitionIndex);
}

public BatchMessageAcker getAcker() {
return acker;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,10 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp
} else {
isAllMsgsAcked = batchMessageId.ackCumulative();
}
int outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch();
int outstandingAcks = 0;
if (log.isDebugEnabled()) {
outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch();
}

int batchSize = batchMessageId.getBatchSize();
// all messages in this batch have been acked
Expand All @@ -371,8 +374,10 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp
}
return true;
} else {
if (AckType.Cumulative == ackType && outstandingAcks == batchSize - 1) {
if (AckType.Cumulative == ackType
&& !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
sendAcknowledge(batchMessageId.prevBatchMessageId(), AckType.Cumulative, properties);
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] cannot ack message to broker {}, acktype {}, pending acks - {}", subscription,
Expand Down

0 comments on commit ceb7b45

Please sign in to comment.