diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java index 860d2cade14eff..f653534b8051cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java @@ -51,39 +51,50 @@ public interface BrokerRegistry { /** * Get the current broker lookup service address. + * + * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port */ String getLookupServiceAddress(); /** * Get available brokers. + * + * @return The brokers service url list. */ List getAvailableBrokers(); /** * Async get available brokers. + * + * @return The brokers service url list. */ CompletableFuture> getAvailableBrokersAsync(); /** * Fetch local-broker data from load-manager broker cache. * - * @param broker The load-balancer path. + * @param broker The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port */ Optional lookup(String broker); /** * For each the broker lookup data. - * The key is lookupServiceAddress + * The key is lookupServiceAddress{@link BrokerRegistry#getLookupServiceAddress()} */ void forEach(BiConsumer action); /** * Listen the broker register change. + * + * @param listener Key is lookup service address{@link BrokerRegistry#getLookupServiceAddress()} + * Value is notification type. */ void listen(BiConsumer listener); /** * Close the broker registry. + * + * @throws PulsarServerException if it fails to close the broker registry. */ - void close() throws Exception; + void close() throws PulsarServerException; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java index 4ca7d3fdaf4bb5..5faf1c3a217d61 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java @@ -23,10 +23,13 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.broker.namespace.LookupOptions; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.ServiceUnitId; /** - * Find the appropriate broker for bundle through different load balancer Implementation. + * Find the appropriate broker for service unit (e.g. bundle) through different load balancer Implementation. */ public interface ExtensibleLoadManager { @@ -50,24 +53,20 @@ public interface ExtensibleLoadManager { void initialize(PulsarService pulsar); /** - * The incoming bundle selects the appropriate broker through strategies. + * The incoming service unit (e.g. bundle) selects the appropriate broker through strategies. * - * @param serviceUnit Bundle. - * @return Simple resource. - */ - Optional discover(ServiceUnitId serviceUnit); - - - /** - * The incoming bundle selects the appropriate broker through strategies. - * - * @param serviceUnit Bundle. + * @param topic The optional topic, some method won't provide topic var in this param + * (e.g. {@link NamespaceService#internalGetWebServiceUrl(NamespaceBundle, LookupOptions)}), + * So the topic is optional. + * @param serviceUnit service unit (e.g. bundle). * @return Simple resource. */ CompletableFuture> assign(Optional topic, ServiceUnitId serviceUnit); /** - * Stop the load manager. + * Close the load manager. + * + * @throws PulsarServerException if it fails to stop the load manager. */ - void stop() throws PulsarServerException; + void close() throws PulsarServerException; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java index d251085384879c..0edcebd9783aa3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java @@ -18,18 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.extensible.data; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import lombok.Data; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicDomain; -import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; -import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; @@ -41,12 +31,6 @@ */ @Data public class BrokerLoadData { - - public static final String TOPIC = - TopicDomain.non_persistent - + "://" - + NamespaceName.SYSTEM_NAMESPACE - + "/broker-load-data"; private static final double gigaBitToByte = 128 * 1024 * 1024.0; // Most recently available system resource usage. @@ -65,29 +49,7 @@ public class BrokerLoadData { private double msgRateIn; private double msgRateOut; - // Timestamp of last update. - private long lastUpdate; - - // The stats given in the most recent invocation of update. - private Map lastStats; - - private int numTopics; - private int numBundles; - private int numConsumers; - private int numProducers; - - // All bundles belonging to this broker. - private Set bundles; - - // The bundles gained since the last invocation of update. - private Set lastBundleGains; - - // The bundles lost since the last invocation of update. - private Set lastBundleLosses; - public BrokerLoadData() { - lastStats = new ConcurrentHashMap<>(); - lastUpdate = System.currentTimeMillis(); cpu = new ResourceUsage(); memory = new ResourceUsage(); directMemory = new ResourceUsage(); @@ -95,9 +57,6 @@ public BrokerLoadData() { bandwidthOut = new ResourceUsage(); msgThroughputInUsage = new ResourceUsage(); msgThroughputOutUsage = new ResourceUsage(); - bundles = new HashSet<>(); - lastBundleGains = new HashSet<>(); - lastBundleLosses = new HashSet<>(); } /** @@ -105,14 +64,9 @@ public BrokerLoadData() { * * @param systemResourceUsage * System resource usage (cpu, memory, and direct memory). - * @param bundleStats - * The bundle stats retrieved from the Pulsar client. */ - public void update(final SystemResourceUsage systemResourceUsage, - final Map bundleStats) { + public void update(final SystemResourceUsage systemResourceUsage) { updateSystemResourceUsage(systemResourceUsage); - updateBundleData(bundleStats); - lastStats = bundleStats; } /** @@ -123,8 +77,6 @@ public void update(final SystemResourceUsage systemResourceUsage, */ public void update(final BrokerLoadData other) { updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut); - updateBundleData(other.lastStats); - lastStats = other.lastStats; } // Set the cpu, memory, and direct memory to that of the new system resource usage data. @@ -144,57 +96,6 @@ private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUs this.bandwidthOut = bandwidthOut; } - // Aggregate all message, throughput, topic count, bundle count, consumer - // count, and producer count across the - // given data. Also keep track of bundle gains and losses. - private void updateBundleData(final Map bundleStats) { - msgRateIn = 0; - msgRateOut = 0; - msgThroughputIn = 0; - msgThroughputOut = 0; - int totalNumTopics = 0; - int totalNumBundles = 0; - int totalNumConsumers = 0; - int totalNumProducers = 0; - final Iterator oldBundleIterator = bundles.iterator(); - while (oldBundleIterator.hasNext()) { - final String bundle = oldBundleIterator.next(); - if (!bundleStats.containsKey(bundle)) { - // If this bundle is in the old bundle set but not the new one, - // we lost it. - lastBundleLosses.add(bundle); - oldBundleIterator.remove(); - } - } - for (Map.Entry entry : bundleStats.entrySet()) { - final String bundle = entry.getKey(); - final NamespaceBundleStats stats = entry.getValue(); - if (!bundles.contains(bundle)) { - // If this bundle is in the new bundle set but not the old one, - // we gained it. - lastBundleGains.add(bundle); - bundles.add(bundle); - } - msgThroughputIn += stats.msgThroughputIn; - msgThroughputOut += stats.msgThroughputOut; - msgRateIn += stats.msgRateIn; - msgRateOut += stats.msgRateOut; - totalNumTopics += stats.topics; - ++totalNumBundles; - totalNumConsumers += stats.consumerCount; - totalNumProducers += stats.producerCount; - } - numTopics = totalNumTopics; - numBundles = totalNumBundles; - numConsumers = totalNumConsumers; - numProducers = totalNumProducers; - } - - public void cleanDeltas() { - lastBundleGains.clear(); - lastBundleLosses.clear(); - } - public double getMaxResourceUsage() { return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(), bandwidthOut.percentUsage()) / 100; @@ -273,20 +174,6 @@ private static double maxWithinLimit(double limit, double...args) { return max; } - public String printResourceUsage(ServiceConfiguration conf) { - double nicSpeedBytesInSec = getNicSpeedBytesInSec(conf); - return String.format( - Locale.ENGLISH, - "cpu: %.2f%%, memory: %.2f%%, directMemory: %.2f%%," - + " bandwidthIn: %.2f%%, bandwidthOut: %.2f%%," - + " msgThroughputIn: %.2f%%, msgThroughputOut: %.2f%%", - cpu.percentUsage(), memory.percentUsage(), directMemory.percentUsage(), - bandwidthIn.percentUsage(), - bandwidthOut.percentUsage(), - getMsgThroughputInUsage(nicSpeedBytesInSec).percentUsage(), - getMsgThroughputOutUsage(nicSpeedBytesInSec).percentUsage()); - } - private static double max(double...args) { double max = Double.NEGATIVE_INFINITY; @@ -298,27 +185,4 @@ private static double max(double...args) { return max; } - - public static LocalBrokerData convertToLoadManagerReport(BrokerLoadData brokerLoadData) { - LocalBrokerData localBrokerData = new LocalBrokerData(); - localBrokerData.setCpu(brokerLoadData.getCpu()); - localBrokerData.setMemory(brokerLoadData.getMemory()); - localBrokerData.setBandwidthIn(brokerLoadData.getBandwidthIn()); - localBrokerData.setBandwidthOut(brokerLoadData.getBandwidthOut()); - localBrokerData.setBundles(brokerLoadData.getBundles()); - localBrokerData.setDirectMemory(brokerLoadData.getDirectMemory()); - localBrokerData.setLastStats(brokerLoadData.getLastStats()); - localBrokerData.setLastBundleGains(brokerLoadData.getLastBundleGains()); - localBrokerData.setLastBundleLosses(brokerLoadData.getLastBundleLosses()); - localBrokerData.setLastUpdate(brokerLoadData.getLastUpdate()); - localBrokerData.setNumBundles(brokerLoadData.getNumBundles()); - localBrokerData.setNumTopics(brokerLoadData.getNumTopics()); - localBrokerData.setNumProducers(brokerLoadData.getNumProducers()); - localBrokerData.setNumConsumers(brokerLoadData.getNumConsumers()); - localBrokerData.setMsgRateIn(brokerLoadData.getMsgRateIn()); - localBrokerData.setMsgRateOut(brokerLoadData.getMsgRateOut()); - localBrokerData.setMsgThroughputIn(brokerLoadData.getMsgThroughputIn()); - localBrokerData.setMsgThroughputOut(brokerLoadData.getMsgThroughputOut()); - return localBrokerData; - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/TopBundlesLoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/TopBundlesLoadData.java index ffe2e05ccc56cb..5b8e2446600646 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/TopBundlesLoadData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/TopBundlesLoadData.java @@ -18,25 +18,18 @@ */ package org.apache.pulsar.broker.loadbalance.extensible.data; -import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; import lombok.Getter; -import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; +/** + * Defines the information of top bundles load data. + */ @Getter public class TopBundlesLoadData { - public static final String TOPIC = - TopicDomain.non_persistent - + "://" - + NamespaceName.SYSTEM_NAMESPACE - + "/top-bundle-load-data"; - - @JsonProperty("top_bundles_load_data") private final List topBundlesLoadData; public record BundleLoadData(String bundleName, NamespaceBundleStats stats) { @@ -53,6 +46,12 @@ private TopBundlesLoadData(List bundleStats, int topK) { .collect(Collectors.toList()); } + /** + * Give full bundle stats, and return the top K bundle stats. + * + * @param bundleStats full bundle stats. + * @param topK Top K bundles. + */ public static TopBundlesLoadData of(List bundleStats, int topK) { return new TopBundlesLoadData(bundleStats, topK); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/scheduler/NamespaceBundleSplitStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/scheduler/NamespaceBundleSplitStrategy.java index 1d368282a207e7..ea0be8505cc5eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/scheduler/NamespaceBundleSplitStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/scheduler/NamespaceBundleSplitStrategy.java @@ -33,11 +33,8 @@ public interface NamespaceBundleSplitStrategy { /** * Determines which bundles, if any, should be split. * - * @param context - * Load data to base decisions on (does not have benefit of preallocated data since this may not be the - * leader broker). - * @param pulsarService - * Pulasr service to use. + * @param context The context used for decisions. + * @param pulsarService Pulsar service to use. * @return A set of the bundles that should be split. */ Set findBundlesToSplit(LoadManagerContext context, PulsarService pulsarService); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/scheduler/NamespaceUnloadStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/scheduler/NamespaceUnloadStrategy.java index f1d81fd2034154..69a6a11902d328 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/scheduler/NamespaceUnloadStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/scheduler/NamespaceUnloadStrategy.java @@ -31,7 +31,14 @@ */ public interface NamespaceUnloadStrategy { - + /** + * Recommend that all the returned bundles be unloaded. + * + * @param context The context used for decisions. + * @param recentlyUnloadedBundles + * The recently unloaded bundles. + * @return A list of the bundles that should be unloaded. + */ List findBundlesForUnloading(LoadManagerContext context, Map recentlyUnloadedBundles); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/store/LoadDataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/store/LoadDataStore.java index baec378b8ed9c8..77ce2e4a20df86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/store/LoadDataStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/store/LoadDataStore.java @@ -32,58 +32,21 @@ */ public interface LoadDataStore extends Closeable { - /** - * Push load data to store. - * - * @param key - * The load data key. - * @param loadData - * The load data. - */ - void push(String key, T loadData) throws LoadDataStoreException; - /** * Async push load data to store. * - * @param key - * The load data key. - * @param loadData - * The load data. + * @param key The load data key. (e.g. bundle) + * @param loadData The load data. */ CompletableFuture pushAsync(String key, T loadData); /** * Get load data by key. * - * @param key - * The load data key. + * @param key The load data key. (e.g. bundle) */ Optional get(String key); - /** - * Async get load data by key. - * - * @param key - * The load data key. - */ - CompletableFuture> getAsync(String key); - - /** - * Remove the load data by key async. - * - * @param key - * The load data key. - */ - CompletableFuture removeAsync(String key); - - /** - * Remove the load data by key. - * - * @param key - * The load data key. - */ - void remove(String key) throws LoadDataStoreException; - /** * Performs the given action for each entry in this map until all entries * have been processed or the action throws an exception. @@ -99,13 +62,4 @@ public interface LoadDataStore extends Closeable { */ Set> entrySet(); - /** - * Listen the load data change. - */ - void listen(BiConsumer listener); - - /** - * The load data key count. - */ - int size(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadDataTest.java index ee6d2593de64a9..908c28004df457 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadDataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadDataTest.java @@ -23,6 +23,10 @@ import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.testng.annotations.Test; +/** + * Unit test of {@link BrokerLoadData}. + * TODO: Add more units test. + */ @Test(groups = "broker") public class BrokerLoadDataTest {