Skip to content

Commit

Permalink
[fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic con…
Browse files Browse the repository at this point in the history
…figuration (apache#16408)

(cherry picked from commit e4dcf5a)
(cherry picked from commit 18cd7f4)
  • Loading branch information
lordcheng10 authored and nicoloboschi committed Sep 2, 2022
1 parent 013cb01 commit 3275c82
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private LedgerOffloaderStats offloaderStats;
private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
private LoadSheddingTask loadSheddingTask = null;
private ScheduledFuture<?> loadResourceQuotaTask = null;
private final AtomicReference<LoadManager> loadManager = new AtomicReference<>();
private PulsarAdmin adminClient = null;
Expand Down Expand Up @@ -1006,20 +1006,17 @@ protected void startLeaderElectionService() {
if (state == LeaderElectionState.Leading) {
LOG.info("This broker was elected leader");
if (getConfiguration().isLoadBalancerEnabled()) {
long loadSheddingInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());

if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
loadSheddingTask.cancel();
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
}
loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadSheddingTask(loadManager),
loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor, config);
loadSheddingTask.start();
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
Expand All @@ -1030,7 +1027,7 @@ protected void startLeaderElectionService() {
leaderElectionService.getCurrentLeader());
}
if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
loadSheddingTask.cancel();
loadSheddingTask = null;
}
if (loadResourceQuotaTask != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.pulsar.broker.loadbalance;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,9 +31,18 @@
public class LoadSheddingTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(LoadSheddingTask.class);
private final AtomicReference<LoadManager> loadManager;
private final ScheduledExecutorService loadManagerExecutor;

public LoadSheddingTask(AtomicReference<LoadManager> loadManager) {
private final ServiceConfiguration config;

private volatile boolean isCancel = false;

public LoadSheddingTask(AtomicReference<LoadManager> loadManager,
ScheduledExecutorService loadManagerExecutor,
ServiceConfiguration config) {
this.loadManager = loadManager;
this.loadManagerExecutor = loadManagerExecutor;
this.config = config;
}

@Override
Expand All @@ -39,6 +51,26 @@ public void run() {
loadManager.get().doLoadShedding();
} catch (Exception e) {
LOG.warn("Error during the load shedding", e);
} finally {
if (!isCancel && loadManagerExecutor != null && config != null) {
loadManagerExecutor.schedule(
new LoadSheddingTask(loadManager, loadManagerExecutor, config),
config.getLoadBalancerSheddingIntervalMinutes(),
TimeUnit.MINUTES);
}
}
}

public void start() {
if (loadManagerExecutor != null && config != null) {
loadManagerExecutor.schedule(
new LoadSheddingTask(loadManager, loadManagerExecutor, config),
config.getLoadBalancerSheddingIntervalMinutes(),
TimeUnit.MINUTES);
}
}

public void cancel() {
isCancel = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,20 @@ public void testUpdateDynamicCacheConfigurationWithZkWatch() throws Exception {
.toNanos(2000));
}

@Test
public void testUpdateDynamicLoadBalancerSheddingIntervalMinutes() throws Exception {
// update configuration
admin.brokers().updateDynamicConfiguration("loadBalancerSheddingIntervalMinutes", "10");

// wait config to be updated
Awaitility.await().until(() -> {
return conf.getLoadBalancerSheddingIntervalMinutes() == 10;
});

// verify value is updated
assertEquals(conf.getLoadBalancerSheddingIntervalMinutes(), 10);
}

/**
* <pre>
* Verifies: zk-update configuration updates service-config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ public void testTask() throws Exception {
task1.run();
verify(loadManager, times(1)).writeResourceQuotasToZooKeeper();

LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager);
LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, null);
task2.run();
verify(loadManager, times(1)).doLoadShedding();
}
Expand Down

0 comments on commit 3275c82

Please sign in to comment.