
guarantee the mechanism provided by #7105
poorbarcode committed Mar 25, 2024
1 parent 8671c62 commit 326c0cc
Showing 3 changed files with 74 additions and 27 deletions.
@@ -328,34 +328,25 @@ public synchronized void readMoreEntries() {
}

NavigableSet<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

if (!messagesToReplayNow.isEmpty()) {
// In Key_Shared mode: It is possible that all resend messages will be discarded:
// - Partly because the target consumer does not have enough permits
// - Partly because of mechanism “recentJoinedConsumers”.
NavigableSet<PositionImpl> messagesToReplayNowFiltered =
filterOutMessagesWillBeDiscarded(messagesToReplayNow);
if (messagesToReplayNowFiltered.isEmpty()) {
// No messages can be delivery now.
return;
}
NavigableSet<PositionImpl> messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);
if (!messagesToReplayFiltered.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
messagesToReplayNowFiltered.size(), consumerList.size());
messagesToReplayFiltered.size(), consumerList.size());
}

havePendingReplayRead = true;
minReplayedPosition = messagesToReplayNow.first();
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNowFiltered)
: asyncReplayEntries(messagesToReplayNowFiltered);
? asyncReplayEntriesInOrder(messagesToReplayFiltered)
: asyncReplayEntries(messagesToReplayFiltered);
// clear already acked positions from replay bucket

deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
((PositionImpl) position).getEntryId()));
// if all the entries are acked-entries and cleared up from redeliveryMessages, try to read
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNowFiltered.size() - deletedMessages.size()) == 0) {
if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntriesAsync();
}
@@ -364,7 +355,7 @@ public synchronized void readMoreEntries() {
log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
}
} else if (!havePendingRead) {
} else if (!havePendingRead && hasConsumersNeededNormalRead()) {
if (shouldPauseOnAckStatePersist(ReadType.Normal)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.",
@@ -400,7 +391,16 @@
topic.getMaxReadPosition());
}
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
if (log.isDebugEnabled()) {
if (!messagesToReplayNow.isEmpty()) {
log.debug("[{}] [{}] Skipping read for the topic: because all entries in replay queue were"
+ " filtered out due to the mechanism of Key_Shared mode, and the left consumers have"
+ " no permits now",
topic.getName(), getSubscriptionName());
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
}
}
}
} else {
if (log.isDebugEnabled()) {
@@ -1184,15 +1184,26 @@ protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int max
}

/**
* This is a method designed for Key_Shared mode.
* Filter out the entries that will be discarded due to the order guarantee mechanism of Key_Shared mode.
* This method exists to avoid the scenario below:
* - Read entries from the Replay queue.
* - The Key_Shared anti-ordering mechanism filtered out all of the entries.
* - Get positions from the Replay queue.
* - Read entries from BK.
* - The order guarantee mechanism of Key_Shared mode filtered out all the entries.
* - Deliver no entries to the client, even though we did a BK read.
*/
protected NavigableSet<PositionImpl> filterOutMessagesWillBeDiscarded(NavigableSet<PositionImpl> src) {
protected NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
return src;
}

/**
* This is a method designed for Key_Shared mode, to avoid the dispatcher getting stuck unnecessarily.
* See details: {@link PersistentStickyKeyDispatcherMultipleConsumers#hasConsumersNeededNormalRead}.
*/
protected boolean hasConsumersNeededNormalRead() {
return true;
}

protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries();
}
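To illustrate the control flow this hunk changes, here is a minimal, self-contained sketch (not the broker's actual classes) of the read-scheduling decision: filter the replay positions first, fall back to a normal read only when some consumer outside recentlyJoinedConsumers still has permits, and otherwise skip the read. Positions are modeled as plain longs and consumers as a tiny record; the names ReplaySchedulingSketch, SimpleConsumer and joinPosition are illustrative assumptions, not Pulsar APIs.

import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class ReplaySchedulingSketch {

    // Hypothetical stand-in for a consumer: permits left and, if it joined recently,
    // the position it is allowed to read up to (its "join position").
    record SimpleConsumer(String name, int availablePermits, Long joinPosition) {
        boolean recentlyJoined() {
            return joinPosition != null;
        }
    }

    public static void main(String[] args) {
        NavigableSet<Long> messagesToReplayNow = new TreeSet<>(List.of(10L, 11L, 12L));
        List<SimpleConsumer> consumers = List.of(
                new SimpleConsumer("c1", 0, null),  // not recently joined, but no permits
                new SimpleConsumer("c2", 50, 5L));  // recently joined: may only read before position 5

        NavigableSet<Long> filtered = filterOutEntriesWillBeDiscarded(messagesToReplayNow, consumers);

        if (!filtered.isEmpty()) {
            System.out.println("Schedule a replay read for " + filtered);
        } else if (hasConsumersNeededNormalRead(consumers)) {
            System.out.println("Schedule a normal read");
        } else {
            // Neither a replay read nor a normal read could deliver anything right now,
            // so skip instead of issuing a BK read whose entries would all be discarded.
            System.out.println("Skip reading for now");
        }
    }

    // Keep a replay position only if at least one consumer could actually take it: it needs
    // permits, and a recently joined consumer may only take positions before its join position.
    // (The real broker also routes by key; that part is omitted here.)
    static NavigableSet<Long> filterOutEntriesWillBeDiscarded(NavigableSet<Long> src,
                                                              List<SimpleConsumer> consumers) {
        return src.stream()
                .filter(pos -> consumers.stream().anyMatch(c ->
                        c.availablePermits() > 0
                                && (!c.recentlyJoined() || pos < c.joinPosition())))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // Mirrors the new guard: a normal read only helps consumers that are not recently
    // joined and still have permits.
    static boolean hasConsumersNeededNormalRead(List<SimpleConsumer> consumers) {
        return consumers.stream()
                .anyMatch(c -> c.availablePermits() > 0 && !c.recentlyJoined());
    }
}

With the inputs above, the filter leaves nothing to replay and no consumer qualifies for a normal read, so the sketch prints "Skip reading for now", which is exactly the case the new debug log in this hunk reports.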
@@ -447,7 +447,7 @@ private int getAvailablePermits(Consumer c) {
}

@Override
protected NavigableSet<PositionImpl> filterOutMessagesWillBeDiscarded(NavigableSet<PositionImpl> src) {
protected NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
// Remove invalid items.
removeConsumersFromRecentJoinedConsumers();
if (MapUtils.isEmpty(recentlyJoinedConsumers) || src.isEmpty()) {
@@ -489,6 +489,29 @@ protected NavigableSet<PositionImpl> filterOutMessagesWillBeDiscarded(NavigableS
return res;
}

/**
* In Key_Shared mode, a consumer that is included in {@link #recentlyJoinedConsumers} will not receive any
* entries from a normal read; it can only receive entries from replay reads.
* If all entries in {@link #redeliveryMessages} have been filtered out due to the order guarantee mechanism,
* the broker needs a normal read so that the consumers not included in {@link #recentlyJoinedConsumers} do not
* get stuck. See https://github.com/apache/pulsar/pull/7105.
*/
@Override
protected boolean hasConsumersNeededNormalRead() {
for (Consumer consumer : consumerList) {
if (consumer == null || consumer.isBlocked()) {
continue;
}
if (recentlyJoinedConsumers.containsKey(consumer)) {
continue;
}
if (consumer.getAvailablePermits() > 0) {
return true;
}
}
return false;
}

@Override
public SubType getType() {
return SubType.Key_Shared;
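For background on the order guarantee that hasConsumersNeededNormalRead() works around, the sketch below models the recentlyJoinedConsumers gate from https://github.com/apache/pulsar/pull/7105 in a simplified form: a consumer that joined recently may only receive entries positioned before the cursor position recorded at join time, until everything before that position is acknowledged. This is an illustration with assumed names (RecentlyJoinedGateSketch, canDeliver, markDeletedUpTo) and long positions; it is not the broker implementation.

import java.util.HashMap;
import java.util.Map;

public class RecentlyJoinedGateSketch {

    // consumer name -> cursor position recorded when the consumer joined the subscription
    private final Map<String, Long> recentlyJoinedConsumers = new HashMap<>();

    void consumerJoined(String consumer, long readPositionWhenJoining) {
        recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
    }

    // Entries at or after the join position must not go to a recently joined consumer,
    // because older entries with the same key may still be pending redelivery to the
    // consumer that originally owned that key.
    boolean canDeliver(String consumer, long entryPosition) {
        Long joinPosition = recentlyJoinedConsumers.get(consumer);
        return joinPosition == null || entryPosition < joinPosition;
    }

    // Once everything before the join position is acknowledged, the restriction is lifted
    // (the broker does this as the mark-delete position advances).
    void markDeletedUpTo(long markDeletePosition) {
        recentlyJoinedConsumers.values().removeIf(joinPos -> joinPos <= markDeletePosition);
    }

    public static void main(String[] args) {
        RecentlyJoinedGateSketch gate = new RecentlyJoinedGateSketch();
        gate.consumerJoined("newConsumer", 100L);
        System.out.println(gate.canDeliver("newConsumer", 120L)); // false: blocked by the gate
        System.out.println(gate.canDeliver("oldConsumer", 120L)); // true: not recently joined
        gate.markDeletedUpTo(100L);
        System.out.println(gate.canDeliver("newConsumer", 120L)); // true: restriction lifted
    }
}

While the gate blocks every entry for every consumer that still has permits, a replay read cannot make progress; the new hasConsumersNeededNormalRead() check lets the dispatcher fall back to a normal read for the consumers that are not gated.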
@@ -1757,7 +1757,7 @@ public void testNoRepeatedReadAndDiscard() throws Exception {
*/
@Test(timeOut = 180 * 1000) // the test usually finishes within 60s.
public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
final int messagesSentCount = 100;
final int messagesSentPerTime = 100;
final Set<Integer> totalReceivedMessages = new TreeSet<>();
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subName = "my-sub";
@@ -1767,14 +1767,13 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
// Send messages.
@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
for (int i = 0; i < messagesSentCount; i++) {
for (int i = 0; i < messagesSentPerTime; i++) {
MessageId messageId = producer.newMessage()
.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
.value(100 + i)
.send();
log.info("Published delayed message :{}", messageId);
log.info("Published message :{}", messageId);
}
producer.close();

// 1. Start 3 consumers and make ack holes.
// - one consumer will be closed and trigger a messages redeliver.
@@ -1849,6 +1848,17 @@
.subscribe();
consumerWillBeClose.close();

Thread.sleep(2000);

for (int i = messagesSentPerTime; i < messagesSentPerTime * 2; i++) {
MessageId messageId = producer.newMessage()
.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
.value(100 + i)
.send();
log.info("Published message :{}", messageId);
}

// Send messages again.
// Verify: "consumerAlwaysAck" can receive messages util the cursor.readerPosition is larger than LAC.
while (true) {
Message<Integer> msg = consumerAlwaysAck.receive(2, TimeUnit.SECONDS);
@@ -1865,22 +1875,25 @@
log.info("cursor_readPosition {}, LAC {}", cursor.getReadPosition(), managedLedger.getLastConfirmedEntry());
assertTrue(((PositionImpl) cursor.getReadPosition())
.compareTo((PositionImpl) managedLedger.getLastConfirmedEntry()) > 0);

// Make all consumers start to read and acknowledge messages.
// Verify: no repeated Read-and-discard.
Thread.sleep(5 * 1000);
int maxReplayCount = messagesSentCount * 2;
int maxReplayCount = messagesSentPerTime * 2;
log.info("Reply read count: {}", replyReadCounter.get());
assertTrue(replyReadCounter.get() < maxReplayCount);
// Verify: in the end, all messages will be received.
ReceivedMessages<Integer> receivedMessages = ackAllMessages(consumerAlwaysAck, consumerStuck, consumer4);
totalReceivedMessages.addAll(receivedMessages.messagesReceived.stream().map(p -> p.getRight()).collect(
Collectors.toList()));
assertEquals(totalReceivedMessages.size(), messagesSentCount);
assertEquals(totalReceivedMessages.size(), messagesSentPerTime * 2);

// cleanup.
consumer1.close();
consumer2.close();
consumer3.close();
consumer4.close();
producer.close();
admin.topics().delete(topic, false);
}
}
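For context, here is a minimal client-side sketch of the Key_Shared setup the test above exercises. It is not part of the commit; the service URL, topic name, and message counts are assumptions.

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class KeySharedSmokeExample {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // assumed local broker
                .build()) {
            String topic = "persistent://public/default/tp-key-shared-demo";

            // Two Key_Shared consumers on the same subscription: keys are spread between them.
            Consumer<Integer> c1 = client.newConsumer(Schema.INT32)
                    .topic(topic)
                    .subscriptionName("my-sub")
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .receiverQueueSize(10)
                    .subscribe();
            Consumer<Integer> c2 = client.newConsumer(Schema.INT32)
                    .topic(topic)
                    .subscriptionName("my-sub")
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .receiverQueueSize(10)
                    .subscribe();

            // Non-batched producer, keyed messages, as in the test.
            try (Producer<Integer> producer = client.newProducer(Schema.INT32)
                    .topic(topic)
                    .enableBatching(false)
                    .create()) {
                for (int i = 0; i < 20; i++) {
                    producer.newMessage().key(Integer.toString(i % 5)).value(i).send();
                }
            }

            // Drain whatever each consumer is entitled to; a given key always lands on the same consumer.
            for (Consumer<Integer> consumer : List.of(c1, c2)) {
                Message<Integer> msg;
                while ((msg = consumer.receive(2, TimeUnit.SECONDS)) != null) {
                    consumer.acknowledge(msg);
                }
            }
            c1.close();
            c2.close();
        }
    }
}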
