From 89e5cb45ecde066588d5bacac072d208d66b2c90 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 10 Aug 2022 19:11:15 +0800 Subject: [PATCH] [cleanup][broker] Follow up on #16968 to restore some behavior in PersistentDispatcherMultipleConsumers class (#17018) --- .../PersistentDispatcherMultipleConsumers.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 4887f6b0541d90..ea3b9bbe54064e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected volatile PositionImpl minReplayedPosition = null; protected boolean shouldRewindBeforeReadingOrReplaying = false; protected final String name; - protected volatile boolean sendInProgress; + protected boolean sendInProgress; protected static final AtomicIntegerFieldUpdater TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, @@ -248,8 +248,8 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional * We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError. * */ - public void readMoreEntiresAsync() { - topic.getBrokerService().executor().execute(() -> readMoreEntries()); + public void readMoreEntriesAsync() { + topic.getBrokerService().executor().execute(this::readMoreEntries); } public synchronized void readMoreEntries() { @@ -295,7 +295,7 @@ public synchronized void readMoreEntries() { // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; - readMoreEntiresAsync(); + readMoreEntriesAsync(); } } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) { if (log.isDebugEnabled()) { @@ -550,6 +550,7 @@ public final synchronized void readEntriesComplete(List entries, Object c if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) { // setting sendInProgress here, because sendMessagesToConsumers will be executed // in a separate thread, and we want to prevent more reads + sendInProgress = true; dispatchMessagesThread.execute(safeRun(() -> { if (sendMessagesToConsumers(readType, entries)) { readMoreEntries(); @@ -557,7 +558,7 @@ public final synchronized void readEntriesComplete(List entries, Object c })); } else { if (sendMessagesToConsumers(readType, entries)) { - readMoreEntiresAsync(); + readMoreEntriesAsync(); } } } @@ -923,7 +924,7 @@ public void addUnAckedMessages(int numberOfMessages) { if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) { log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name); - readMoreEntiresAsync(); + readMoreEntriesAsync(); } int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages); @@ -946,7 +947,7 @@ public void addUnAckedMessages(int numberOfMessages) { // unblock dispatcher if it acks back enough messages if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) { log.info("[{}] Dispatcher is unblocked", name); - readMoreEntiresAsync(); + readMoreEntriesAsync(); } } // increment broker-level count