From 0c3c175eb132d4ef0fb3a12842127dcb4fa1933d Mon Sep 17 00:00:00 2001 From: WJL3333 Date: Wed, 2 Nov 2022 22:32:42 +0800 Subject: [PATCH] [improve][broker] Get lowest PositionImpl from NavigableSet (#18278) * [cleanup] Direct get lowest PositionImpl from TreeMap change signature from Set to NavigableSet which makes the caller to get lowest PositionImpl more efficient. * change poll to first when call `NavigableSet` * fix check style remove unused import Co-authored-by: wangjinlong --- .../delayed/DelayedDeliveryTracker.java | 4 ++-- .../InMemoryDelayedDeliveryTracker.java | 6 +++--- .../MessageRedeliveryController.java | 3 ++- ...PersistentDispatcherMultipleConsumers.java | 20 +++++++++++-------- ...tStickyKeyDispatcherMultipleConsumers.java | 10 ++++++---- .../ConcurrentBitmapSortedLongPairSet.java | 4 ++-- 6 files changed, 27 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java index b54cbc6982f87..2f248a441cdee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.delayed; import com.google.common.annotations.Beta; -import java.util.Set; +import java.util.NavigableSet; import org.apache.bookkeeper.mledger.impl.PositionImpl; /** @@ -58,7 +58,7 @@ public interface DelayedDeliveryTracker extends AutoCloseable { /** * Get a set of position of messages that have already reached the delivery time. */ - Set getScheduledMessages(int maxMessages); + NavigableSet getScheduledMessages(int maxMessages); /** * Tells whether the dispatcher should pause any message deliveries, until the DelayedDeliveryTracker has diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index f77fcebfb6ac8..da28ff19234b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -22,7 +22,7 @@ import io.netty.util.Timer; import io.netty.util.TimerTask; import java.time.Clock; -import java.util.Set; +import java.util.NavigableSet; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -146,9 +146,9 @@ public boolean hasMessageAvailable() { * Get a set of position of messages that have already reached. */ @Override - public Set getScheduledMessages(int maxMessages) { + public NavigableSet getScheduledMessages(int maxMessages) { int n = maxMessages; - Set positions = new TreeSet<>(); + NavigableSet positions = new TreeSet<>(); long cutoffTime = getCutoffTime(); while (n > 0 && !priorityQueue.isEmpty()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java index 21dd272acfef0..d8667def5526d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -21,6 +21,7 @@ import com.google.common.collect.ComparisonChain; import java.util.ArrayList; import java.util.List; +import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -100,7 +101,7 @@ public boolean containsStickyKeyHashes(Set stickyKeyHashes) { return isContained.get(); } - public Set getMessagesToReplayNow(int maxMessagesToRead) { + public NavigableSet getMessagesToReplayNow(int maxMessagesToRead) { return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 2e4cd7dcce508..b777087ba2f39 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -27,6 +27,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -276,7 +277,7 @@ public synchronized void readMoreEntries() { return; } - Set messagesToReplayNow = getMessagesToReplayNow(messagesToRead); + NavigableSet messagesToReplayNow = getMessagesToReplayNow(messagesToRead); if (!messagesToReplayNow.isEmpty()) { if (log.isDebugEnabled()) { @@ -285,7 +286,7 @@ public synchronized void readMoreEntries() { } havePendingReplayRead = true; - minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null); + minReplayedPosition = messagesToReplayNow.first(); Set deletedMessages = topic.isDelayedDeliveryEnabled() ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow); // clear already acked positions from replay bucket @@ -309,11 +310,14 @@ public synchronized void readMoreEntries() { consumerList.size()); } havePendingRead = true; - Set toReplay = getMessagesToReplayNow(1); - minReplayedPosition = toReplay.stream().findFirst().orElse(null); - if (minReplayedPosition != null) { + NavigableSet toReplay = getMessagesToReplayNow(1); + if (!toReplay.isEmpty()) { + minReplayedPosition = toReplay.first(); redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId()); + } else { + minReplayedPosition = null; } + cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal, topic.getMaxReadPosition()); } else { @@ -1020,17 +1024,17 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata } } - protected synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { + protected synchronized NavigableSet getMessagesToReplayNow(int maxMessagesToRead) { if (!redeliveryMessages.isEmpty()) { return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead); } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); - Set messagesAvailableNow = + NavigableSet messagesAvailableNow = delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId())); return messagesAvailableNow; } else { - return Collections.emptySet(); + return Collections.emptyNavigableSet(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index d6f85e11316a3..0eeb403d2910b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -28,6 +28,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -173,9 +174,10 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis // A corner case that we have to retry a readMoreEntries in order to preserver order delivery. // This may happen when consumer closed. See issue #12885 for details. if (!allowOutOfOrderDelivery) { - Set messagesToReplayNow = this.getMessagesToReplayNow(1); + NavigableSet messagesToReplayNow = this.getMessagesToReplayNow(1); if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) { - PositionImpl replayPosition = messagesToReplayNow.stream().findFirst().get(); + PositionImpl replayPosition = messagesToReplayNow.first(); + // We have received a message potentially from the delayed tracker and, since we're not using it // right now, it needs to be added to the redelivery tracker or we won't attempt anymore to // resend it (until we disconnect consumer). @@ -435,13 +437,13 @@ private boolean removeConsumersFromRecentJoinedConsumers() { } @Override - protected synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { + protected synchronized NavigableSet getMessagesToReplayNow(int maxMessagesToRead) { if (isDispatcherStuckOnReplays) { // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked // messages kicks in), instead of keep replaying the same old messages, since the consumer that these // messages are routing to might be busy at the moment this.isDispatcherStuckOnReplays = false; - return Collections.emptySet(); + return Collections.emptyNavigableSet(); } else { return super.getMessagesToReplayNow(maxMessagesToRead); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java index ae7b495272b8b..e42cae2580b78 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; -import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.locks.ReadWriteLock; @@ -95,7 +94,8 @@ public void removeUpTo(long item1, long item2) { } - public Set items(int numberOfItems, LongPairSet.LongPairFunction longPairConverter) { + public > NavigableSet items(int numberOfItems, + LongPairSet.LongPairFunction longPairConverter) { NavigableSet items = new TreeSet<>(); lock.readLock().lock(); try {