diff --git a/conf/broker.conf b/conf/broker.conf index 125b2aa8c1b39..617e202e5ec65 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -489,12 +489,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff # delay. This parameter sets the initial backoff delay in milliseconds. -dispatcherRetryBackoffInitialTimeInMs=100 +dispatcherRetryBackoffInitialTimeInMs=1 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff # delay. This parameter sets the maximum backoff delay in milliseconds. -dispatcherRetryBackoffMaxTimeInMs=1000 +dispatcherRetryBackoffMaxTimeInMs=10 # Precise dispatcher flow control according to history message number of each entry preciseDispatcherFlowControl=false diff --git a/conf/standalone.conf b/conf/standalone.conf index 622949bf6c325..535800a43f3e0 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -305,12 +305,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff # delay. This parameter sets the initial backoff delay in milliseconds. -dispatcherRetryBackoffInitialTimeInMs=100 +dispatcherRetryBackoffInitialTimeInMs=1 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff # delay. This parameter sets the maximum backoff delay in milliseconds. -dispatcherRetryBackoffMaxTimeInMs=1000 +dispatcherRetryBackoffMaxTimeInMs=10 # Precise dispatcher flow control according to history message number of each entry preciseDispatcherFlowControl=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 486587ec174a0..33b4fbff5f5bb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1231,14 +1231,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered " + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff " + "delay. This parameter sets the initial backoff delay in milliseconds.") - private int dispatcherRetryBackoffInitialTimeInMs = 100; + private int dispatcherRetryBackoffInitialTimeInMs = 1; @FieldContext( category = CATEGORY_POLICIES, doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered " + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff " + "delay. This parameter sets the maximum backoff delay in milliseconds.") - private int dispatcherRetryBackoffMaxTimeInMs = 1000; + private int dispatcherRetryBackoffMaxTimeInMs = 10; @FieldContext( dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 450a446c85a78..8fdb65e7b3076 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -729,11 +729,13 @@ private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress); int entriesDispatched = lastNumberOfEntriesDispatched; updatePendingBytesToDispatch(-totalBytesSize); + if (entriesDispatched > 0) { + // Reset the backoff when we successfully dispatched messages + retryBackoff.reset(); + } if (triggerReadingMore) { if (entriesDispatched > 0 || skipNextBackoff) { skipNextBackoff = false; - // Reset the backoff when we successfully dispatched messages - retryBackoff.reset(); // Call readMoreEntries in the same thread to trigger the next read readMoreEntries(); } else if (entriesDispatched == 0) {