Skip to content

Commit

Permalink
[improve][broker] Choose random thread for consumerFlow in Persistent…
Browse files Browse the repository at this point in the history
…DispatcherSingleActiveConsumer (#20522)

### Motivation

Currently, all subscriptions of one topic will do `consuemrFlow` action in a single thread, which is chosen by topicName:
```
this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);
```

If there is a large number of subscriptions in a topic,  all the work will focus on one thread ---- the chosen thread, which will reduce the consume performance.  So this this patch , I'd like to choose a ramdom thread for `consumerFlow` in `PersistentDispatcherSingleActiveConsumer` to improve the consume performance.

### Modifications

*  `topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);` -> `topic.getBrokerService().getTopicOrderedExecutor().chooseThread();`
*  `this.topicExecutor ` -> `this.executor`
  • Loading branch information
AnonHxy authored Jun 12, 2023
1 parent 8ffa2df commit 51c2bb4
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher

private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final PersistentTopic topic;
protected final Executor topicExecutor;
protected final Executor executor;

protected final String name;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
Expand All @@ -79,7 +79,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
super(subscriptionType, partitionIndex, topic.getName(), subscription,
topic.getBrokerService().pulsar().getConfiguration(), cursor);
this.topic = topic;
this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);
this.executor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread();
this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
Expand Down Expand Up @@ -148,7 +148,7 @@ protected void cancelPendingRead() {

@Override
public void readEntriesComplete(final List<Entry> entries, Object obj) {
topicExecutor.execute(() -> internalReadEntriesComplete(entries, obj));
executor.execute(() -> internalReadEntriesComplete(entries, obj));
}

public synchronized void internalReadEntriesComplete(final List<Entry> entries, Object obj) {
Expand Down Expand Up @@ -226,7 +226,7 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());

// Schedule a new read batch operation only after the previous batch has been written to the socket.
topicExecutor.execute(() -> {
executor.execute(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
readMoreEntries(newConsumer);
Expand All @@ -238,7 +238,7 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e

@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
topicExecutor.execute(() -> internalConsumerFlow(consumer));
executor.execute(() -> internalConsumerFlow(consumer));
}

private synchronized void internalConsumerFlow(Consumer consumer) {
Expand Down Expand Up @@ -267,7 +267,7 @@ private synchronized void internalConsumerFlow(Consumer consumer) {

@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
topicExecutor.execute(() -> internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
executor.execute(() -> internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
}

private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
Expand Down Expand Up @@ -459,7 +459,7 @@ protected Pair<Integer, Long> calculateToRead(Consumer consumer) {

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
topicExecutor.execute(() -> internalReadEntriesFailed(exception, ctx));
executor.execute(() -> internalReadEntriesFailed(exception, ctx));
}

private synchronized void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) {
Expand Down Expand Up @@ -507,7 +507,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
topic.getBrokerService().executor().schedule(() -> {

// Jump again into dispatcher dedicated thread
topicExecutor.execute(() -> {
executor.execute(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
// we should retry the read if we have an active consumer and there is no pending read
Expand Down

0 comments on commit 51c2bb4

Please sign in to comment.