diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 76f4c192785270..7933ccd46953eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -94,6 +95,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private TopBundleLoadDataReporter topBundleLoadDataReporter; + private ScheduledFuture brokerLoadDataReportTask; + private ScheduledFuture topBundlesLoadDataReportTask; + private boolean started = false; private final ConcurrentOpenHashMap>> @@ -148,7 +152,7 @@ public void start() throws PulsarServerException { new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore); var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); - this.pulsar.getLoadManagerExecutor() + this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() .scheduleAtFixedRate(() -> { try { brokerLoadDataReporter.reportAsync(false); @@ -159,9 +163,11 @@ public void start() throws PulsarServerException { }, interval, interval, TimeUnit.MILLISECONDS); - this.pulsar.getLoadManagerExecutor() + + this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor() .scheduleAtFixedRate(() -> { try { + // TODO: consider excluding the bundles that are in the process of split. topBundleLoadDataReporter.reportAsync(false); } catch (Throwable e) { log.error("Failed to run the top bundles load manager executor job.", e); @@ -282,6 +288,14 @@ public void close() throws PulsarServerException { return; } try { + if (brokerLoadDataReportTask != null) { + brokerLoadDataReportTask.cancel(true); + } + + if (topBundlesLoadDataReportTask != null) { + topBundlesLoadDataReportTask.cancel(true); + } + this.brokerLoadDataStore.close(); this.topBundlesLoadDataStore.close(); } catch (IOException ex) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java index 827c89e6687670..cf50f942e11b4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java @@ -92,7 +92,7 @@ public BrokerLoadData generateLoadData() { @Override public CompletableFuture reportAsync(boolean force) { BrokerLoadData newLoadData = this.generateLoadData(); - if (needBrokerDataUpdate() || force) { + if (force || needBrokerDataUpdate()) { log.info("publishing load report:{}", localData.toString(conf)); CompletableFuture future = this.brokerLoadDataStore.pushAsync(this.lookupServiceAddress, newLoadData); @@ -103,7 +103,6 @@ public CompletableFuture reportAsync(boolean force) { } else { log.error("Failed to report the broker load data.", ex); } - return; }); return future; } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java index 2eba42cad9e42b..d759dd016955af 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.models; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import java.util.ArrayList; import java.util.Collections; @@ -85,24 +86,32 @@ public void testPartitionSort() { Random rand = new Random(); List> actual = new ArrayList<>(); List> expected = new ArrayList<>(); - int max = 10; - for (int j = 0; j < 50; j++) { + + for (int j = 0; j < 100; j++) { Map map = new HashMap<>(); + int max = rand.nextInt(10) + 1; for (int i = 0; i < max; i++) { int val = rand.nextInt(max); map.put("" + i, val); } actual.clear(); expected.clear(); - for(var etr : map.entrySet()){ + for (var etr : map.entrySet()) { actual.add(etr); expected.add(etr); } int topk = rand.nextInt(max) + 1; TopKBundles.partitionSort(actual, topk); Collections.sort(expected, (a, b) -> b.getValue().compareTo(a.getValue())); + String errorMsg = null; for (int i = 0; i < topk; i++) { - assertEquals(actual.get(i).getValue(), expected.get(i).getValue()); + Integer l = (Integer) actual.get(i).getValue(); + Integer r = (Integer) expected.get(i).getValue(); + if (!l.equals(r)) { + errorMsg = String.format("Diff found at i=%d, %d != %d, actual:%s, expected:%s", + i, l, r, actual, expected); + } + assertNull(errorMsg); } } }