Skip to content

Commit

Permalink
[improve][broker] Get lowest PositionImpl from NavigableSet (#18278)
Browse files Browse the repository at this point in the history
* [cleanup] Direct get lowest PositionImpl from TreeMap

change signature from Set<T> to NavigableSet<T>
which makes the caller to get lowest PositionImpl more efficient.

* change poll to first when call `NavigableSet`

* fix check style remove unused import

Co-authored-by: wangjinlong <[email protected]>
  • Loading branch information
lifepuzzlefun and lifepuzzlefun1 authored Nov 2, 2022
1 parent 2e878e8 commit 0c3c175
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.Beta;
import java.util.Set;
import java.util.NavigableSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

/**
Expand Down Expand Up @@ -58,7 +58,7 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
/**
* Get a set of position of messages that have already reached the delivery time.
*/
Set<PositionImpl> getScheduledMessages(int maxMessages);
NavigableSet<PositionImpl> getScheduledMessages(int maxMessages);

/**
* Tells whether the dispatcher should pause any message deliveries, until the DelayedDeliveryTracker has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
import java.util.Set;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -146,9 +146,9 @@ public boolean hasMessageAvailable() {
* Get a set of position of messages that have already reached.
*/
@Override
public Set<PositionImpl> getScheduledMessages(int maxMessages) {
public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
int n = maxMessages;
Set<PositionImpl> positions = new TreeSet<>();
NavigableSet<PositionImpl> positions = new TreeSet<>();
long cutoffTime = getCutoffTime();

while (n > 0 && !priorityQueue.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ComparisonChain;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -100,7 +101,7 @@ public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
return isContained.get();
}

public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
public NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -276,7 +277,7 @@ public synchronized void readMoreEntries() {
return;
}

Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
NavigableSet<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

if (!messagesToReplayNow.isEmpty()) {
if (log.isDebugEnabled()) {
Expand All @@ -285,7 +286,7 @@ public synchronized void readMoreEntries() {
}

havePendingReplayRead = true;
minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
minReplayedPosition = messagesToReplayNow.first();
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
Expand All @@ -309,11 +310,14 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
Set<PositionImpl> toReplay = getMessagesToReplayNow(1);
minReplayedPosition = toReplay.stream().findFirst().orElse(null);
if (minReplayedPosition != null) {
NavigableSet<PositionImpl> toReplay = getMessagesToReplayNow(1);
if (!toReplay.isEmpty()) {
minReplayedPosition = toReplay.first();
redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
} else {
minReplayedPosition = null;
}

cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
Expand Down Expand Up @@ -1020,17 +1024,17 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
}
}

protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (!redeliveryMessages.isEmpty()) {
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
Set<PositionImpl> messagesAvailableNow =
NavigableSet<PositionImpl> messagesAvailableNow =
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
return messagesAvailableNow;
} else {
return Collections.emptySet();
return Collections.emptyNavigableSet();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -173,9 +174,10 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
// A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
// This may happen when consumer closed. See issue #12885 for details.
if (!allowOutOfOrderDelivery) {
Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
NavigableSet<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) {
PositionImpl replayPosition = messagesToReplayNow.stream().findFirst().get();
PositionImpl replayPosition = messagesToReplayNow.first();

// We have received a message potentially from the delayed tracker and, since we're not using it
// right now, it needs to be added to the redelivery tracker or we won't attempt anymore to
// resend it (until we disconnect consumer).
Expand Down Expand Up @@ -435,13 +437,13 @@ private boolean removeConsumersFromRecentJoinedConsumers() {
}

@Override
protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (isDispatcherStuckOnReplays) {
// If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
// messages kicks in), instead of keep replaying the same old messages, since the consumer that these
// messages are routing to might be busy at the moment
this.isDispatcherStuckOnReplays = false;
return Collections.emptySet();
return Collections.emptyNavigableSet();
} else {
return super.getMessagesToReplayNow(maxMessagesToRead);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -95,7 +94,8 @@ public void removeUpTo(long item1, long item2) {
}


public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T> longPairConverter) {
public <T extends Comparable<T>> NavigableSet<T> items(int numberOfItems,
LongPairSet.LongPairFunction<T> longPairConverter) {
NavigableSet<T> items = new TreeSet<>();
lock.readLock().lock();
try {
Expand Down

0 comments on commit 0c3c175

Please sign in to comment.