diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1484209d43197f..106410d855e226 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2523,7 +2523,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se + "The bigger value will increase the overhead of reporting many bundles in load data. " + "(only used in load balancer extension logics)" ) - private int loadBalancerBundleLoadReportPercentage = 10; + private double loadBalancerBundleLoadReportPercentage = 10; /**** --- Replication. --- ****/ @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 0ead70b8be4648..76f4c192785270 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -149,11 +149,24 @@ public void start() throws PulsarServerException { var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> brokerLoadDataReporter.reportAsync(false), + .scheduleAtFixedRate(() -> { + try { + brokerLoadDataReporter.reportAsync(false); + // TODO: update broker load metrics using getLocalData + } catch (Throwable e) { + log.error("Failed to run the broker load manager executor job.", e); + } + }, interval, interval, TimeUnit.MILLISECONDS); this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> topBundleLoadDataReporter.reportAsync(false), + .scheduleAtFixedRate(() -> { + try { + topBundleLoadDataReporter.reportAsync(false); + } catch (Throwable e) { + log.error("Failed to run the top bundles load manager executor job.", e); + } + }, interval, interval, TimeUnit.MILLISECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java index f4f95a5287f74a..c189005b9539c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java @@ -31,7 +31,7 @@ import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; /** - * Defines the information of top bundles load data. + * Defines the information of top k highest-loaded bundles. */ @Getter @ToString @@ -45,7 +45,7 @@ public class TopKBundles { private final TopBundlesLoadData loadData = new TopBundlesLoadData(); /** - * Return TopBundlesLoadData with the given bundleStats. + * Update the topK bundles from the input bundleStats. * * @param bundleStats bundle stats. * @param topk top k bundle stats to select. @@ -58,7 +58,7 @@ public void update(Map bundleStats, int topk) { } arr.add(etr); } - List topKBundlesLoadData = loadData.getTopBundlesLoadData(); + var topKBundlesLoadData = loadData.getTopBundlesLoadData(); topKBundlesLoadData.clear(); if (arr.isEmpty()) { return; @@ -71,6 +71,7 @@ public void update(Map bundleStats, int topk) { topKBundlesLoadData.add( new TopBundlesLoadData.BundleLoadData(etr.getKey(), (NamespaceBundleStats) etr.getValue())); } + arr.clear(); } static void partitionSort(List> arr, int k) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java index c20045801b795e..827c89e6687670 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.broker.PulsarService; @@ -48,6 +49,7 @@ public class BrokerLoadDataReporter implements LoadDataReporter private final String lookupServiceAddress; + @Getter private final BrokerLoadData localData; private final BrokerLoadData lastData; @@ -89,31 +91,25 @@ public BrokerLoadData generateLoadData() { @Override public CompletableFuture reportAsync(boolean force) { - try { - BrokerLoadData newLoadData = this.generateLoadData(); - if (needBrokerDataUpdate() || force) { - log.info("publishing load report:{}", localData.toString(conf)); - CompletableFuture future = - this.brokerLoadDataStore.pushAsync(this.lookupServiceAddress, newLoadData); - future.whenComplete((__, ex) -> { - if (ex == null) { - localData.setReportedAt(System.currentTimeMillis()); - lastData.update(localData); - } else { - log.error("Failed to report the broker load data.", ex); - } - return; - }); - return future; - } else { - log.info("skipping load report:{}", localData.toString(conf)); - } - return CompletableFuture.completedFuture(null); - } catch (Throwable e) { - log.error("Failed to report the broker load data.", e); - return CompletableFuture.failedFuture(e); + BrokerLoadData newLoadData = this.generateLoadData(); + if (needBrokerDataUpdate() || force) { + log.info("publishing load report:{}", localData.toString(conf)); + CompletableFuture future = + this.brokerLoadDataStore.pushAsync(this.lookupServiceAddress, newLoadData); + future.whenComplete((__, ex) -> { + if (ex == null) { + localData.setReportedAt(System.currentTimeMillis()); + lastData.update(localData); + } else { + log.error("Failed to report the broker load data.", ex); + } + return; + }); + return future; + } else { + log.info("skipping load report:{}", localData.toString(conf)); } - + return CompletableFuture.completedFuture(null); } private boolean needBrokerDataUpdate() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java index b2ef00543a76dd..59e328fc2be80c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java @@ -25,6 +25,9 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; +/** + * The top k highest-loaded bundles' load data reporter. + */ @Slf4j public class TopBundleLoadDataReporter implements LoadDataReporter { @@ -69,20 +72,15 @@ public TopBundlesLoadData generateLoadData() { @Override public CompletableFuture reportAsync(boolean force) { - try { - var topBundlesLoadData = generateLoadData(); - if (topBundlesLoadData != null || force) { - return this.bundleLoadDataStore.pushAsync(lookupServiceAddress, topKBundles.getLoadData()) - .exceptionally(e -> { - log.error("Failed to report top-bundles load data.", e); - return null; - }); - } else { - return CompletableFuture.completedFuture(null); - } - } catch (Throwable e) { - log.error("Failed to report top-bundles load data.", e); - return CompletableFuture.failedFuture(e); + var topBundlesLoadData = generateLoadData(); + if (topBundlesLoadData != null || force) { + return this.bundleLoadDataStore.pushAsync(lookupServiceAddress, topKBundles.getLoadData()) + .exceptionally(e -> { + log.error("Failed to report top-bundles load data.", e); + return null; + }); + } else { + return CompletableFuture.completedFuture(null); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java index e9fe82112b3975..3e4dc4cb1c07b0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java @@ -40,7 +40,6 @@ import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.mockito.MockedStatic; import org.mockito.Mockito; -import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -54,17 +53,6 @@ public class BrokerLoadDataReporterTest { BrokerStats brokerStats; SystemResourceUsage usage; - @BeforeClass - void classSetup() { - MockedStatic mockLoadManagerShared = Mockito.mockStatic(LoadManagerShared.class); - usage = new SystemResourceUsage(); - usage.setCpu(new ResourceUsage(1.0, 100.0)); - usage.setMemory(new ResourceUsage(800.0, 200.0)); - usage.setDirectMemory(new ResourceUsage(2.0, 100.0)); - usage.setBandwidthIn(new ResourceUsage(3.0, 100.0)); - usage.setBandwidthOut(new ResourceUsage(4.0, 100.0)); - mockLoadManagerShared.when(() -> LoadManagerShared.getSystemResourceUsage(any())).thenReturn(usage); - } @BeforeMethod void setup() { config = new ServiceConfiguration(); @@ -85,41 +73,54 @@ void setup() { doReturn(pulsarStats).when(brokerService).getPulsarStats(); doReturn(brokerStats).when(pulsarStats).getBrokerStats(); doReturn(CompletableFuture.completedFuture(null)).when(store).pushAsync(any(), any()); + + usage = new SystemResourceUsage(); + usage.setCpu(new ResourceUsage(1.0, 100.0)); + usage.setMemory(new ResourceUsage(800.0, 200.0)); + usage.setDirectMemory(new ResourceUsage(2.0, 100.0)); + usage.setBandwidthIn(new ResourceUsage(3.0, 100.0)); + usage.setBandwidthOut(new ResourceUsage(4.0, 100.0)); } public void testGenerate() throws IllegalAccessException { - doReturn(0l).when(pulsarStats).getUpdatedAt(); - var target = new BrokerLoadDataReporter(pulsar, "", store); - var expected = new BrokerLoadData(); - expected.update(usage, 1, 2, 3, 4, 5, config); - FieldUtils.writeDeclaredField(expected, "updatedAt", 0l, true); - var actual = target.generateLoadData(); - FieldUtils.writeDeclaredField(actual, "updatedAt", 0l, true); - assertEquals(actual, expected); + try (MockedStatic mockLoadManagerShared = Mockito.mockStatic(LoadManagerShared.class)) { + mockLoadManagerShared.when(() -> LoadManagerShared.getSystemResourceUsage(any())).thenReturn(usage); + doReturn(0l).when(pulsarStats).getUpdatedAt(); + var target = new BrokerLoadDataReporter(pulsar, "", store); + var expected = new BrokerLoadData(); + expected.update(usage, 1, 2, 3, 4, 5, config); + FieldUtils.writeDeclaredField(expected, "updatedAt", 0l, true); + var actual = target.generateLoadData(); + FieldUtils.writeDeclaredField(actual, "updatedAt", 0l, true); + assertEquals(actual, expected); + } } public void testReport() throws IllegalAccessException { - var target = new BrokerLoadDataReporter(pulsar, "broker-1", store); - var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true); - localData.setReportedAt(System.currentTimeMillis()); - var lastData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "lastData", true); - lastData.update(usage, 1, 2, 3, 4, 5, config); - target.reportAsync(false); - verify(store, times(0)).pushAsync(any(), any()); + try (MockedStatic mockLoadManagerShared = Mockito.mockStatic(LoadManagerShared.class)) { + mockLoadManagerShared.when(() -> LoadManagerShared.getSystemResourceUsage(any())).thenReturn(usage); + var target = new BrokerLoadDataReporter(pulsar, "broker-1", store); + var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true); + localData.setReportedAt(System.currentTimeMillis()); + var lastData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "lastData", true); + lastData.update(usage, 1, 2, 3, 4, 5, config); + target.reportAsync(false); + verify(store, times(0)).pushAsync(any(), any()); - target.reportAsync(true); - verify(store, times(1)).pushAsync(eq("broker-1"), any()); + target.reportAsync(true); + verify(store, times(1)).pushAsync(eq("broker-1"), any()); - target.reportAsync(false); - verify(store, times(1)).pushAsync(eq("broker-1"), any()); + target.reportAsync(false); + verify(store, times(1)).pushAsync(eq("broker-1"), any()); - localData.setReportedAt(0l); - target.reportAsync(false); - verify(store, times(2)).pushAsync(eq("broker-1"), any()); + localData.setReportedAt(0l); + target.reportAsync(false); + verify(store, times(2)).pushAsync(eq("broker-1"), any()); - lastData.update(usage, 10000, 2, 3, 4, 5, config); - target.reportAsync(false); - verify(store, times(3)).pushAsync(eq("broker-1"), any()); + lastData.update(usage, 10000, 2, 3, 4, 5, config); + target.reportAsync(false); + verify(store, times(3)).pushAsync(eq("broker-1"), any()); + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java index 2aa702b746c401..ce2d3d8c3ea931 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java @@ -27,6 +27,7 @@ import static org.testng.Assert.assertNull; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.commons.lang.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -58,6 +59,7 @@ void setup() { doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(config).when(pulsar).getConfiguration(); doReturn(pulsarStats).when(brokerService).getPulsarStats(); + doReturn(CompletableFuture.completedFuture(null)).when(store).pushAsync(any(), any()); bundleStats = new HashMap<>(); NamespaceBundleStats stats1 = new NamespaceBundleStats();