Skip to content

Commit

Permalink
[Branch-2.7] Fix broker dispatch byte rate limiter (#11249)
Browse files Browse the repository at this point in the history
The PR #11135 couldn't be cherry-picked to branch-2.7, because there are too many conflicts.

## Motivation

fix #11044
now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that  dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time.

## implement

when cursor read entries size need to calculate, the calculate result by dispatcher bytes limiter.
  • Loading branch information
gaoran10 authored Jul 9, 2021
1 parent 732395f commit 0c6e809
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
Expand All @@ -48,8 +49,11 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {

protected final Subscription subscription;

protected AbstractBaseDispatcher(Subscription subscription) {
protected final ServiceConfiguration serviceConfig;

protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
this.subscription = subscription;
this.serviceConfig = serviceConfig;
}

/**
Expand Down Expand Up @@ -235,4 +239,18 @@ private void handleTxnAbortMarker(Entry entry) {
}
});
}

protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg,
long bytesToRead, long availablePermitsOnByte) {
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
}

if (availablePermitsOnByte > 0) {
bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
}

return Pair.of(messagesToRead, bytesToRead);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.slf4j.Logger;
Expand All @@ -47,8 +48,8 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi

private Random random = new Random(42);

protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
super(subscription);
protected AbstractDispatcherMultipleConsumers(Subscription subscription, ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
}

public boolean isConsumerConnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
Expand Down Expand Up @@ -56,8 +57,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
private volatile int isClosed = FALSE;

public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
String topicName, Subscription subscription) {
super(subscription);
String topicName, Subscription subscription, ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
this.topicName = topicName;
this.consumers = new CopyOnWriteArrayList<>();
this.partitionIndex = partitionIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -56,16 +55,14 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
@SuppressWarnings("unused")
private volatile int totalAvailablePermits = 0;

private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;

public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
super(subscription);
super(subscription, topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.name = topic.getName() + " / " + subscription.getName();
this.msgDrop = new Rate();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.List;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
Expand All @@ -47,16 +46,15 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
private final NonPersistentTopic topic;
private final Rate msgDrop;
private final Subscription subscription;
private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;

public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
NonPersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription);
super(subscriptionType, partitionIndex, topic.getName(), subscription,
topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.msgDrop = new Rate();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,15 @@ private static boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
|| dispatchRate.dispatchThrottlingRateInByte > 0);
}

/**
* returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1.
*
* @return
*/
public long getAvailableDispatchRateLimitOnByte() {
return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits();
}

public void close() {
// close rate-limiter
if (dispatchRateLimiterOnMessage != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
Expand Down Expand Up @@ -103,16 +103,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs");
protected final ServiceConfiguration serviceConfig;
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();

enum ReadType {
Normal, Replay
}

public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) {
super(subscription);
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
super(subscription, topic.getBrokerService().pulsar().getConfiguration());
this.cursor = cursor;
this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
Expand Down Expand Up @@ -250,6 +248,7 @@ public synchronized void readMoreEntries() {
int currentTotalAvailablePermits = totalAvailablePermits;
if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();

Consumer c = getRandomConsumer();
// if turn on precise dispatcher flow control, adjust the record to read
Expand Down Expand Up @@ -281,11 +280,12 @@ public synchronized void readMoreEntries() {
TimeUnit.MILLISECONDS);
return;
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
}
Pair<Integer, Long> calculateResult = computeReadLimits(
messagesToRead, (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());

messagesToRead = calculateResult.getLeft();
bytesToRead = calculateResult.getRight();
}
}

Expand All @@ -300,11 +300,11 @@ public synchronized void readMoreEntries() {
TimeUnit.MILLISECONDS);
return;
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
}
Pair<Integer, Long> calculateResult = computeReadLimits(
messagesToRead, (int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
messagesToRead = calculateResult.getLeft();
bytesToRead = calculateResult.getRight();
}
}

Expand All @@ -319,6 +319,7 @@ public synchronized void readMoreEntries() {

// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
messagesToRead = Math.max(messagesToRead, 1);
bytesToRead = Math.max(bytesToRead, 1);
Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

if (!messagesToReplayNow.isEmpty()) {
Expand Down Expand Up @@ -351,8 +352,7 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), this,
ReadType.Normal);
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal);
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
Expand Down Expand Up @@ -74,7 +74,6 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp

private volatile int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
private final ServiceConfiguration serviceConfig;
private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;

LongPairSet messagesToRedeliver;
Expand All @@ -83,12 +82,12 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp

public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription);
super(subscriptionType, partitionIndex, topic.getName(), subscription,
topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
this.cursor = cursor;
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
Expand Down Expand Up @@ -383,6 +382,7 @@ protected void readMoreEntries(Consumer consumer) {
}

int messagesToRead = Math.min(availablePermits, readBatchSize);
long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
// if turn of precise dispatcher flow control, adjust the records to read
if (consumer.isPreciseDispatcherFlowControl()) {
int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
Expand Down Expand Up @@ -415,11 +415,12 @@ protected void readMoreEntries(Consumer consumer) {
}, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
return;
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
}
Pair<Integer, Long> calculateResult = computeReadLimits(
messagesToRead, (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());

messagesToRead = calculateResult.getLeft();
bytesToRead = calculateResult.getRight();
}
}

Expand All @@ -443,18 +444,19 @@ protected void readMoreEntries(Consumer consumer) {
}, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
return;
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long subPermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
if (subPermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) subPermitsOnMsg);
}
Pair<Integer, Long> calculateResult = computeReadLimits(
messagesToRead, (int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());

messagesToRead = calculateResult.getLeft();
bytesToRead = calculateResult.getRight();
}
}
}

// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
messagesToRead = Math.max(messagesToRead, 1);

bytesToRead = Math.max(bytesToRead, 1);
// Schedule read
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
Expand All @@ -479,7 +481,7 @@ protected void readMoreEntries(Consumer consumer) {
} else if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), this, consumer);
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, consumer);
}
} else {
if (log.isDebugEnabled()) {
Expand Down
Loading

0 comments on commit 0c6e809

Please sign in to comment.