Skip to content

Commit

Permalink
[improve] [broker] replace HashMap with inner implementation Concurre…
Browse files Browse the repository at this point in the history
…ntLongLongPairHashMap in Negative Ack Tracker. (apache#23582)

(cherry picked from commit 9d65a85)
(cherry picked from commit 431c232)
  • Loading branch information
thetumbled authored and nikhil-ctds committed Nov 20, 2024
1 parent b935c82 commit 06f222b
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,15 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
// negative topic message id
consumer.negativeAcknowledge(topicMessageId);
NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker();
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1);
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
// negative batch message id
unAckedMessageTracker.add(messageId);
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1);
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,23 @@
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.io.Closeable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NegativeAcksTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);

private HashMap<MessageId, Long> nackedMessages = null;
private ConcurrentLongLongPairHashMap nackedMessages = null;

private final ConsumerBase<?> consumer;
private final Timer timer;
Expand All @@ -50,6 +51,7 @@ class NegativeAcksTracker implements Closeable {

// Set a min delay to allow for grouping nacks within a single batch
private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE;

public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
this.consumer = consumer;
Expand All @@ -75,15 +77,21 @@ private synchronized void triggerRedelivery(Timeout t) {
// Group all the nacked messages into one single re-delivery request
Set<MessageId> messagesToRedeliver = new HashSet<>();
long now = System.nanoTime();
nackedMessages.forEach((msgId, timestamp) -> {
nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
if (timestamp < now) {
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
// need to covert non-partitioned topic partition index to -1
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
}
});

if (!messagesToRedeliver.isEmpty()) {
messagesToRedeliver.forEach(nackedMessages::remove);
for (MessageId messageId : messagesToRedeliver) {
nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(),
((MessageIdImpl) messageId).getEntryId());
}
consumer.onNegativeAcksSend(messagesToRedeliver);
log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
Expand All @@ -102,7 +110,10 @@ public synchronized void add(Message<?> message) {

private synchronized void add(MessageId messageId, int redeliveryCount) {
if (nackedMessages == null) {
nackedMessages = new HashMap<>();
nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
.autoShrink(true)
.concurrencyLevel(1)
.build();
}

long backoffNs;
Expand All @@ -111,7 +122,14 @@ private synchronized void add(MessageId messageId, int redeliveryCount) {
} else {
backoffNs = nackDelayNanos;
}
nackedMessages.put(MessageIdAdvUtils.discardBatch(messageId), System.nanoTime() + backoffNs);
MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
// ConcurrentLongLongPairHashMap requires the key and value >=0.
// partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use
// partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to
// avoid exception from ConcurrentLongLongPairHashMap.
nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(),
messageIdAdv.getPartitionIndex() >= 0 ? messageIdAdv.getPartitionIndex() :
NON_PARTITIONED_TOPIC_PARTITION_INDEX, System.nanoTime() + backoffNs);

if (this.timeout == null) {
// Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
Expand All @@ -121,8 +139,8 @@ private synchronized void add(MessageId messageId, int redeliveryCount) {
}

@VisibleForTesting
Optional<Integer> getNackedMessagesCount() {
return Optional.ofNullable(nackedMessages).map(HashMap::size);
Optional<Long> getNackedMessagesCount() {
return Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void testClose() {
Exception checkException = null;
try {
if (consumer != null) {
consumer.negativeAcknowledge(new MessageIdImpl(-1, -1, -1));
consumer.negativeAcknowledge(new MessageIdImpl(0, 0, -1));
consumer.close();
}
} catch (Exception e) {
Expand Down

0 comments on commit 06f222b

Please sign in to comment.