Skip to content

Commit

Permalink
[fix][broker]Prevent StackOverFlowException in SHARED subscription(a…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored and Technoboy- committed Aug 16, 2022
1 parent 9e9234c commit bdaa059
Showing 1 changed file with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
protected boolean sendInProgress;
protected volatile boolean sendInProgress;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
Expand Down Expand Up @@ -244,6 +244,14 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional
readMoreEntries();
}

/**
* We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
*
*/
public void readMoreEntiresAsync() {
topic.getBrokerService().executor().execute(() -> readMoreEntries());
}

public synchronized void readMoreEntries() {
if (sendInProgress) {
// we cannot read more entries while sending the previous batch
Expand Down Expand Up @@ -287,9 +295,7 @@ public synchronized void readMoreEntries() {
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
// We should not call readMoreEntries() recursively in the same thread
// as there is a risk of StackOverflowError
topic.getBrokerService().executor().execute(() -> readMoreEntries());
readMoreEntiresAsync();
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -544,24 +550,25 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
// setting sendInProgress here, because sendMessagesToConsumers will be executed
// in a separate thread, and we want to prevent more reads
sendInProgress = true;
dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
dispatchMessagesThread.execute(safeRun(() -> {
if (sendMessagesToConsumers(readType, entries)) {
readMoreEntries();
}
}));
} else {
sendMessagesToConsumers(readType, entries);
if (sendMessagesToConsumers(readType, entries)) {
readMoreEntiresAsync();
}
}
}

protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
sendInProgress = true;
boolean readMoreEntries;
try {
readMoreEntries = trySendMessagesToConsumers(readType, entries);
return trySendMessagesToConsumers(readType, entries);
} finally {
sendInProgress = false;
}
if (readMoreEntries) {
readMoreEntries();
}
}

/**
Expand Down Expand Up @@ -916,7 +923,7 @@ public void addUnAckedMessages(int numberOfMessages) {
if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
topic.getBrokerService().executor().execute(() -> readMoreEntries());
readMoreEntiresAsync();
}

int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
Expand All @@ -939,7 +946,7 @@ public void addUnAckedMessages(int numberOfMessages) {
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
topic.getBrokerService().executor().execute(() -> readMoreEntries());
readMoreEntiresAsync();
}
}
// increment broker-level count
Expand Down

0 comments on commit bdaa059

Please sign in to comment.