diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 557b30245af386..1484209d43197f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2514,6 +2514,17 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se ) private long loadBalancerBrokerLoadDataTTLInSeconds = 1800; + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Percentage of bundles to compute topK bundle load data from each broker. " + + "The load balancer distributes bundles across brokers, " + + "based on topK bundle load data and other broker load data." + + "The bigger value will increase the overhead of reporting many bundles in load data. " + + "(only used in load balancer extension logics)" + ) + private int loadBalancerBundleLoadReportPercentage = 10; + /**** --- Replication. --- ****/ @FieldContext( category = CATEGORY_REPLICATION, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 66c271ab22eace..0ead70b8be4648 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarServerException; @@ -39,6 +40,8 @@ import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter; import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter; +import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter; +import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory; @@ -84,6 +87,13 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { @Getter private final List brokerFilterPipeline; + /** + * The load data reporter. + */ + private BrokerLoadDataReporter brokerLoadDataReporter; + + private TopBundleLoadDataReporter topBundleLoadDataReporter; + private boolean started = false; private final ConcurrentOpenHashMap>> @@ -129,7 +139,23 @@ public void start() throws PulsarServerException { .brokerRegistry(brokerRegistry) .brokerLoadDataStore(brokerLoadDataStore) .topBundleLoadDataStore(topBundlesLoadDataStore).build(); - // TODO: Start load data reporter. + + + this.brokerLoadDataReporter = + new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore); + + this.topBundleLoadDataReporter = + new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore); + + var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); + this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> brokerLoadDataReporter.reportAsync(false), + interval, + interval, TimeUnit.MILLISECONDS); + this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> topBundleLoadDataReporter.reportAsync(false), + interval, + interval, TimeUnit.MILLISECONDS); // TODO: Start unload scheduler and bundle split scheduler this.started = true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java index 39419946992c61..1f38d596bf2a18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java @@ -20,6 +20,8 @@ import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.Setter; +import lombok.ToString; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; @@ -33,6 +35,7 @@ */ @Getter @EqualsAndHashCode +@ToString public class BrokerLoadData { private static final double DEFAULT_RESOURCE_USAGE = 1.0d; @@ -49,6 +52,7 @@ public class BrokerLoadData { private double msgThroughputOut; // bytes/sec private double msgRateIn; // messages/sec private double msgRateOut; // messages/sec + private int bundleCount; // Load data features computed from the above resources. private double maxResourceUsage; // max of resource usages @@ -68,6 +72,9 @@ public class BrokerLoadData { private double weightedMaxEMA; private long updatedAt; + @Setter + private long reportedAt; + public BrokerLoadData() { cpu = new ResourceUsage(); memory = new ResourceUsage(); @@ -91,6 +98,8 @@ public BrokerLoadData() { * broker-level message input rate in messages/s. * @param msgRateOut * broker-level message output rate in messages/s. + * @param bundleCount + * broker-level bundle counts. * @param conf * Service configuration to compute load data features. */ @@ -99,12 +108,14 @@ public void update(final SystemResourceUsage usage, double msgThroughputOut, double msgRateIn, double msgRateOut, + int bundleCount, ServiceConfiguration conf) { updateSystemResourceUsage(usage.cpu, usage.memory, usage.directMemory, usage.bandwidthIn, usage.bandwidthOut); this.msgThroughputIn = msgThroughputIn; this.msgThroughputOut = msgThroughputOut; this.msgRateIn = msgRateIn; this.msgRateOut = msgRateOut; + this.bundleCount = bundleCount; updateFeatures(conf); updatedAt = System.currentTimeMillis(); } @@ -121,9 +132,11 @@ public void update(final BrokerLoadData other) { msgThroughputOut = other.msgThroughputOut; msgRateIn = other.msgRateIn; msgRateOut = other.msgRateOut; + bundleCount = other.bundleCount; weightedMaxEMA = other.weightedMaxEMA; maxResourceUsage = other.maxResourceUsage; updatedAt = other.updatedAt; + reportedAt = other.reportedAt; } // Update resource usage given each individual usage. @@ -173,7 +186,9 @@ public String toString(ServiceConfiguration conf) { + "cpuWeight= %f, memoryWeight= %f, directMemoryWeight= %f, " + "bandwithInResourceWeight= %f, bandwithOutResourceWeight= %f, " + "msgThroughputIn= %.2f, msgThroughputOut= %.2f, msgRateIn= %.2f, msgRateOut= %.2f, " - + "maxResourceUsage= %.2f%%, weightedMaxEMA= %.2f%%, updatedAt= %d", + + "bundleCount= %d, " + + "maxResourceUsage= %.2f%%, weightedMaxEMA= %.2f%%, " + + "updatedAt= %d, reportedAt= %d", cpu.percentUsage(), memory.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(), bandwidthOut.percentUsage(), @@ -183,7 +198,9 @@ public String toString(ServiceConfiguration conf) { conf.getLoadBalancerBandwithInResourceWeight(), conf.getLoadBalancerBandwithOutResourceWeight(), msgThroughputIn, msgThroughputOut, msgRateIn, msgRateOut, - maxResourceUsage * 100, weightedMaxEMA * 100, updatedAt + bundleCount, + maxResourceUsage * 100, weightedMaxEMA * 100, + updatedAt, reportedAt ); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadData.java index 9c34b40ff0416d..a9562a32a35422 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadData.java @@ -18,19 +18,25 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.data; +import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; +import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; /** * Defines the information of top bundles load data. */ @Getter +@ToString +@EqualsAndHashCode +@NoArgsConstructor public class TopBundlesLoadData { - private final List topBundlesLoadData; + private final List topBundlesLoadData = new ArrayList<>(); public record BundleLoadData(String bundleName, NamespaceBundleStats stats) { public BundleLoadData { @@ -38,21 +44,4 @@ public record BundleLoadData(String bundleName, NamespaceBundleStats stats) { } } - private TopBundlesLoadData(List bundleStats, int topK) { - topBundlesLoadData = bundleStats - .stream() - .sorted((o1, o2) -> o2.stats().compareTo(o1.stats())) - .limit(topK) - .collect(Collectors.toList()); - } - - /** - * Give full bundle stats, and return the top K bundle stats. - * - * @param bundleStats full bundle stats. - * @param topK Top K bundles. - */ - public static TopBundlesLoadData of(List bundleStats, int topK) { - return new TopBundlesLoadData(bundleStats, topK); - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java new file mode 100644 index 00000000000000..f4f95a5287f74a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.models; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; + +/** + * Defines the information of top bundles load data. + */ +@Getter +@ToString +@EqualsAndHashCode +@NoArgsConstructor +public class TopKBundles { + + // temp array for sorting + private final List> arr = new ArrayList<>(); + + private final TopBundlesLoadData loadData = new TopBundlesLoadData(); + + /** + * Return TopBundlesLoadData with the given bundleStats. + * + * @param bundleStats bundle stats. + * @param topk top k bundle stats to select. + */ + public void update(Map bundleStats, int topk) { + arr.clear(); + for (var etr : bundleStats.entrySet()) { + if (etr.getKey().startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) { + continue; + } + arr.add(etr); + } + List topKBundlesLoadData = loadData.getTopBundlesLoadData(); + topKBundlesLoadData.clear(); + if (arr.isEmpty()) { + return; + } + topk = Math.min(topk, arr.size()); + partitionSort(arr, topk); + + for (int i = 0; i < topk; i++) { + var etr = arr.get(i); + topKBundlesLoadData.add( + new TopBundlesLoadData.BundleLoadData(etr.getKey(), (NamespaceBundleStats) etr.getValue())); + } + } + + static void partitionSort(List> arr, int k) { + int start = 0; + int end = arr.size() - 1; + int target = k - 1; + while (start < end) { + int lo = start; + int hi = end; + int mid = lo; + var pivot = arr.get(hi).getValue(); + while (mid <= hi) { + int cmp = pivot.compareTo(arr.get(mid).getValue()); + if (cmp < 0) { + var tmp = arr.get(lo); + arr.set(lo++, arr.get(mid)); + arr.set(mid++, tmp); + } else if (cmp > 0) { + var tmp = arr.get(mid); + arr.set(mid, arr.get(hi)); + arr.set(hi--, tmp); + } else { + mid++; + } + } + if (lo <= target && target < mid) { + end = lo; + break; + } + if (target < lo) { + end = lo - 1; + } else { + start = mid; + } + } + Collections.sort(arr.subList(0, end), (a, b) -> b.getValue().compareTo(a.getValue())); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java new file mode 100644 index 00000000000000..c20045801b795e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.reporter; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.SystemUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; +import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl; +import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl; +import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; +import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; + +/** + * The broker load data reporter. + */ +@Slf4j +public class BrokerLoadDataReporter implements LoadDataReporter { + + private final PulsarService pulsar; + + private final ServiceConfiguration conf; + + private final LoadDataStore brokerLoadDataStore; + + private final BrokerHostUsage brokerHostUsage; + + private final String lookupServiceAddress; + + private final BrokerLoadData localData; + + private final BrokerLoadData lastData; + + public BrokerLoadDataReporter(PulsarService pulsar, + String lookupServiceAddress, + LoadDataStore brokerLoadDataStore) { + this.brokerLoadDataStore = brokerLoadDataStore; + this.lookupServiceAddress = lookupServiceAddress; + this.pulsar = pulsar; + this.conf = this.pulsar.getConfiguration(); + if (SystemUtils.IS_OS_LINUX) { + brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar); + } else { + brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar); + } + this.localData = new BrokerLoadData(); + this.lastData = new BrokerLoadData(); + + } + + @Override + public BrokerLoadData generateLoadData() { + final SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage); + final var pulsarStats = pulsar.getBrokerService().getPulsarStats(); + synchronized (pulsarStats) { + var brokerStats = pulsarStats.getBrokerStats(); + localData.update(systemResourceUsage, + brokerStats.msgThroughputIn, + brokerStats.msgThroughputOut, + brokerStats.msgRateIn, + brokerStats.msgRateOut, + brokerStats.bundleCount, + pulsar.getConfiguration()); + + } + return this.localData; + } + + @Override + public CompletableFuture reportAsync(boolean force) { + try { + BrokerLoadData newLoadData = this.generateLoadData(); + if (needBrokerDataUpdate() || force) { + log.info("publishing load report:{}", localData.toString(conf)); + CompletableFuture future = + this.brokerLoadDataStore.pushAsync(this.lookupServiceAddress, newLoadData); + future.whenComplete((__, ex) -> { + if (ex == null) { + localData.setReportedAt(System.currentTimeMillis()); + lastData.update(localData); + } else { + log.error("Failed to report the broker load data.", ex); + } + return; + }); + return future; + } else { + log.info("skipping load report:{}", localData.toString(conf)); + } + return CompletableFuture.completedFuture(null); + } catch (Throwable e) { + log.error("Failed to report the broker load data.", e); + return CompletableFuture.failedFuture(e); + } + + } + + private boolean needBrokerDataUpdate() { + int loadBalancerReportUpdateMaxIntervalMinutes = conf.getLoadBalancerReportUpdateMaxIntervalMinutes(); + int loadBalancerReportUpdateThresholdPercentage = conf.getLoadBalancerReportUpdateThresholdPercentage(); + final long updateMaxIntervalMillis = TimeUnit.MINUTES + .toMillis(loadBalancerReportUpdateMaxIntervalMinutes); + long timeSinceLastReportWrittenToStore = System.currentTimeMillis() - localData.getReportedAt(); + if (timeSinceLastReportWrittenToStore > updateMaxIntervalMillis) { + log.info("Writing local data to metadata store because time since last" + + " update exceeded threshold of {} minutes", + loadBalancerReportUpdateMaxIntervalMinutes); + // Always update after surpassing the maximum interval. + return true; + } + final double maxChange = Math + .max(100.0 * (Math.abs(lastData.getMaxResourceUsage() - localData.getMaxResourceUsage())), + Math.max(percentChange(lastData.getMsgRateIn() + lastData.getMsgRateOut(), + localData.getMsgRateIn() + localData.getMsgRateOut()), + Math.max( + percentChange(lastData.getMsgThroughputIn() + lastData.getMsgThroughputOut(), + localData.getMsgThroughputIn() + localData.getMsgThroughputOut()), + percentChange(lastData.getBundleCount(), localData.getBundleCount())))); + if (maxChange > loadBalancerReportUpdateThresholdPercentage) { + log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; " + + "time since last report written is {} seconds", maxChange, + loadBalancerReportUpdateThresholdPercentage, + timeSinceLastReportWrittenToStore / 1000.0); + return true; + } + return false; + } + + protected double percentChange(final double oldValue, final double newValue) { + if (oldValue == 0) { + if (newValue == 0) { + // Avoid NaN + return 0; + } + return Double.POSITIVE_INFINITY; + } + return 100 * Math.abs((oldValue - newValue) / oldValue); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java new file mode 100644 index 00000000000000..b2ef00543a76dd --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.reporter; + +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; + +@Slf4j +public class TopBundleLoadDataReporter implements LoadDataReporter { + + private final PulsarService pulsar; + + private final String lookupServiceAddress; + + private final LoadDataStore bundleLoadDataStore; + + private final TopKBundles topKBundles; + + private long lastBundleStatsUpdatedAt; + + public TopBundleLoadDataReporter(PulsarService pulsar, + String lookupServiceAddress, + LoadDataStore bundleLoadDataStore) { + this.pulsar = pulsar; + this.lookupServiceAddress = lookupServiceAddress; + this.bundleLoadDataStore = bundleLoadDataStore; + this.lastBundleStatsUpdatedAt = 0; + this.topKBundles = new TopKBundles(); + } + + @Override + public TopBundlesLoadData generateLoadData() { + + var pulsarStats = pulsar.getBrokerService().getPulsarStats(); + TopBundlesLoadData result = null; + synchronized (pulsarStats) { + var pulsarStatsUpdatedAt = pulsarStats.getUpdatedAt(); + if (pulsarStatsUpdatedAt > lastBundleStatsUpdatedAt) { + var bundleStats = pulsar.getBrokerService().getBundleStats(); + double percentage = pulsar.getConfiguration().getLoadBalancerBundleLoadReportPercentage(); + int topk = Math.max(1, (int) (bundleStats.size() * percentage / 100.0)); + topKBundles.update(bundleStats, topk); + lastBundleStatsUpdatedAt = pulsarStatsUpdatedAt; + result = topKBundles.getLoadData(); + } + } + return result; + } + + @Override + public CompletableFuture reportAsync(boolean force) { + try { + var topBundlesLoadData = generateLoadData(); + if (topBundlesLoadData != null || force) { + return this.bundleLoadDataStore.pushAsync(lookupServiceAddress, topKBundles.getLoadData()) + .exceptionally(e -> { + log.error("Failed to report top-bundles load data.", e); + return null; + }); + } else { + return CompletableFuture.completedFuture(null); + } + } catch (Throwable e) { + log.error("Failed to report top-bundles load data.", e); + return CompletableFuture.failedFuture(e); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java index a1047edc06fc95..89e5a89f2a12c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java @@ -335,7 +335,7 @@ public UnloadDecision findBundlesForUnloading(LoadManagerContext context, var topBundlesLoadData = bundlesLoadData.get().getTopBundlesLoadData(); if (topBundlesLoadData.size() > 1) { - MutableInt remainingTopBundles = new MutableInt(); + MutableInt remainingTopBundles = new MutableInt(topBundlesLoadData.size()); topBundlesLoadData.stream() .filter(e -> !recentlyUnloadedBundles.containsKey(e.bundleName()) && isTransferable( @@ -344,11 +344,8 @@ public UnloadDecision findBundlesForUnloading(LoadManagerContext context, String bundle = e.bundleName(); var bundleData = e.stats(); double throughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut; - remainingTopBundles.increment(); return Pair.of(bundle, throughput); - }).sorted((e1, e2) -> - Double.compare(e2.getRight(), e1.getRight()) - ).forEach(e -> { + }).forEach(e -> { if (remainingTopBundles.getValue() > 1 && (trafficMarkedToOffload.doubleValue() < offloadThroughput || atLeastOneBundleSelected.isFalse())) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index aa734ce73dbc02..045bb336d62e27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -28,10 +28,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import lombok.Getter; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.BrokerOperabilityMetrics; +import org.apache.pulsar.broker.stats.BrokerStats; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -47,7 +49,11 @@ public class PulsarStats implements Closeable { private volatile ByteBuf topicStatsBuf; private volatile ByteBuf tempTopicStatsBuf; + + @Getter + private BrokerStats brokerStats; private NamespaceStats nsStats; + private final ClusterReplicationMetrics clusterReplicationMetrics; private Map bundleStats; private List tempMetricsCollection; @@ -58,11 +64,15 @@ public class PulsarStats implements Closeable { private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock(); + @Getter + private long updatedAt; + public PulsarStats(PulsarService pulsar) { this.topicStatsBuf = Unpooled.buffer(16 * 1024); this.tempTopicStatsBuf = Unpooled.buffer(16 * 1024); this.nsStats = new NamespaceStats(pulsar.getConfig().getStatsUpdateFrequencyInSecs()); + this.brokerStats = new BrokerStats(pulsar.getConfig().getStatsUpdateFrequencyInSecs()); this.clusterReplicationMetrics = new ClusterReplicationMetrics(pulsar.getConfiguration().getClusterName(), pulsar.getConfiguration().isReplicationMetricsEnabled()); this.bundleStats = new ConcurrentHashMap<>(); @@ -73,6 +83,8 @@ public PulsarStats(PulsarService pulsar) { this.tempNonPersistentTopics = new ArrayList<>(); this.exposePublisherStats = pulsar.getConfiguration().isExposePublisherStats(); + this.updatedAt = 0; + } @Override @@ -100,6 +112,7 @@ public synchronized void updateStats( tempMetricsCollection.clear(); bundleStats.clear(); brokerOperabilityMetrics.reset(); + brokerStats.reset(); // Json begin topicStatsStream.startObject(); @@ -169,6 +182,18 @@ public synchronized void updateStats( topicStatsStream.endObject(); }); + brokerStats.bundleCount += bundles.size(); + brokerStats.producerCount += nsStats.producerCount; + brokerStats.replicatorCount += nsStats.replicatorCount; + brokerStats.subsCount += nsStats.subsCount; + brokerStats.consumerCount += nsStats.consumerCount; + brokerStats.msgBacklog += nsStats.msgBacklog; + brokerStats.msgRateIn += nsStats.msgRateIn; + brokerStats.msgRateOut += nsStats.msgRateOut; + brokerStats.msgThroughputIn += nsStats.msgThroughputIn; + brokerStats.msgThroughputOut += nsStats.msgThroughputOut; + NamespaceStats.add(nsStats.addLatencyBucket, brokerStats.addLatencyBucket); + topicStatsStream.endObject(); // Update metricsCollection with namespace stats tempMetricsCollection.add(nsStats.add(namespaceName)); @@ -204,6 +229,7 @@ public synchronized void updateStats( } finally { bufferLock.writeLock().unlock(); } + updatedAt = System.currentTimeMillis(); } public NamespaceBundleStats invalidBundleStats(String bundleName) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerStats.java new file mode 100644 index 00000000000000..d0be71167002d8 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerStats.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +public class BrokerStats extends NamespaceStats { + + public int bundleCount; + public BrokerStats(int ratePeriodInSeconds) { + super(ratePeriodInSeconds); + } + + @Override + public void reset() { + super.reset(); + bundleCount = 0; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java index de5975ba49be00..c85fa4ce9d2cd0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java @@ -59,7 +59,7 @@ public void testUpdateBySystemResourceUsage() { usage1.setDirectMemory(directMemory); usage1.setBandwidthIn(bandwidthIn); usage1.setBandwidthOut(bandwidthOut); - data.update(usage1, 1,2,3,4, conf); + data.update(usage1, 1, 2, 3, 4, 5, conf); assertEquals(data.getCpu(), cpu); assertEquals(data.getMemory(), memory); @@ -70,6 +70,7 @@ public void testUpdateBySystemResourceUsage() { assertEquals(data.getMsgThroughputOut(), 2.0); assertEquals(data.getMsgRateIn(), 3.0); assertEquals(data.getMsgRateOut(), 4.0); + assertEquals(data.getBundleCount(), 5); assertEquals(data.getMaxResourceUsage(), 0.04); // skips memory usage assertEquals(data.getWeightedMaxEMA(), 2); assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now)); @@ -86,7 +87,7 @@ public void testUpdateBySystemResourceUsage() { usage2.setDirectMemory(directMemory); usage2.setBandwidthIn(bandwidthIn); usage2.setBandwidthOut(bandwidthOut); - data.update(usage2, 5,6,7,8, conf); + data.update(usage2, 5, 6, 7, 8, 9, conf); assertEquals(data.getCpu(), cpu); assertEquals(data.getMemory(), memory); @@ -97,16 +98,19 @@ public void testUpdateBySystemResourceUsage() { assertEquals(data.getMsgThroughputOut(), 6.0); assertEquals(data.getMsgRateIn(), 7.0); assertEquals(data.getMsgRateOut(), 8.0); + assertEquals(data.getBundleCount(), 9); assertEquals(data.getMaxResourceUsage(), 3.0); assertEquals(data.getWeightedMaxEMA(), 1.875); assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now)); + assertEquals(data.getReportedAt(), 0l); assertEquals(data.toString(conf), "cpu= 300.00%, memory= 100.00%, directMemory= 2.00%, " + "bandwithIn= 3.00%, bandwithOut= 4.00%, " + "cpuWeight= 0.500000, memoryWeight= 0.500000, directMemoryWeight= 0.500000, " + "bandwithInResourceWeight= 0.500000, bandwithOutResourceWeight= 0.500000, " + "msgThroughputIn= 5.00, msgThroughputOut= 6.00, " - + "msgRateIn= 7.00, msgRateOut= 8.00," - + " maxResourceUsage= 300.00%, weightedMaxEMA= 187.50%, updatedAt= " + data.getUpdatedAt()); + + "msgRateIn= 7.00, msgRateOut= 8.00, bundleCount= 9, " + + "maxResourceUsage= 300.00%, weightedMaxEMA= 187.50%, " + + "updatedAt= " + data.getUpdatedAt() + ", reportedAt= " + data.getReportedAt()); } @Test @@ -133,7 +137,7 @@ public void testUpdateByBrokerLoadData() { usage1.setDirectMemory(directMemory); usage1.setBandwidthIn(bandwidthIn); usage1.setBandwidthOut(bandwidthOut); - other.update(usage1, 1,2,3,4, conf); + other.update(usage1, 1, 2, 3, 4, 5, conf); data.update(other); assertEquals(data, other); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadDataTest.java deleted file mode 100644 index 06c232d3219902..00000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadDataTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.loadbalance.extensions.data; - -import static org.testng.Assert.assertEquals; - -import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; -import org.testng.annotations.Test; -import java.util.ArrayList; -import java.util.List; - -@Test(groups = "broker") -public class TopBundlesLoadDataTest { - - @Test - public void testTopBundlesLoadData() { - List bundleStats = new ArrayList<>(); - NamespaceBundleStats stats1 = new NamespaceBundleStats(); - stats1.msgRateIn = 100; - bundleStats.add(new TopBundlesLoadData.BundleLoadData("bundle-1", stats1)); - - NamespaceBundleStats stats2 = new NamespaceBundleStats(); - stats2.msgRateIn = 10000; - bundleStats.add(new TopBundlesLoadData.BundleLoadData("bundle-2", stats2)); - - NamespaceBundleStats stats3 = new NamespaceBundleStats(); - stats3.msgRateIn = 100000; - bundleStats.add(new TopBundlesLoadData.BundleLoadData("bundle-3", stats3)); - - NamespaceBundleStats stats4 = new NamespaceBundleStats(); - stats4.msgRateIn = 10; - bundleStats.add(new TopBundlesLoadData.BundleLoadData("bundle-4", stats4)); - - TopBundlesLoadData topBundlesLoadData = TopBundlesLoadData.of(bundleStats, 3); - var top0 = topBundlesLoadData.getTopBundlesLoadData().get(0); - var top1 = topBundlesLoadData.getTopBundlesLoadData().get(1); - var top2 = topBundlesLoadData.getTopBundlesLoadData().get(2); - - assertEquals(top0.bundleName(), "bundle-3"); - assertEquals(top1.bundleName(), "bundle-2"); - assertEquals(top2.bundleName(), "bundle-1"); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java new file mode 100644 index 00000000000000..2eba42cad9e42b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.models; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class TopKBundlesTest { + + @Test + public void testTopBundlesLoadData() { + Map bundleStats = new HashMap<>(); + var topKBundles = new TopKBundles(); + NamespaceBundleStats stats1 = new NamespaceBundleStats(); + stats1.msgRateIn = 500; + bundleStats.put("bundle-1", stats1); + + NamespaceBundleStats stats2 = new NamespaceBundleStats(); + stats2.msgRateIn = 10000; + bundleStats.put("bundle-2", stats2); + + NamespaceBundleStats stats3 = new NamespaceBundleStats(); + stats3.msgRateIn = 100000; + bundleStats.put("bundle-3", stats3); + + NamespaceBundleStats stats4 = new NamespaceBundleStats(); + stats4.msgRateIn = 0; + bundleStats.put("bundle-4", stats4); + + topKBundles.update(bundleStats, 3); + var top0 = topKBundles.getLoadData().getTopBundlesLoadData().get(0); + var top1 = topKBundles.getLoadData().getTopBundlesLoadData().get(1); + var top2 = topKBundles.getLoadData().getTopBundlesLoadData().get(2); + + assertEquals(top0.bundleName(), "bundle-3"); + assertEquals(top1.bundleName(), "bundle-2"); + assertEquals(top2.bundleName(), "bundle-1"); + } + + @Test + public void testSystemNamespace() { + Map bundleStats = new HashMap<>(); + var topKBundles = new TopKBundles(); + NamespaceBundleStats stats1 = new NamespaceBundleStats(); + stats1.msgRateIn = 500; + bundleStats.put("pulsar/system/bundle-1", stats1); + + NamespaceBundleStats stats2 = new NamespaceBundleStats(); + stats2.msgRateIn = 10000; + bundleStats.put("pulsar/system/bundle-2", stats2); + + topKBundles.update(bundleStats, 2); + assertTrue(topKBundles.getLoadData().getTopBundlesLoadData().isEmpty()); + } + + + @Test + public void testPartitionSort() { + + Random rand = new Random(); + List> actual = new ArrayList<>(); + List> expected = new ArrayList<>(); + int max = 10; + for (int j = 0; j < 50; j++) { + Map map = new HashMap<>(); + for (int i = 0; i < max; i++) { + int val = rand.nextInt(max); + map.put("" + i, val); + } + actual.clear(); + expected.clear(); + for(var etr : map.entrySet()){ + actual.add(etr); + expected.add(etr); + } + int topk = rand.nextInt(max) + 1; + TopKBundles.partitionSort(actual, topk); + Collections.sort(expected, (a, b) -> b.getValue().compareTo(a.getValue())); + for (int i = 0; i < topk; i++) { + assertEquals(actual.get(i).getValue(), expected.get(i).getValue()); + } + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java new file mode 100644 index 00000000000000..e9fe82112b3975 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.reporter; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import org.apache.commons.lang.reflect.FieldUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; +import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.PulsarStats; +import org.apache.pulsar.broker.stats.BrokerStats; +import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; +import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class BrokerLoadDataReporterTest { + PulsarService pulsar; + LoadDataStore store; + BrokerService brokerService; + PulsarStats pulsarStats; + ServiceConfiguration config; + BrokerStats brokerStats; + SystemResourceUsage usage; + + @BeforeClass + void classSetup() { + MockedStatic mockLoadManagerShared = Mockito.mockStatic(LoadManagerShared.class); + usage = new SystemResourceUsage(); + usage.setCpu(new ResourceUsage(1.0, 100.0)); + usage.setMemory(new ResourceUsage(800.0, 200.0)); + usage.setDirectMemory(new ResourceUsage(2.0, 100.0)); + usage.setBandwidthIn(new ResourceUsage(3.0, 100.0)); + usage.setBandwidthOut(new ResourceUsage(4.0, 100.0)); + mockLoadManagerShared.when(() -> LoadManagerShared.getSystemResourceUsage(any())).thenReturn(usage); + } + @BeforeMethod + void setup() { + config = new ServiceConfiguration(); + pulsar = mock(PulsarService.class); + store = mock(LoadDataStore.class); + brokerService = mock(BrokerService.class); + pulsarStats = mock(PulsarStats.class); + doReturn(brokerService).when(pulsar).getBrokerService(); + doReturn(config).when(pulsar).getConfiguration(); + doReturn(Executors.newSingleThreadScheduledExecutor()).when(pulsar).getLoadManagerExecutor(); + doReturn(pulsarStats).when(brokerService).getPulsarStats(); + brokerStats = new BrokerStats(0); + brokerStats.bundleCount = 5; + brokerStats.msgRateIn = 3; + brokerStats.msgRateOut = 4; + brokerStats.msgThroughputIn = 1; + brokerStats.msgThroughputOut = 2; + doReturn(pulsarStats).when(brokerService).getPulsarStats(); + doReturn(brokerStats).when(pulsarStats).getBrokerStats(); + doReturn(CompletableFuture.completedFuture(null)).when(store).pushAsync(any(), any()); + } + + public void testGenerate() throws IllegalAccessException { + doReturn(0l).when(pulsarStats).getUpdatedAt(); + var target = new BrokerLoadDataReporter(pulsar, "", store); + var expected = new BrokerLoadData(); + expected.update(usage, 1, 2, 3, 4, 5, config); + FieldUtils.writeDeclaredField(expected, "updatedAt", 0l, true); + var actual = target.generateLoadData(); + FieldUtils.writeDeclaredField(actual, "updatedAt", 0l, true); + assertEquals(actual, expected); + } + + public void testReport() throws IllegalAccessException { + var target = new BrokerLoadDataReporter(pulsar, "broker-1", store); + var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true); + localData.setReportedAt(System.currentTimeMillis()); + var lastData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "lastData", true); + lastData.update(usage, 1, 2, 3, 4, 5, config); + target.reportAsync(false); + verify(store, times(0)).pushAsync(any(), any()); + + target.reportAsync(true); + verify(store, times(1)).pushAsync(eq("broker-1"), any()); + + target.reportAsync(false); + verify(store, times(1)).pushAsync(eq("broker-1"), any()); + + localData.setReportedAt(0l); + target.reportAsync(false); + verify(store, times(2)).pushAsync(eq("broker-1"), any()); + + lastData.update(usage, 10000, 2, 3, 4, 5, config); + target.reportAsync(false); + verify(store, times(3)).pushAsync(eq("broker-1"), any()); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java new file mode 100644 index 00000000000000..2aa702b746c401 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.reporter; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang.reflect.FieldUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.PulsarStats; +import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class TopBundleLoadDataReporterTest { + PulsarService pulsar; + LoadDataStore store; + BrokerService brokerService; + PulsarStats pulsarStats; + Map bundleStats; + ServiceConfiguration config; + + @BeforeMethod + void setup() { + config = new ServiceConfiguration(); + pulsar = mock(PulsarService.class); + store = mock(LoadDataStore.class); + brokerService = mock(BrokerService.class); + pulsarStats = mock(PulsarStats.class); + doReturn(brokerService).when(pulsar).getBrokerService(); + doReturn(config).when(pulsar).getConfiguration(); + doReturn(pulsarStats).when(brokerService).getPulsarStats(); + + bundleStats = new HashMap<>(); + NamespaceBundleStats stats1 = new NamespaceBundleStats(); + stats1.msgRateIn = 500; + bundleStats.put("bundle-1", stats1); + NamespaceBundleStats stats2 = new NamespaceBundleStats(); + stats2.msgRateIn = 10000; + bundleStats.put("bundle-2", stats2); + doReturn(bundleStats).when(brokerService).getBundleStats(); + } + + public void testZeroUpdatedAt() { + doReturn(0l).when(pulsarStats).getUpdatedAt(); + var target = new TopBundleLoadDataReporter(pulsar, "", store); + assertNull(target.generateLoadData()); + } + + public void testGenerateLoadData() throws IllegalAccessException { + doReturn(1l).when(pulsarStats).getUpdatedAt(); + config.setLoadBalancerBundleLoadReportPercentage(100); + var target = new TopBundleLoadDataReporter(pulsar, "", store); + var expected = new TopKBundles(); + expected.update(bundleStats, 2); + assertEquals(target.generateLoadData(), expected.getLoadData()); + + config.setLoadBalancerBundleLoadReportPercentage(50); + FieldUtils.writeDeclaredField(target, "lastBundleStatsUpdatedAt", 0l, true); + expected = new TopKBundles(); + expected.update(bundleStats, 1); + assertEquals(target.generateLoadData(), expected.getLoadData()); + + config.setLoadBalancerBundleLoadReportPercentage(1); + FieldUtils.writeDeclaredField(target, "lastBundleStatsUpdatedAt", 0l, true); + expected = new TopKBundles(); + expected.update(bundleStats, 1); + assertEquals(target.generateLoadData(), expected.getLoadData()); + + doReturn(new HashMap()).when(brokerService).getBundleStats(); + FieldUtils.writeDeclaredField(target, "lastBundleStatsUpdatedAt", 0l, true); + expected = new TopKBundles(); + assertEquals(target.generateLoadData(), expected.getLoadData()); + } + + + public void testReportForce() { + var target = new TopBundleLoadDataReporter(pulsar, "broker-1", store); + target.reportAsync(false); + verify(store, times(0)).pushAsync(any(), any()); + target.reportAsync(true); + verify(store, times(1)).pushAsync("broker-1", new TopBundlesLoadData()); + + } + + public void testReport(){ + var target = new TopBundleLoadDataReporter(pulsar, "broker-1", store); + doReturn(1l).when(pulsarStats).getUpdatedAt(); + var expected = new TopKBundles(); + expected.update(bundleStats, 1); + target.reportAsync(false); + verify(store, times(1)).pushAsync("broker-1", expected.getLoadData()); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index a16cf2d8a67f14..bdf5f846267e62 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -37,7 +37,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -52,6 +51,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; @@ -82,11 +82,11 @@ public LoadManagerContext setupContext(){ brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 90)); var topBundlesLoadDataStore = ctx.topBundleLoadDataStore(); - topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("bundleA", 1, 1)); - topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("bundleB", 3, 1)); - topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("bundleC", 4, 2)); - topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("bundleD", 20, 60)); - topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("bundleE", 70, 20)); + topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("bundleA", 2000000, 1000000)); + topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("bundleB", 3000000, 1000000)); + topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("bundleC", 4000000, 2000000)); + topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("bundleD", 6000000, 2000000)); + topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("bundleE", 7000000, 2000000)); return ctx; } @@ -121,7 +121,7 @@ public BrokerLoadData getCpuLoad(LoadManagerContext ctx, int load) { usage1.setDirectMemory(directMemory); usage1.setBandwidthIn(bandwidthIn); usage1.setBandwidthOut(bandwidthOut); - loadData.update(usage1, 1,2,3,4, + loadData.update(usage1, 1,2,3,4,5, ctx.brokerConfiguration()); return loadData; } @@ -131,18 +131,18 @@ public TopBundlesLoadData getTopBundlesLoad(String bundlePrefix, int load1, int namespaceBundleStats1.msgThroughputOut = load1; var namespaceBundleStats2 = new NamespaceBundleStats(); namespaceBundleStats2.msgThroughputOut = load2; - var topLoadData = TopBundlesLoadData.of(List.of( - new TopBundlesLoadData.BundleLoadData(bundlePrefix + "-1", namespaceBundleStats1), - new TopBundlesLoadData.BundleLoadData(bundlePrefix + "-2", namespaceBundleStats2)), 2); - return topLoadData; + var topKBundles = new TopKBundles(); + topKBundles.update(Map.of(bundlePrefix + "-1", namespaceBundleStats1, + bundlePrefix + "-2", namespaceBundleStats2), 2); + return topKBundles.getLoadData(); } public TopBundlesLoadData getTopBundlesLoad(String bundlePrefix, int load1) { var namespaceBundleStats1 = new NamespaceBundleStats(); namespaceBundleStats1.msgThroughputOut = load1; - var topLoadData = TopBundlesLoadData.of(List.of( - new TopBundlesLoadData.BundleLoadData(bundlePrefix + "-1", namespaceBundleStats1)), 2); - return topLoadData; + var topKBundles = new TopKBundles(); + topKBundles.update(Map.of(bundlePrefix + "-1", namespaceBundleStats1), 2); + return topKBundles.getLoadData(); } public LoadManagerContext getContext(){ @@ -311,7 +311,7 @@ public void testRecentlyUnloadedBrokers() { unloads.put("broker5", new Unload("broker5", "bundleE-1", Optional.of("broker1"))); unloads.put("broker4", - new Unload("broker4", "bundleD-2", Optional.of("broker2"))); + new Unload("broker4", "bundleD-1", Optional.of("broker2"))); expected.setLabel(Success); expected.setReason(Overloaded); @@ -494,7 +494,7 @@ public void testMinBrokerWithZeroTraffic() throws IllegalAccessException { unloads.put("broker5", new Unload("broker5", "bundleE-1", Optional.of("broker1"))); unloads.put("broker4", - new Unload("broker4", "bundleD-2", Optional.of("broker2"))); + new Unload("broker4", "bundleD-1", Optional.of("broker2"))); expected.setLabel(Success); expected.setReason(Underloaded); expected.setLoadAvg(0.26400000000000007); @@ -515,7 +515,7 @@ public void testMaxNumberOfTransfersPerShedderCycle() { unloads.put("broker5", new Unload("broker5", "bundleE-1", Optional.of("broker1"))); unloads.put("broker4", - new Unload("broker4", "bundleD-2", Optional.of("broker2"))); + new Unload("broker4", "bundleD-1", Optional.of("broker2"))); expected.setLabel(Success); expected.setReason(Overloaded); expected.setLoadAvg(setupLoadAvg); @@ -528,7 +528,7 @@ public void testRemainingTopBundles() { TransferShedder transferShedder = new TransferShedder(); var ctx = setupContext(); var topBundlesLoadDataStore = ctx.topBundleLoadDataStore(); - topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("bundleE", 20, 20)); + topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("bundleE", 3000000, 2000000)); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); var expected = new UnloadDecision(); @@ -536,7 +536,7 @@ public void testRemainingTopBundles() { unloads.put("broker5", new Unload("broker5", "bundleE-1", Optional.of("broker1"))); unloads.put("broker4", - new Unload("broker4", "bundleD-2", Optional.of("broker2"))); + new Unload("broker4", "bundleD-1", Optional.of("broker2"))); expected.setLabel(Success); expected.setReason(Overloaded); expected.setLoadAvg(setupLoadAvg); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java index 2856dde892a8ff..da0866c689746e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java @@ -168,7 +168,7 @@ public void testNoLoadDataBrokers() { private BrokerLoadData createBrokerData(LoadManagerContext ctx, double usage, double limit) { var brokerLoadData = new BrokerLoadData(); SystemResourceUsage usages = createUsage(usage, limit); - brokerLoadData.update(usages, 1, 1, 1, 1, + brokerLoadData.update(usages, 1, 1, 1, 1, 1, ctx.brokerConfiguration()); return brokerLoadData; } @@ -185,7 +185,7 @@ private SystemResourceUsage createUsage(double usage, double limit) { private void updateLoad(LoadManagerContext ctx, String broker, double usage) { ctx.brokerLoadDataStore().get(broker).get().update(createUsage(usage, 100.0), - 1, 1, 1, 1, ctx.brokerConfiguration()); + 1, 1, 1, 1, 1, ctx.brokerConfiguration()); } public static LoadManagerContext getContext() {