diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 7cbf7bd2c787a..d9d0f6adc87ae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -61,7 +61,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false); protected final PersistentTopic topic; - protected final Executor topicExecutor; + protected final Executor executor; protected final String name; private Optional dispatchRateLimiter = Optional.empty(); @@ -79,7 +79,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su super(subscriptionType, partitionIndex, topic.getName(), subscription, topic.getBrokerService().pulsar().getConfiguration(), cursor); this.topic = topic; - this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName); + this.executor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(); this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName()) : ""/* NonDurableCursor doesn't have name */); this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); @@ -148,7 +148,7 @@ protected void cancelPendingRead() { @Override public void readEntriesComplete(final List entries, Object obj) { - topicExecutor.execute(() -> internalReadEntriesComplete(entries, obj)); + executor.execute(() -> internalReadEntriesComplete(entries, obj)); } public synchronized void internalReadEntriesComplete(final List entries, Object obj) { @@ -226,7 +226,7 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List e sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes()); // Schedule a new read batch operation only after the previous batch has been written to the socket. - topicExecutor.execute(() -> { + executor.execute(() -> { synchronized (PersistentDispatcherSingleActiveConsumer.this) { Consumer newConsumer = getActiveConsumer(); readMoreEntries(newConsumer); @@ -238,7 +238,7 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List e @Override public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { - topicExecutor.execute(() -> internalConsumerFlow(consumer)); + executor.execute(() -> internalConsumerFlow(consumer)); } private synchronized void internalConsumerFlow(Consumer consumer) { @@ -267,7 +267,7 @@ private synchronized void internalConsumerFlow(Consumer consumer) { @Override public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) { - topicExecutor.execute(() -> internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch)); + executor.execute(() -> internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch)); } private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) { @@ -459,7 +459,7 @@ protected Pair calculateToRead(Consumer consumer) { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - topicExecutor.execute(() -> internalReadEntriesFailed(exception, ctx)); + executor.execute(() -> internalReadEntriesFailed(exception, ctx)); } private synchronized void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) { @@ -507,7 +507,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep topic.getBrokerService().executor().schedule(() -> { // Jump again into dispatcher dedicated thread - topicExecutor.execute(() -> { + executor.execute(() -> { synchronized (PersistentDispatcherSingleActiveConsumer.this) { Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this); // we should retry the read if we have an active consumer and there is no pending read