Skip to content

Commit

Permalink
[fix][broker] Revert "[fix][load-balancer] skip mis-configured resour…
Browse files Browse the repository at this point in the history
…ce usage(>100%) in load balancer (#18645)
  • Loading branch information
Technoboy- authored Nov 28, 2022
1 parent b5946f1 commit 363d1a9
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData
ServiceConfiguration conf) {
final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeightWithinLimit(
double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,8 @@ private boolean needBrokerDataUpdate() {
long timeSinceLastReportWrittenToStore = System.currentTimeMillis() - localData.getLastUpdate();
if (timeSinceLastReportWrittenToStore > updateMaxIntervalMillis) {
log.info("Writing local data to metadata store because time since last"
+ " update exceeded threshold of {} minutes. ResourceUsage:[{}]",
conf.getLoadBalancerReportUpdateMaxIntervalMinutes(),
localData.printResourceUsage());
+ " update exceeded threshold of {} minutes",
conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
// Always update after surpassing the maximum interval.
return true;
}
Expand All @@ -459,10 +458,9 @@ private boolean needBrokerDataUpdate() {
percentChange(lastData.getNumBundles(), localData.getNumBundles()))));
if (maxChange > conf.getLoadBalancerReportUpdateThresholdPercentage()) {
log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; "
+ "time since last report written is {} seconds. ResourceUsage:[{}]", maxChange,
+ "time since last report written is {} seconds", maxChange,
conf.getLoadBalancerReportUpdateThresholdPercentage(),
timeSinceLastReportWrittenToStore / 1000.0,
localData.printResourceUsage());
timeSinceLastReportWrittenToStore / 1000.0);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,37 +56,16 @@ public class ThresholdShedder implements LoadSheddingStrategy {
private static final double LOWER_BOUNDARY_THRESHOLD_MARGIN = 0.5;

private static final double MB = 1024 * 1024;

private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 5 * 60; // 5 mins
private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
private long lastSampledLoadLogTS = 0;


private static int toPercentage(double usage) {
return (int) (usage * 100);
}

private boolean canSampleLog() {
long now = System.currentTimeMillis() / 1000;
boolean sampleLog = now - lastSampledLoadLogTS >= LOAD_LOG_SAMPLE_DELAY_IN_SEC;
if (sampleLog) {
lastSampledLoadLogTS = now;
}
return sampleLog;
}

@Override
public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
selectedBundlesCache.clear();
boolean sampleLog = canSampleLog();
final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;

final double avgUsage = getBrokerAvgUsage(loadData, conf, sampleLog);
if (sampleLog) {
log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
}
final double avgUsage = getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);

if (avgUsage == 0) {
log.warn("average max resource usage is 0");
Expand All @@ -98,9 +77,8 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);

if (currentUsage < avgUsage + threshold) {
if (sampleLog) {
log.info("[{}] broker is not overloaded, ignoring at this point, currentUsage:{}%",
broker, toPercentage(currentUsage));
if (log.isDebugEnabled()) {
log.debug("[{}] broker is not overloaded, ignoring at this point", broker);
}
return;
}
Expand All @@ -111,13 +89,14 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;

if (minimumThroughputToOffload < minThroughputThreshold) {
if (sampleLog) {
log.info("[{}] broker is planning to shed throughput {} MByte/s less than "
if (log.isDebugEnabled()) {
log.debug("[{}] broker is planning to shed throughput {} MByte/s less than "
+ "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
broker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
}
return;
}

log.info(
"Attempting to shed load on {}, which has max resource usage above avgUsage and threshold {}%"
+ " > {}% + {}% -- Offloading at least {} MByte/s of traffic, left throughput {} MByte/s",
Expand Down Expand Up @@ -168,67 +147,30 @@ private void filterAndSelectBundle(LoadData loadData, Map<String, Long> recently
});
}

private double getBrokerAvgUsage(final LoadData loadData,
final ServiceConfiguration conf, boolean sampleLog) {
double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
private double getBrokerAvgUsage(final LoadData loadData, final double historyPercentage,
final ServiceConfiguration conf) {
double totalUsage = 0.0;
int totalBrokers = 0;

for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
LocalBrokerData localBrokerData = entry.getValue().getLocalData();
String broker = entry.getKey();
totalUsage += updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf, sampleLog);
totalUsage += updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf);
totalBrokers++;
}

return totalBrokers > 0 ? totalUsage / totalBrokers : 0;
}

private double updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData,
final double historyPercentage, final ServiceConfiguration conf,
boolean sampleLog) {
final double historyPercentage, final ServiceConfiguration conf) {
Double historyUsage =
brokerAvgResourceUsage.get(broker);
double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());

if (sampleLog) {
log.info("{} broker load: historyUsage={}%, resourceUsage={}%",
broker,
historyUsage == null ? 0 : toPercentage(historyUsage),
toPercentage(resourceUsage));
}

// wrap if resourceUsage is bigger than 1.0
if (resourceUsage > 1.0) {
log.error("{} broker resourceUsage is bigger than 100%. "
+ "Some of the resource limits are mis-configured. "
+ "Try to disable the error resource signals by setting their weights to zero "
+ "or fix the resource limit configurations. "
+ "Ref:https://pulsar.apache.org/docs/administration-load-balance/#thresholdshedder "
+ "ResourceUsage:[{}], "
+ "CPUResourceWeight:{}, MemoryResourceWeight:{}, DirectMemoryResourceWeight:{}, "
+ "BandwithInResourceWeight:{}, BandwithOutResourceWeight:{}",
broker,
localBrokerData.printResourceUsage(),
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());

resourceUsage = localBrokerData.getMaxResourceUsageWithWeightWithinLimit(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());

log.warn("{} broker recomputed max resourceUsage={}%. Skipped usage signals bigger than 100%",
broker, toPercentage(resourceUsage));
}
historyUsage = historyUsage == null
? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;

Expand All @@ -239,7 +181,7 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
// Select the broker with the most resource usage.
final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
final double avgUsage = getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);
Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
boolean hasBrokerBelowLowerBound = result.getLeft();
String maxUsageBroker = result.getRight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ public void testBrokerNotReachThreshold() {
LoadData loadData = new LoadData();

LocalBrokerData broker1 = new LocalBrokerData();
broker1.setCpu(new ResourceUsage(1000, 100));
broker1.setMemory(new ResourceUsage(5000, 100));
broker1.setDirectMemory(new ResourceUsage(5000, 100));
broker1.setBandwidthIn(new ResourceUsage(500, 1000));
broker1.setBandwidthOut(new ResourceUsage(500, 1000));
broker1.setBundles(Sets.newHashSet("bundle-1"));
Expand Down Expand Up @@ -119,9 +116,6 @@ public void testBrokerWithMultipleBundles() {
LoadData loadData = new LoadData();

LocalBrokerData broker1 = new LocalBrokerData();
broker1.setCpu(new ResourceUsage(1000, 100));
broker1.setMemory(new ResourceUsage(5000, 100));
broker1.setDirectMemory(new ResourceUsage(5000, 100));
broker1.setBandwidthIn(new ResourceUsage(999, 1000));
broker1.setBandwidthOut(new ResourceUsage(999, 1000));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,6 @@ public double getMaxResourceUsageWithWeight(final double cpuWeight, final double
bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
}

public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
final double directMemoryWeight,
final double bandwidthInWeight,
final double bandwidthOutWeight) {
return maxWithinLimit(100.0d,
cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
}

private static double max(double... args) {
double max = Double.NEGATIVE_INFINITY;

Expand All @@ -292,16 +282,6 @@ private static float max(float...args) {
return max;
}

private static double maxWithinLimit(double limit, double...args) {
double max = 0.0;
for (double d : args) {
if (d > max && d <= limit) {
max = d;
}
}
return max;
}

public String getLoadReportType() {
return loadReportType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public void testMaxResourceUsage() {
assertEquals(
data.getMaxResourceUsageWithWeight(
weight, weight, weight, weight, weight), 2.0, epsilon);

assertEquals(
data.getMaxResourceUsageWithWeightWithinLimit(
weight, weight, weight, weight, weight), 0.02, epsilon);

}

/*
Expand Down

0 comments on commit 363d1a9

Please sign in to comment.