Skip to content

Commit

Permalink
Address reviewer's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Nov 14, 2022
1 parent 09ddd25 commit c4ef416
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,39 +51,50 @@ public interface BrokerRegistry {

/**
* Get the current broker lookup service address.
*
* @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
*/
String getLookupServiceAddress();

/**
* Get available brokers.
*
* @return The brokers service url list.
*/
List<String> getAvailableBrokers();

/**
* Async get available brokers.
*
* @return The brokers service url list.
*/
CompletableFuture<List<String>> getAvailableBrokersAsync();

/**
* Fetch local-broker data from load-manager broker cache.
*
* @param broker The load-balancer path.
* @param broker The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
*/
Optional<BrokerLookupData> lookup(String broker);

/**
* For each the broker lookup data.
* The key is lookupServiceAddress
* The key is lookupServiceAddress{@link BrokerRegistry#getLookupServiceAddress()}
*/
void forEach(BiConsumer<String, BrokerLookupData> action);

/**
* Listen the broker register change.
*
* @param listener Key is lookup service address{@link BrokerRegistry#getLookupServiceAddress()}
* Value is notification type.
*/
void listen(BiConsumer<String, NotificationType> listener);

/**
* Close the broker registry.
*
* @throws PulsarServerException if it fails to close the broker registry.
*/
void close() throws Exception;
void close() throws PulsarServerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;

/**
* Find the appropriate broker for bundle through different load balancer Implementation.
* Find the appropriate broker for service unit (e.g. bundle) through different load balancer Implementation.
*/
public interface ExtensibleLoadManager {

Expand All @@ -50,24 +53,20 @@ public interface ExtensibleLoadManager {
void initialize(PulsarService pulsar);

/**
* The incoming bundle selects the appropriate broker through strategies.
* The incoming service unit (e.g. bundle) selects the appropriate broker through strategies.
*
* @param serviceUnit Bundle.
* @return Simple resource.
*/
Optional<String> discover(ServiceUnitId serviceUnit);


/**
* The incoming bundle selects the appropriate broker through strategies.
*
* @param serviceUnit Bundle.
* @param topic The optional topic, some method won't provide topic var in this param
* (e.g. {@link NamespaceService#internalGetWebServiceUrl(NamespaceBundle, LookupOptions)}),
* So the topic is optional.
* @param serviceUnit service unit (e.g. bundle).
* @return Simple resource.
*/
CompletableFuture<Optional<LookupResult>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);

/**
* Stop the load manager.
* Close the load manager.
*
* @throws PulsarServerException if it fails to stop the load manager.
*/
void stop() throws PulsarServerException;
void close() throws PulsarServerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance.extensible.data;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Data;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;

Expand All @@ -41,12 +31,6 @@
*/
@Data
public class BrokerLoadData {

public static final String TOPIC =
TopicDomain.non_persistent
+ "://"
+ NamespaceName.SYSTEM_NAMESPACE
+ "/broker-load-data";
private static final double gigaBitToByte = 128 * 1024 * 1024.0;

// Most recently available system resource usage.
Expand All @@ -65,54 +49,24 @@ public class BrokerLoadData {
private double msgRateIn;
private double msgRateOut;

// Timestamp of last update.
private long lastUpdate;

// The stats given in the most recent invocation of update.
private Map<String, NamespaceBundleStats> lastStats;

private int numTopics;
private int numBundles;
private int numConsumers;
private int numProducers;

// All bundles belonging to this broker.
private Set<String> bundles;

// The bundles gained since the last invocation of update.
private Set<String> lastBundleGains;

// The bundles lost since the last invocation of update.
private Set<String> lastBundleLosses;

public BrokerLoadData() {
lastStats = new ConcurrentHashMap<>();
lastUpdate = System.currentTimeMillis();
cpu = new ResourceUsage();
memory = new ResourceUsage();
directMemory = new ResourceUsage();
bandwidthIn = new ResourceUsage();
bandwidthOut = new ResourceUsage();
msgThroughputInUsage = new ResourceUsage();
msgThroughputOutUsage = new ResourceUsage();
bundles = new HashSet<>();
lastBundleGains = new HashSet<>();
lastBundleLosses = new HashSet<>();
}

/**
* Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
*
* @param systemResourceUsage
* System resource usage (cpu, memory, and direct memory).
* @param bundleStats
* The bundle stats retrieved from the Pulsar client.
*/
public void update(final SystemResourceUsage systemResourceUsage,
final Map<String, NamespaceBundleStats> bundleStats) {
public void update(final SystemResourceUsage systemResourceUsage) {
updateSystemResourceUsage(systemResourceUsage);
updateBundleData(bundleStats);
lastStats = bundleStats;
}

/**
Expand All @@ -123,8 +77,6 @@ public void update(final SystemResourceUsage systemResourceUsage,
*/
public void update(final BrokerLoadData other) {
updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
updateBundleData(other.lastStats);
lastStats = other.lastStats;
}

// Set the cpu, memory, and direct memory to that of the new system resource usage data.
Expand All @@ -144,57 +96,6 @@ private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUs
this.bandwidthOut = bandwidthOut;
}

// Aggregate all message, throughput, topic count, bundle count, consumer
// count, and producer count across the
// given data. Also keep track of bundle gains and losses.
private void updateBundleData(final Map<String, NamespaceBundleStats> bundleStats) {
msgRateIn = 0;
msgRateOut = 0;
msgThroughputIn = 0;
msgThroughputOut = 0;
int totalNumTopics = 0;
int totalNumBundles = 0;
int totalNumConsumers = 0;
int totalNumProducers = 0;
final Iterator<String> oldBundleIterator = bundles.iterator();
while (oldBundleIterator.hasNext()) {
final String bundle = oldBundleIterator.next();
if (!bundleStats.containsKey(bundle)) {
// If this bundle is in the old bundle set but not the new one,
// we lost it.
lastBundleLosses.add(bundle);
oldBundleIterator.remove();
}
}
for (Map.Entry<String, NamespaceBundleStats> entry : bundleStats.entrySet()) {
final String bundle = entry.getKey();
final NamespaceBundleStats stats = entry.getValue();
if (!bundles.contains(bundle)) {
// If this bundle is in the new bundle set but not the old one,
// we gained it.
lastBundleGains.add(bundle);
bundles.add(bundle);
}
msgThroughputIn += stats.msgThroughputIn;
msgThroughputOut += stats.msgThroughputOut;
msgRateIn += stats.msgRateIn;
msgRateOut += stats.msgRateOut;
totalNumTopics += stats.topics;
++totalNumBundles;
totalNumConsumers += stats.consumerCount;
totalNumProducers += stats.producerCount;
}
numTopics = totalNumTopics;
numBundles = totalNumBundles;
numConsumers = totalNumConsumers;
numProducers = totalNumProducers;
}

public void cleanDeltas() {
lastBundleGains.clear();
lastBundleLosses.clear();
}

public double getMaxResourceUsage() {
return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
bandwidthOut.percentUsage()) / 100;
Expand Down Expand Up @@ -273,20 +174,6 @@ private static double maxWithinLimit(double limit, double...args) {
return max;
}

public String printResourceUsage(ServiceConfiguration conf) {
double nicSpeedBytesInSec = getNicSpeedBytesInSec(conf);
return String.format(
Locale.ENGLISH,
"cpu: %.2f%%, memory: %.2f%%, directMemory: %.2f%%,"
+ " bandwidthIn: %.2f%%, bandwidthOut: %.2f%%,"
+ " msgThroughputIn: %.2f%%, msgThroughputOut: %.2f%%",
cpu.percentUsage(), memory.percentUsage(), directMemory.percentUsage(),
bandwidthIn.percentUsage(),
bandwidthOut.percentUsage(),
getMsgThroughputInUsage(nicSpeedBytesInSec).percentUsage(),
getMsgThroughputOutUsage(nicSpeedBytesInSec).percentUsage());
}

private static double max(double...args) {
double max = Double.NEGATIVE_INFINITY;

Expand All @@ -298,27 +185,4 @@ private static double max(double...args) {

return max;
}

public static LocalBrokerData convertToLoadManagerReport(BrokerLoadData brokerLoadData) {
LocalBrokerData localBrokerData = new LocalBrokerData();
localBrokerData.setCpu(brokerLoadData.getCpu());
localBrokerData.setMemory(brokerLoadData.getMemory());
localBrokerData.setBandwidthIn(brokerLoadData.getBandwidthIn());
localBrokerData.setBandwidthOut(brokerLoadData.getBandwidthOut());
localBrokerData.setBundles(brokerLoadData.getBundles());
localBrokerData.setDirectMemory(brokerLoadData.getDirectMemory());
localBrokerData.setLastStats(brokerLoadData.getLastStats());
localBrokerData.setLastBundleGains(brokerLoadData.getLastBundleGains());
localBrokerData.setLastBundleLosses(brokerLoadData.getLastBundleLosses());
localBrokerData.setLastUpdate(brokerLoadData.getLastUpdate());
localBrokerData.setNumBundles(brokerLoadData.getNumBundles());
localBrokerData.setNumTopics(brokerLoadData.getNumTopics());
localBrokerData.setNumProducers(brokerLoadData.getNumProducers());
localBrokerData.setNumConsumers(brokerLoadData.getNumConsumers());
localBrokerData.setMsgRateIn(brokerLoadData.getMsgRateIn());
localBrokerData.setMsgRateOut(brokerLoadData.getMsgRateOut());
localBrokerData.setMsgThroughputIn(brokerLoadData.getMsgThroughputIn());
localBrokerData.setMsgThroughputOut(brokerLoadData.getMsgThroughputOut());
return localBrokerData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,18 @@
*/
package org.apache.pulsar.broker.loadbalance.extensible.data;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;

/**
* Defines the information of top bundles load data.
*/
@Getter
public class TopBundlesLoadData {

public static final String TOPIC =
TopicDomain.non_persistent
+ "://"
+ NamespaceName.SYSTEM_NAMESPACE
+ "/top-bundle-load-data";

@JsonProperty("top_bundles_load_data")
private final List<BundleLoadData> topBundlesLoadData;

public record BundleLoadData(String bundleName, NamespaceBundleStats stats) {
Expand All @@ -53,6 +46,12 @@ private TopBundlesLoadData(List<BundleLoadData> bundleStats, int topK) {
.collect(Collectors.toList());
}

/**
* Give full bundle stats, and return the top K bundle stats.
*
* @param bundleStats full bundle stats.
* @param topK Top K bundles.
*/
public static TopBundlesLoadData of(List<BundleLoadData> bundleStats, int topK) {
return new TopBundlesLoadData(bundleStats, topK);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@ public interface NamespaceBundleSplitStrategy {
/**
* Determines which bundles, if any, should be split.
*
* @param context
* Load data to base decisions on (does not have benefit of preallocated data since this may not be the
* leader broker).
* @param pulsarService
* Pulasr service to use.
* @param context The context used for decisions.
* @param pulsarService Pulsar service to use.
* @return A set of the bundles that should be split.
*/
Set<Split> findBundlesToSplit(LoadManagerContext context, PulsarService pulsarService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@
*/
public interface NamespaceUnloadStrategy {


/**
* Recommend that all the returned bundles be unloaded.
*
* @param context The context used for decisions.
* @param recentlyUnloadedBundles
* The recently unloaded bundles.
* @return A list of the bundles that should be unloaded.
*/
List<Unload> findBundlesForUnloading(LoadManagerContext context,
Map<String, Long> recentlyUnloadedBundles);

Expand Down
Loading

0 comments on commit c4ef416

Please sign in to comment.