From bdaa059335c90c37f898122af272629a54a85ab6 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Tue, 9 Aug 2022 01:30:46 +0800 Subject: [PATCH] [fix][broker]Prevent `StackOverFlowException` in SHARED subscription(#16968) --- ...PersistentDispatcherMultipleConsumers.java | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index a02b76c9aed474..4887f6b0541d90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected volatile PositionImpl minReplayedPosition = null; protected boolean shouldRewindBeforeReadingOrReplaying = false; protected final String name; - protected boolean sendInProgress; + protected volatile boolean sendInProgress; protected static final AtomicIntegerFieldUpdater TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, @@ -244,6 +244,14 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional readMoreEntries(); } + /** + * We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError. + * + */ + public void readMoreEntiresAsync() { + topic.getBrokerService().executor().execute(() -> readMoreEntries()); + } + public synchronized void readMoreEntries() { if (sendInProgress) { // we cannot read more entries while sending the previous batch @@ -287,9 +295,7 @@ public synchronized void readMoreEntries() { // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; - // We should not call readMoreEntries() recursively in the same thread - // as there is a risk of StackOverflowError - topic.getBrokerService().executor().execute(() -> readMoreEntries()); + readMoreEntiresAsync(); } } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) { if (log.isDebugEnabled()) { @@ -544,24 +550,25 @@ public final synchronized void readEntriesComplete(List entries, Object c if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) { // setting sendInProgress here, because sendMessagesToConsumers will be executed // in a separate thread, and we want to prevent more reads - sendInProgress = true; - dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries))); + dispatchMessagesThread.execute(safeRun(() -> { + if (sendMessagesToConsumers(readType, entries)) { + readMoreEntries(); + } + })); } else { - sendMessagesToConsumers(readType, entries); + if (sendMessagesToConsumers(readType, entries)) { + readMoreEntiresAsync(); + } } } - protected final synchronized void sendMessagesToConsumers(ReadType readType, List entries) { + protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List entries) { sendInProgress = true; - boolean readMoreEntries; try { - readMoreEntries = trySendMessagesToConsumers(readType, entries); + return trySendMessagesToConsumers(readType, entries); } finally { sendInProgress = false; } - if (readMoreEntries) { - readMoreEntries(); - } } /** @@ -916,7 +923,7 @@ public void addUnAckedMessages(int numberOfMessages) { if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) { log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name); - topic.getBrokerService().executor().execute(() -> readMoreEntries()); + readMoreEntiresAsync(); } int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages); @@ -939,7 +946,7 @@ public void addUnAckedMessages(int numberOfMessages) { // unblock dispatcher if it acks back enough messages if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) { log.info("[{}] Dispatcher is unblocked", name); - topic.getBrokerService().executor().execute(() -> readMoreEntries()); + readMoreEntiresAsync(); } } // increment broker-level count