Skip to content

Commit

Permalink
Issue #3803: Make ManagedLedger read batch size configurable (#3808)
Browse files Browse the repository at this point in the history
*Motivation*

Fixes #3803

Hardcoding is a very bad practice. It means we have no way to alter system behavior
when production issues occur.

*Modifications*

introduce a few read batch related settings to make them configurable
  • Loading branch information
sijie authored and merlimat committed Mar 12, 2019
1 parent c5e3baa commit 0ce297c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 19 deletions.
11 changes: 11 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,17 @@ dispatchThrottlingRatePerSubscribeInByte=0
# backlog.
dispatchThrottlingOnNonBacklogConsumerEnabled=true

# Max number of entries to read from bookkeeper. By default it is 100 entries.
dispatcherMaxReadBatchSize=100

# Min number of entries to read from bookkeeper. By default it is 1 entries.
# When there is an error occurred on reading entries from bookkeeper, the broker
# will backoff the batch size to this minimum number."
dispatcherMinReadBatchSize=1

# Max number of entries to dispatch for a shared subscription. By default it is 20 entries.
dispatcherMaxRoundRobinBatchSize=20

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,31 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " published messages and don't have backlog. This enables dispatch-throttling for "
+ " non-backlog consumers as well.")
private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false;

// <-- dispatcher read settings -->
@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "Max number of entries to read from bookkeeper. By default it is 100 entries."
)
private int dispatcherMaxReadBatchSize = 100;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "Min number of entries to read from bookkeeper. By default it is 1 entries."
+ "When there is an error occurred on reading entries from bookkeeper, the broker"
+ " will backoff the batch size to this minimum number."
)
private int dispatcherMinReadBatchSize = 1;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "Max number of entries to dispatch for a shared subscription. By default it is 20 entries."
)
private int dispatcherMaxRoundRobinBatchSize = 20;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@
*/
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback {

private static final int MaxReadBatchSize = 100;
private static final int MaxRoundRobinBatchSize = 20;

private final PersistentTopic topic;
private final ManagedCursor cursor;

Expand Down Expand Up @@ -105,7 +102,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
? new InMemoryRedeliveryTracker()
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = MaxReadBatchSize;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
.getMaxUnackedMessagesPerSubscription();
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
Expand Down Expand Up @@ -386,8 +383,8 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
havePendingReplayRead = false;
}

if (readBatchSize < MaxReadBatchSize) {
int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize);
if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
if (log.isDebugEnabled()) {
log.debug("[{}] Increasing read batch size from {} to {}", name, readBatchSize, newReadBatchSize);
}
Expand Down Expand Up @@ -423,7 +420,9 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
}

// round-robin dispatch batch size for this consumer
int messagesForC = Math.min(Math.min(entriesToDispatch, c.getAvailablePermits()), MaxRoundRobinBatchSize);
int messagesForC = Math.min(
Math.min(entriesToDispatch, c.getAvailablePermits()),
serviceConfig.getDispatcherMaxRoundRobinBatchSize());

if (messagesForC > 0) {

Expand Down Expand Up @@ -511,7 +510,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
}
}

readBatchSize = 1;
readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();

topic.getBrokerService().executor().schedule(() -> {
synchronized (PersistentDispatcherMultipleConsumers.this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp

private volatile boolean havePendingRead = false;

private static final int MaxReadBatchSize = 100;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
private final ServiceConfiguration serviceConfig;
Expand All @@ -74,8 +73,8 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
this.cursor = cursor;
this.readBatchSize = MaxReadBatchSize;
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
}
Expand Down Expand Up @@ -182,8 +181,8 @@ public synchronized void internalReadEntriesComplete(final List<Entry> entries,

havePendingRead = false;

if (readBatchSize < MaxReadBatchSize) {
int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize);
if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Increasing read batch size from {} to {}", name, readConsumer, readBatchSize,
newReadBatchSize);
Expand Down Expand Up @@ -451,7 +450,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
checkNotNull(c);

// Reduce read batch size to avoid flooding bookies with retries
readBatchSize = 1;
readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();

topic.getBrokerService().executor().schedule(() -> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
private final PersistentTopic topic;
private final ManagedCursor cursor;


private static final int MaxReadBatchSize = 100;
private int readBatchSize;

private final int producerQueueThreshold;
Expand Down Expand Up @@ -98,7 +96,9 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);

readBatchSize = Math.min(producerQueueSize, MaxReadBatchSize);
readBatchSize = Math.min(
producerQueueSize,
topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
producerQueueThreshold = (int) (producerQueueSize * 0.9);

startProducer();
Expand Down Expand Up @@ -189,8 +189,9 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
entries.size());
}

if (readBatchSize < MaxReadBatchSize) {
int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize);
int maxReadBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize();
if (readBatchSize < maxReadBatchSize) {
int newReadBatchSize = Math.min(readBatchSize * 2, maxReadBatchSize);
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", topicName, localCluster,
remoteCluster, readBatchSize, newReadBatchSize);
Expand Down Expand Up @@ -410,7 +411,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}

// Reduce read batch size to avoid flooding bookies with retries
readBatchSize = 1;
readBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize();

long waitTimeMillis = readFailureBackoff.next();

Expand Down

0 comments on commit 0ce297c

Please sign in to comment.