Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Get lowest PositionImpl from NavigableSet #18278

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.Beta;
import java.util.Set;
import java.util.NavigableSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

/**
Expand Down Expand Up @@ -58,7 +58,7 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
/**
* Get a set of position of messages that have already reached the delivery time.
*/
Set<PositionImpl> getScheduledMessages(int maxMessages);
NavigableSet<PositionImpl> getScheduledMessages(int maxMessages);

/**
* Tells whether the dispatcher should pause any message deliveries, until the DelayedDeliveryTracker has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
import java.util.Set;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -146,9 +146,9 @@ public boolean hasMessageAvailable() {
* Get a set of position of messages that have already reached.
*/
@Override
public Set<PositionImpl> getScheduledMessages(int maxMessages) {
public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
int n = maxMessages;
Set<PositionImpl> positions = new TreeSet<>();
NavigableSet<PositionImpl> positions = new TreeSet<>();
long cutoffTime = getCutoffTime();

while (n > 0 && !priorityQueue.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ComparisonChain;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -100,7 +101,7 @@ public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
return isContained.get();
}

public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
public NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -276,7 +277,7 @@ public synchronized void readMoreEntries() {
return;
}

Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
NavigableSet<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

if (!messagesToReplayNow.isEmpty()) {
if (log.isDebugEnabled()) {
Expand All @@ -285,7 +286,7 @@ public synchronized void readMoreEntries() {
}

havePendingReplayRead = true;
minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
minReplayedPosition = messagesToReplayNow.first();
nodece marked this conversation as resolved.
Show resolved Hide resolved
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
Expand All @@ -309,11 +310,14 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
Set<PositionImpl> toReplay = getMessagesToReplayNow(1);
minReplayedPosition = toReplay.stream().findFirst().orElse(null);
if (minReplayedPosition != null) {
NavigableSet<PositionImpl> toReplay = getMessagesToReplayNow(1);
if (!toReplay.isEmpty()) {
minReplayedPosition = toReplay.first();
redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
} else {
minReplayedPosition = null;
}

cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
Expand Down Expand Up @@ -1020,17 +1024,17 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
}
}

protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (!redeliveryMessages.isEmpty()) {
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
Set<PositionImpl> messagesAvailableNow =
NavigableSet<PositionImpl> messagesAvailableNow =
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
return messagesAvailableNow;
} else {
return Collections.emptySet();
return Collections.emptyNavigableSet();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -173,9 +174,10 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
// A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
// This may happen when consumer closed. See issue #12885 for details.
if (!allowOutOfOrderDelivery) {
Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
NavigableSet<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) {
PositionImpl replayPosition = messagesToReplayNow.stream().findFirst().get();
PositionImpl replayPosition = messagesToReplayNow.first();

// We have received a message potentially from the delayed tracker and, since we're not using it
// right now, it needs to be added to the redelivery tracker or we won't attempt anymore to
// resend it (until we disconnect consumer).
Expand Down Expand Up @@ -435,13 +437,13 @@ private boolean removeConsumersFromRecentJoinedConsumers() {
}

@Override
protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (isDispatcherStuckOnReplays) {
// If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
// messages kicks in), instead of keep replaying the same old messages, since the consumer that these
// messages are routing to might be busy at the moment
this.isDispatcherStuckOnReplays = false;
return Collections.emptySet();
return Collections.emptyNavigableSet();
} else {
return super.getMessagesToReplayNow(maxMessagesToRead);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -95,7 +94,8 @@ public void removeUpTo(long item1, long item2) {
}


public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T> longPairConverter) {
public <T extends Comparable<T>> NavigableSet<T> items(int numberOfItems,
LongPairSet.LongPairFunction<T> longPairConverter) {
NavigableSet<T> items = new TreeSet<>();
lock.readLock().lock();
try {
Expand Down