Skip to content

Commit

Permalink
[improve] [broker] Avoid repeated Read-and-discard when using Key_Sha…
Browse files Browse the repository at this point in the history
…red mode (#22245)
  • Loading branch information
poorbarcode authored Mar 29, 2024
1 parent a3bf4e8 commit e34ea62
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ private void removeFromHashBlocker(long ledgerId, long entryId) {
}
}

public Long getHash(long ledgerId, long entryId) {
LongPair value = hashesToBeBlocked.get(ledgerId, entryId);
if (value == null) {
return null;
}
return value.first;
}

public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
if (!allowOutOfOrderDelivery) {
List<LongPair> keysToRemove = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,24 +334,25 @@ public synchronized void readMoreEntries() {
}

NavigableSet<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

if (!messagesToReplayNow.isEmpty()) {
NavigableSet<PositionImpl> messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);
if (!messagesToReplayFiltered.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
consumerList.size());
log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
messagesToReplayFiltered.size(), consumerList.size());
}

havePendingReplayRead = true;
minReplayedPosition = messagesToReplayNow.first();
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
? asyncReplayEntriesInOrder(messagesToReplayFiltered)
: asyncReplayEntries(messagesToReplayFiltered);
// clear already acked positions from replay bucket

deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
((PositionImpl) position).getEntryId()));
// if all the entries are acked-entries and cleared up from redeliveryMessages, try to read
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntriesAsync();
}
Expand All @@ -360,7 +361,7 @@ public synchronized void readMoreEntries() {
log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
}
} else if (!havePendingRead) {
} else if (!havePendingRead && hasConsumersNeededNormalRead()) {
if (shouldPauseOnAckStatePersist(ReadType.Normal)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.",
Expand Down Expand Up @@ -396,7 +397,16 @@ public synchronized void readMoreEntries() {
topic.getMaxReadPosition());
}
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
if (log.isDebugEnabled()) {
if (!messagesToReplayNow.isEmpty()) {
log.debug("[{}] [{}] Skipping read for the topic: because all entries in replay queue were"
+ " filtered out due to the mechanism of Key_Shared mode, and the left consumers have"
+ " no permits now",
topic.getName(), getSubscriptionName());
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
}
}
}
} else {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1179,6 +1189,27 @@ protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int max
}
}

/**
* This is a mode method designed for Key_Shared mode.
* Filter out the entries that will be discarded due to the order guarantee mechanism of Key_Shared mode.
* This method is in order to avoid the scenario below:
* - Get positions from the Replay queue.
* - Read entries from BK.
* - The order guarantee mechanism of Key_Shared mode filtered out all the entries.
* - Delivery non entry to the client, but we did a BK read.
*/
protected NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
return src;
}

/**
* This is a mode method designed for Key_Shared mode, to avoid unnecessary stuck.
* See detail {@link PersistentStickyKeyDispatcherMultipleConsumers#hasConsumersNeededNormalRead}.
*/
protected boolean hasConsumersNeededNormalRead() {
return true;
}

protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
Expand Down Expand Up @@ -165,6 +168,14 @@ protected Map<Consumer, List<Entry>> initialValue() throws Exception {
}
};

private static final FastThreadLocal<Map<Consumer, List<PositionImpl>>> localGroupedPositions =
new FastThreadLocal<Map<Consumer, List<PositionImpl>>>() {
@Override
protected Map<Consumer, List<PositionImpl>> initialValue() throws Exception {
return new HashMap<>();
}
};

@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
Expand Down Expand Up @@ -248,15 +259,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
assert consumer != null; // checked when added to groupedEntries
List<Entry> entriesWithSameKey = current.getValue();
int entriesWithSameKeyCount = entriesWithSameKey.size();
int availablePermits = Math.max(consumer.getAvailablePermits(), 0);
if (consumer.getMaxUnackedMessages() > 0) {
int remainUnAckedMessages =
// Avoid negative number
Math.max(consumer.getMaxUnackedMessages() - consumer.getUnackedMessages(), 0);
availablePermits = Math.min(availablePermits, remainUnAckedMessages);
}
int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
int availablePermits = getAvailablePermits(consumer);
int messagesForC = getRestrictedMaxEntriesForConsumer(consumer,
entriesWithSameKey.stream().map(Entry::getPosition).collect(Collectors.toList()), availablePermits,
readType, consumerStickyKeyHashesMap.get(consumer));
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} with messages num {}, read type is {}",
Expand Down Expand Up @@ -289,7 +294,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
totalEntries += filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay, consumer);

consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
Expand Down Expand Up @@ -332,8 +336,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
return false;
}

private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages,
ReadType readType, Set<Integer> stickyKeyHashes) {
private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<? extends Position> entries,
int availablePermits, ReadType readType, Set<Integer> stickyKeyHashes) {
int maxMessages = Math.min(entries.size(), availablePermits);
if (maxMessages == 0) {
return 0;
}
Expand Down Expand Up @@ -378,7 +383,7 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
// Here, the consumer is one that has recently joined, so we can only send messages that were
// published before it has joined.
for (int i = 0; i < maxMessages; i++) {
if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
if (((PositionImpl) entries.get(i)).compareTo(maxReadPosition) >= 0) {
// We have already crossed the divider line. All messages in the list are now
// newer than what we can currently dispatch to this consumer
return i;
Expand All @@ -405,6 +410,9 @@ && removeConsumersFromRecentJoinedConsumers()) {
}

private boolean removeConsumersFromRecentJoinedConsumers() {
if (MapUtils.isEmpty(recentlyJoinedConsumers)) {
return false;
}
Iterator<Map.Entry<Consumer, PositionImpl>> itr = recentlyJoinedConsumers.entrySet().iterator();
boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false;
PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition();
Expand Down Expand Up @@ -437,6 +445,76 @@ protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int max
}
}

private int getAvailablePermits(Consumer c) {
int availablePermits = Math.max(c.getAvailablePermits(), 0);
if (c.getMaxUnackedMessages() > 0) {
// Avoid negative number
int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
availablePermits = Math.min(availablePermits, remainUnAckedMessages);
}
return availablePermits;
}

@Override
protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
if (src.isEmpty()) {
return src;
}
NavigableSet<PositionImpl> res = new TreeSet<>();
// Group positions.
final Map<Consumer, List<PositionImpl>> groupedPositions = localGroupedPositions.get();
groupedPositions.clear();
for (PositionImpl pos : src) {
Long stickyKeyHash = redeliveryMessages.getHash(pos.getLedgerId(), pos.getEntryId());
if (stickyKeyHash == null) {
res.add(pos);
continue;
}
Consumer c = selector.select(stickyKeyHash.intValue());
if (c == null) {
// Maybe using HashRangeExclusiveStickyKeyConsumerSelector.
continue;
}
groupedPositions.computeIfAbsent(c, k -> new ArrayList<>()).add(pos);
}
// Filter positions by the Recently Joined Position rule.
for (Map.Entry<Consumer, List<PositionImpl>> item : groupedPositions.entrySet()) {
int availablePermits = getAvailablePermits(item.getKey());
if (availablePermits == 0) {
continue;
}
int posCountToRead = getRestrictedMaxEntriesForConsumer(item.getKey(), item.getValue(), availablePermits,
ReadType.Replay, null);
if (posCountToRead > 0) {
res.addAll(item.getValue().subList(0, posCountToRead));
}
}
return res;
}

/**
* In Key_Shared mode, the consumer will not receive any entries from a normal reading if it is included in
* {@link #recentlyJoinedConsumers}, they can only receive entries from replay reads.
* If all entries in {@link #redeliveryMessages} have been filtered out due to the order guarantee mechanism,
* Broker need a normal read to make the consumers not included in @link #recentlyJoinedConsumers} will not be
* stuck. See https://github.com/apache/pulsar/pull/7105.
*/
@Override
protected boolean hasConsumersNeededNormalRead() {
for (Consumer consumer : consumerList) {
if (consumer == null || consumer.isBlocked()) {
continue;
}
if (recentlyJoinedConsumers.containsKey(consumer)) {
continue;
}
if (consumer.getAvailablePermits() > 0) {
return true;
}
}
return false;
}

@Override
public SubType getType() {
return SubType.Key_Shared;
Expand Down
Loading

0 comments on commit e34ea62

Please sign in to comment.