Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 5969] prevent redelivery of acked batch message at the client api #5990

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,144 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti
consumer.close();
producer.close();
}

@Test
public void testBatchNegativeAcks()
throws Exception {

boolean batching = true;
SubscriptionType subscriptionType = SubscriptionType.Shared;
int negAcksDelayMillis = 0;
int ackTimeout = 0;
String topic = "testNegativeAcks-" + System.nanoTime();

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(subscriptionType)
.negativeAckRedeliveryDelay(negAcksDelayMillis, TimeUnit.MILLISECONDS)
.ackTimeout(ackTimeout, TimeUnit.MILLISECONDS)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(batching)
.create();

Set<String> nackedMessages = new HashSet<>();

final int N = 10;
final int A = 10;
for (int i = 0; i < A; i++) {
String value = "positive-" + i;
producer.sendAsync(value);
}
for (int i = 0; i < N; i++) {
String value = "negative-" + i;
producer.sendAsync(value);
}
producer.flush();

// negatively ack negative message
int nacked = 0;
for (int i = 0; i < N + A; i++) {
Message<String> msg = consumer.receive();
String value = msg.getValue();
if (value.startsWith("negative")) {
consumer.negativeAcknowledge(msg);
nackedMessages.add(value);
nacked++;
} else {
consumer.acknowledge(msg);
}
}

assertEquals(nacked, N);

Set<String> receivedMessages = new HashSet<>();

// Only the negatively acknowledged messages should be received again
for (int i = 0; i < N; i++) {
Message<String> msg = consumer.receive();
receivedMessages.add(msg.getValue());
consumer.acknowledge(msg);
}

assertEquals(receivedMessages, nackedMessages);

// There should be no more messages
assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));
consumer.close();
producer.close();
}

@Test
public void testBatchNegativeCumulativeAcks()
throws Exception {
boolean batching = true;
SubscriptionType subscriptionType = SubscriptionType.Exclusive;
int negAcksDelayMillis = 0;
int ackTimeout = 1000;
String topic = "testNegativeAcks-" + System.nanoTime();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(subscriptionType)
.negativeAckRedeliveryDelay(negAcksDelayMillis, TimeUnit.MILLISECONDS)
.ackTimeout(ackTimeout, TimeUnit.MILLISECONDS)
.subscribe();
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(batching)
.create();
Set<String> nackedMessages = new HashSet<>();
Message<String> msg = null;
final int N = 10;
final int A = 10;
for (int i = 0; i < A; i++) {
String value = "positive-" + i;
producer.sendAsync(value);
}
for (int i = 0; i < N; i++) {
String value = "negative-" + i;
producer.sendAsync(value);
}
producer.flush();
for (int i = 0; i < A; i++) {
msg = consumer.receive();
}
// Cumulative ack of messages
consumer.acknowledgeCumulative(msg);
// negatively ack negative message
int nacked = 0;
for (int i = 0; i < N; i++) {
msg = consumer.receive();
String value = msg.getValue();
consumer.negativeAcknowledge(msg);
nackedMessages.add(value);
nacked++;
}
assertEquals(nacked, N);
Set<String> receivedMessages = new HashSet<>();
// Only the negatively acknowledged messages should be received again
for (int i = 0; i < N; i++) {
msg = consumer.receive();
receivedMessages.add(msg.getValue());
}
// Cumulative ack of messages
consumer.acknowledgeCumulative(msg);
assertEquals(receivedMessages, nackedMessages);
// Wait for unacked timer to trigger if there are unacked messages
Thread.sleep(1500);
// There should be no more messages since they have all been acked
assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));
consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.annotations.VisibleForTesting;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;

/**
* Tracks any partial acked batches and its acked messages
* This will prevent acked message redelivery to the client only at the client API level.
* This class does not track batch with all acked message. We trust broker won't deliver again.
*/
class BatchAckedTracker {

// a map of partial acked batch and its messages already acked
// Key is the string Id for batch, the value is bit set for message index whether ack-ed or not
@VisibleForTesting
Map<String, BitSet> ackedBatches = Collections.synchronizedMap(new HashMap<String, BitSet>());

public BatchAckedTracker() {
}

// If this message should be delivered and tracks this message if it is a batch message
public boolean deliver(MessageId messageId) {
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl id = (BatchMessageIdImpl) messageId;
String batchId = getBatchId(id);
if (ackedBatches.containsKey(batchId)) {
return ackedBatches.get(batchId).get(id.getBatchIndex());
}
}
// deliver non batch message and any other cases
return true;
}

private BitSet initBatchSet(int size) {
BitSet set = new BitSet(size);
set.set(0, size);
return set;
}
/**
*
* @param messageId batchMessageIdImpl
* @return boolean isAllMsgAcked for the batch
*/
public boolean ack (BatchMessageIdImpl messageId, AckType ackType) {
String batchId = getBatchId(messageId);
int batchSize = messageId.getBatchSize();
BitSet batch = ackedBatches.getOrDefault(batchId, initBatchSet(batchSize));
if (ackType == AckType.Individual) {
batch.clear(messageId.getBatchIndex());
} else {
batch.clear(0, messageId.getBatchIndex() + 1);
}

if (batch.isEmpty()) {
//we ack complete batch now so delete it from the tracker
ackedBatches.remove(batchId);
return true;
} else {
ackedBatches.put(batchId, batch);
return false;
}
}

private static String getBatchId(BatchMessageIdImpl id) {
return id.ledgerId + "-" + id.entryId + "-" + id.partitionIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final BatchAckedTracker batchAckedTracker;
private final UnAckedMessageTracker unAckedMessageTracker;
private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
private final NegativeAcksTracker negativeAcksTracker;
Expand Down Expand Up @@ -196,6 +197,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.readCompacted = conf.isReadCompacted();
this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
this.batchAckedTracker = new BatchAckedTracker();
this.resetIncludeHead = conf.isResetIncludeHead();
this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;

Expand Down Expand Up @@ -435,6 +437,12 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp
outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch();
}

if (batchAckedTracker.ack(batchMessageId, ackType)) {
// the batch all delievered including previous acked
outstandingAcks = 0;
isAllMsgsAcked = true;
}

int batchSize = batchMessageId.getBatchSize();
// all messages in this batch have been acked
if (isAllMsgsAcked) {
Expand Down Expand Up @@ -1042,6 +1050,16 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv

BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex(), i, acker);

if (!batchAckedTracker.deliver(batchMessageIdImpl)) {
// individual batch message has been acked earlier
singleMessagePayload.release();
singleMessageMetadataBuilder.recycle();

++skippedMessages;
continue;
}

final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), batchMessageIdImpl,
msgMetadata, singleMessageMetadataBuilder.build(), singleMessagePayload,
createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
Expand Down
Loading