Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192 Moved the common broker load data feature(weightedMaxEMA) to BrokerLoadData #19154

Merged
merged 3 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.data;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
Expand All @@ -29,59 +31,99 @@
* Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
* And removed the lookup data, see {@link BrokerLookupData}
*/
@Data
@Getter
@EqualsAndHashCode
public class BrokerLoadData {

private static final double DEFAULT_RESOURCE_USAGE = 1.0d;

// Most recently available system resource usage.
private ResourceUsage cpu;
private ResourceUsage memory;
private ResourceUsage directMemory;

private ResourceUsage bandwidthIn;
private ResourceUsage bandwidthOut;

// Message data from the most recent namespace bundle stats.
private double msgThroughputIn;
private ResourceUsage msgThroughputInUsage;
private double msgThroughputOut;
private ResourceUsage msgThroughputOutUsage;
private double msgRateIn;
private double msgRateOut;
private double msgThroughputIn; // bytes/sec
private double msgThroughputOut; // bytes/sec
private double msgRateIn; // messages/sec
private double msgRateOut; // messages/sec

// Load data features computed from the above resources.
private double maxResourceUsage; // max of resource usages
/**
* Exponential moving average (EMA) of the max of the weighted resource usages among
* cpu, memory, directMemory, bandwidthIn, and bandwidthOut.
*
* The resource weights are configured by:
* loadBalancerCPUResourceWeight,
* loadBalancerMemoryResourceWeight,
* loadBalancerDirectMemoryResourceWeight,
* loadBalancerBandwithInResourceWeight, and
* loadBalancerBandwithOutResourceWeight.
*
* The historical resource percentage is configured by loadBalancerHistoryResourcePercentage.
*/
private double weightedMaxEMA;
private long updatedAt;

public BrokerLoadData() {
cpu = new ResourceUsage();
memory = new ResourceUsage();
directMemory = new ResourceUsage();
bandwidthIn = new ResourceUsage();
bandwidthOut = new ResourceUsage();
msgThroughputInUsage = new ResourceUsage();
msgThroughputOutUsage = new ResourceUsage();
maxResourceUsage = DEFAULT_RESOURCE_USAGE;
weightedMaxEMA = DEFAULT_RESOURCE_USAGE;
}

/**
* Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
* Using the system resource usage from the Pulsar client, update this BrokerLoadData.
*
* @param systemResourceUsage
* @param usage
* System resource usage (cpu, memory, direct memory, bandwidth in, and bandwidth out).
* @param msgThroughputIn
* broker-level message input throughput in bytes/s.
* @param msgThroughputOut
* broker-level message output throughput in bytes/s.
* @param msgRateIn
* broker-level message input rate in messages/s.
* @param msgRateOut
* broker-level message output rate in messages/s.
* @param conf
* Service configuration to compute load data features.
*/
public void update(final SystemResourceUsage systemResourceUsage) {
updateSystemResourceUsage(systemResourceUsage);
public void update(final SystemResourceUsage usage,
double msgThroughputIn,
double msgThroughputOut,
double msgRateIn,
double msgRateOut,
ServiceConfiguration conf) {
// Hand the five per-resource usages carried by the given snapshot to the per-resource updater.
updateSystemResourceUsage(usage.cpu, usage.memory, usage.directMemory, usage.bandwidthIn, usage.bandwidthOut);
// Store the broker-level message throughput and rate values exactly as passed in.
this.msgThroughputIn = msgThroughputIn;
this.msgThroughputOut = msgThroughputOut;
this.msgRateIn = msgRateIn;
this.msgRateOut = msgRateOut;
// Recompute the derived features from the values stored above. Keep this call BEFORE the updatedAt
// assignment below: updateWeightedMaxEMA checks updatedAt == 0 to decide whether to seed the average
// with the current weighted max or to blend it with the previous average.
updateFeatures(conf);
// Stamp the time of this update (wall-clock milliseconds).
updatedAt = System.currentTimeMillis();
}

/**
* Using another LocalBrokerData, update this.
* Using another BrokerLoadData, update this.
*
* @param other
* LocalBrokerData to update from.
* BrokerLoadData to update from.
*/
public void update(final BrokerLoadData other) {
updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
}

// Set the cpu, memory, and direct memory to that of the new system resource usage data.
private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
msgThroughputIn = other.msgThroughputIn;
msgThroughputOut = other.msgThroughputOut;
msgRateIn = other.msgRateIn;
msgRateOut = other.msgRateOut;
weightedMaxEMA = other.weightedMaxEMA;
maxResourceUsage = other.maxResourceUsage;
updatedAt = other.updatedAt;
}

// Update resource usage given each individual usage.
Expand All @@ -95,17 +137,54 @@ private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUs
this.bandwidthOut = bandwidthOut;
}

public double getMaxResourceUsage() {
return LocalBrokerData.max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
/**
 * Recomputes the derived load data features from the resource usages currently stored on this object.
 *
 * @param conf
 *            Service configuration, passed on to the weighted max EMA computation.
 */
private void updateFeatures(ServiceConfiguration conf) {
// Refresh maxResourceUsage first, then weightedMaxEMA.
updateMaxResourceUsage();
updateWeightedMaxEMA(conf);
}

/**
 * Stores in {@code maxResourceUsage} the largest percent usage among cpu, directMemory, bandwidthIn and
 * bandwidthOut, divided by 100. Memory is not part of this comparison.
 */
private void updateMaxResourceUsage() {
    var peakPercent = LocalBrokerData.max(
            cpu.percentUsage(),
            directMemory.percentUsage(),
            bandwidthIn.percentUsage(),
            bandwidthOut.percentUsage());
    maxResourceUsage = peakPercent / 100;
}

public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
/**
 * Multiplies each resource's percent usage by its weight and returns the largest product divided by 100.
 *
 * @param cpuWeight
 *            weight applied to the cpu percent usage.
 * @param memoryWeight
 *            weight applied to the memory percent usage.
 * @param directMemoryWeight
 *            weight applied to the direct memory percent usage.
 * @param bandwidthInWeight
 *            weight applied to the inbound bandwidth percent usage.
 * @param bandwidthOutWeight
 *            weight applied to the outbound bandwidth percent usage.
 * @return the max of the five weighted percent usages, divided by 100.
 */
private double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
                                             final double directMemoryWeight, final double bandwidthInWeight,
                                             final double bandwidthOutWeight) {
    final double weightedCpu = cpu.percentUsage() * cpuWeight;
    final double weightedMemory = memory.percentUsage() * memoryWeight;
    final double weightedDirectMemory = directMemory.percentUsage() * directMemoryWeight;
    final double weightedBandwidthIn = bandwidthIn.percentUsage() * bandwidthInWeight;
    final double weightedBandwidthOut = bandwidthOut.percentUsage() * bandwidthOutWeight;
    return LocalBrokerData.max(weightedCpu, weightedMemory, weightedDirectMemory,
            weightedBandwidthIn, weightedBandwidthOut) / 100;
}

private void updateWeightedMaxEMA(ServiceConfiguration conf) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does EMA mean? Would it be better to add a description for "EMA"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is an exponential moving average. Updated the comment for the weightedMaxEMA variable.

var historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
var weightedMax = getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());
weightedMaxEMA = updatedAt == 0 ? weightedMax :
weightedMaxEMA * historyPercentage + (1 - historyPercentage) * weightedMax;
}

/**
 * Formats this load data as a single line: the five resource percent usages, the five resource weights
 * read from the given configuration, the message throughput and rate values, and the derived features
 * (maxResourceUsage and weightedMaxEMA are multiplied by 100 and printed with a percent sign),
 * followed by updatedAt.
 *
 * @param conf
 *            Service configuration the resource weights are read from.
 * @return the formatted description of this load data.
 */
public String toString(ServiceConfiguration conf) {
    final String template =
            "cpu= %.2f%%, memory= %.2f%%, directMemory= %.2f%%, "
            + "bandwithIn= %.2f%%, bandwithOut= %.2f%%, "
            + "cpuWeight= %f, memoryWeight= %f, directMemoryWeight= %f, "
            + "bandwithInResourceWeight= %f, bandwithOutResourceWeight= %f, "
            + "msgThroughputIn= %.2f, msgThroughputOut= %.2f, msgRateIn= %.2f, msgRateOut= %.2f, "
            + "maxResourceUsage= %.2f%%, weightedMaxEMA= %.2f%%, updatedAt= %d";
    return String.format(template,
            // resource percent usages
            cpu.percentUsage(),
            memory.percentUsage(),
            directMemory.percentUsage(),
            bandwidthIn.percentUsage(),
            bandwidthOut.percentUsage(),
            // configured resource weights
            conf.getLoadBalancerCPUResourceWeight(),
            conf.getLoadBalancerMemoryResourceWeight(),
            conf.getLoadBalancerDirectMemoryResourceWeight(),
            conf.getLoadBalancerBandwithInResourceWeight(),
            conf.getLoadBalancerBandwithOutResourceWeight(),
            // message throughput and rates
            msgThroughputIn,
            msgThroughputOut,
            msgRateIn,
            msgRateOut,
            // derived features, shown as percentages
            maxResourceUsage * 100,
            weightedMaxEMA * 100,
            updatedAt);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.pulsar.broker.loadbalance.extensions.strategy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -39,81 +37,37 @@
*/
@Slf4j
public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy {
private static final double MAX_RESOURCE_USAGE = 1.0d;
// Maintain this list to reduce object creation.
private final ArrayList<String> bestBrokers;
private final Set<String> noLoadDataBrokers;
private final Map<String, Double> brokerAvgResourceUsageWithWeight;

public LeastResourceUsageWithWeight() {
this.bestBrokers = new ArrayList<>();
this.brokerAvgResourceUsageWithWeight = new HashMap<>();
this.noLoadDataBrokers = new HashSet<>();
}

// A broker's max resource usage with weight using its historical load and short-term load data with weight.
private double getMaxResourceUsageWithWeight(final String broker, final BrokerLoadData brokerLoadData,
final ServiceConfiguration conf, boolean debugMode) {
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsageWithWeight =
updateAndGetMaxResourceUsageWithWeight(broker, brokerLoadData, conf, debugMode);
final var maxUsageWithWeight = brokerLoadData.getWeightedMaxEMA();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The weightedMaxEMA can be null. Should we handle the null case here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this comment.

I changed this weightedMaxEMA to `double` and set the default value to 1.0.

Ideally, the downstream logic using this BrokerLoadData should not have to worry about whether it has been updated or not.

Let's ensure we publish BrokerLoadData to the service unit channel only when it has been updated at least once.

Also, accordingly, I cleaned up BrokerLoadData.



if (maxUsageWithWeight > overloadThreshold) {
log.warn(
"Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+ "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+ "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+ "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
broker, maxUsageWithWeight * 100,
brokerLoadData.getCpu().percentUsage(), brokerLoadData.getMemory().percentUsage(),
brokerLoadData.getDirectMemory().percentUsage(), brokerLoadData.getBandwidthIn().percentUsage(),
brokerLoadData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());
"Broker {} is overloaded, brokerLoad({}%) > overloadThreshold({}%). load data:{{}}",
broker,
maxUsageWithWeight * 100,
overloadThreshold * 100,
brokerLoadData.toString(conf));
} else if (debugMode) {
log.info("Broker {} load data:{{}}", broker, brokerLoadData.toString(conf));
}

if (debugMode) {
log.info("Broker {} has max resource usage with weight percentage: {}%",
broker, maxUsageWithWeight * 100);
}

return maxUsageWithWeight;
}

/**
* Update and get the max resource usage with weight of broker according to the service configuration.
*
* @param broker The broker name.
* @param brokerData The broker load data.
* @param conf The service configuration.
* @param debugMode The debug mode to print computed load states and decision information.
* @return the max resource usage with weight of broker
*/
private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerLoadData brokerData,
ServiceConfiguration conf, boolean debugMode) {
final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
double resourceUsage = brokerData.getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());
historyUsage = historyUsage == null
? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
if (debugMode) {
log.info(
"Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+ "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+ "OUT weight: {} ",
broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());
}
Comment on lines -104 to -113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the debug log be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I think we can keep this.

Added BrokerLoadData.toString() and added the debug log.

else if (debugMode) {
            log.info("Broker {} load data:{{}}", broker, brokerLoadData.toString(conf));
        }

brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
return historyUsage;
}

/**
* Find a suitable broker to assign the given bundle to.
Expand Down Expand Up @@ -143,7 +97,7 @@ public Optional<String> select(List<String> candidates, ServiceUnitId bundleToAs
for (String broker : candidates) {
var brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
if (brokerLoadDataOptional.isEmpty()) {
log.warn("There is no broker load data for broker:{}. Skipping this broker.", broker);
log.warn("There is no broker load data for broker:{}. Skipping this broker. Phase one", broker);
noLoadDataBrokers.add(broker);
continue;
}
Expand All @@ -162,12 +116,17 @@ public Optional<String> select(List<String> candidates, ServiceUnitId bundleToAs
if (debugMode) {
log.info("Computed avgUsage:{}, diffThreshold:{}", avgUsage, diffThreshold);
}
candidates.forEach(broker -> {
Double avgResUsage = brokerAvgResourceUsageWithWeight.getOrDefault(broker, MAX_RESOURCE_USAGE);
for (String broker : candidates) {
var brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
if (brokerLoadDataOptional.isEmpty()) {
log.warn("There is no broker load data for broker:{}. Skipping this broker. Phase two", broker);
continue;
}
double avgResUsage = brokerLoadDataOptional.get().getWeightedMaxEMA();
if ((avgResUsage + diffThreshold <= avgUsage && !noLoadDataBrokers.contains(broker))) {
bestBrokers.add(broker);
}
});
}
}

if (bestBrokers.isEmpty()) {
Expand Down
Loading