diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 0f43eb6c5ccbb..637ede8a41f08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -74,6 +74,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher protected volatile int readBatchSize; protected final Backoff readFailureBackoff; private volatile ScheduledFuture readOnActiveConsumerTask = null; + private final Object lockForReadOnActiveConsumerTask = new Object(); private final RedeliveryTracker redeliveryTracker; @@ -123,18 +124,23 @@ protected void scheduleReadOnActiveConsumer() { return; } - readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name, - serviceConfig.getActiveConsumerFailoverDelayTimeMillis()); + synchronized (lockForReadOnActiveConsumerTask) { + if (readOnActiveConsumerTask != null) { + return; } - Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); - cursor.rewind(activeConsumer != null && activeConsumer.readCompacted()); + readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name, + serviceConfig.getActiveConsumerFailoverDelayTimeMillis()); + } + Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + cursor.rewind(activeConsumer != null && activeConsumer.readCompacted()); - notifyActiveConsumerChanged(activeConsumer); - readMoreEntries(activeConsumer); - readOnActiveConsumerTask = null; - }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS); + notifyActiveConsumerChanged(activeConsumer); + readMoreEntries(activeConsumer); + readOnActiveConsumerTask = null; + }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS); + } } @Override