Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] Support lower boundary shedding for ThresholdShedder #17456

Merged
merged 18 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2069,6 +2069,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";

// Opt-in switch, disabled by default. ThresholdShedder.findBundlesForUnloading reads it through
// isEnableLowerBoundaryShedding() and only runs the extra lower-boundary pass when the regular
// pass selected no bundles.
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "When [current usage < average usage - threshold], "
+ "the broker with the highest load will be triggered to unload"
)
private boolean enableLowerBoundaryShedding = false;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
315157973 marked this conversation as resolved.
Show resolved Hide resolved

@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "load balance placement strategy"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class ThresholdShedder implements LoadSheddingStrategy {
private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class);
private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;

// Scaling factor used by tryLowerBoundaryShedding: the throughput it aims to offload is
// (broker throughput * shedding threshold * this margin).
private static final double LOWER_BOUNDARY_THRESHOLD_MARGIN = 0.5;

private static final double MB = 1024 * 1024;

private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 5 * 60; // 5 mins
Expand Down Expand Up @@ -80,8 +83,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;

final double avgUsage = getBrokerAvgUsage(
loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf, sampleLog);
final double avgUsage = getBrokerAvgUsage(loadData, conf, sampleLog);
if (sampleLog) {
log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
}
Expand Down Expand Up @@ -122,17 +124,34 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
broker, 100 * currentUsage, 100 * avgUsage, 100 * threshold, minimumThroughputToOffload / MB,
(brokerCurrentThroughput - minimumThroughputToOffload) / MB);

MutableDouble trafficMarkedToOffload = new MutableDouble(0);
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

if (localData.getBundles().size() > 1) {
loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(bundle, throughput);
filterAndSelectBundle(loadData, recentlyUnloadedBundles, broker, localData, minimumThroughputToOffload);
} else if (localData.getBundles().size() == 1) {
log.warn(
"HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
localData.getBundles().iterator().next(), broker);
} else {
log.warn("Broker {} is overloaded despite having no bundles", broker);
}
});
if (selectedBundlesCache.isEmpty() && conf.isEnableLowerBoundaryShedding()) {
tryLowerBoundaryShedding(loadData, conf);
}
return selectedBundlesCache;
}

private void filterAndSelectBundle(LoadData loadData, Map<String, Long> recentlyUnloadedBundles, String broker,
LocalBrokerData localData, double minimumThroughputToOffload) {
MutableDouble trafficMarkedToOffload = new MutableDouble(0);
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(bundle, throughput);
}).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft())
).filter(e ->
Expand All @@ -147,21 +166,11 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
atLeastOneBundleSelected.setTrue();
}
});
} else if (localData.getBundles().size() == 1) {
log.warn(
"HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
localData.getBundles().iterator().next(), broker);
} else {
log.warn("Broker {} is overloaded despite having no bundles", broker);
}
});

return selectedBundlesCache;
}

private double getBrokerAvgUsage(final LoadData loadData, final double historyPercentage,
private double getBrokerAvgUsage(final LoadData loadData,
final ServiceConfiguration conf, boolean sampleLog) {
double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
double totalUsage = 0.0;
int totalBrokers = 0;

Expand Down Expand Up @@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
return historyUsage;
}

private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
// Select the broker with the most resource usage.
final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
boolean hasBrokerBelowLowerBound = result.getLeft();
String maxUsageBroker = result.getRight();
BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
if (brokerData == null || brokerData.getLocalData() == null
|| brokerData.getLocalData().getBundles().size() <= 1) {
log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
315157973 marked this conversation as resolved.
Show resolved Hide resolved
return;
}
315157973 marked this conversation as resolved.
Show resolved Hide resolved
if (!hasBrokerBelowLowerBound) {
log.info("No broker is below the lower bound, threshold is {}, "
+ "avgUsage usage is {}, max usage of Broker {} is {}",
threshold, avgUsage, maxUsageBroker,
brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
return;
}
LocalBrokerData localData = brokerData.getLocalData();
315157973 marked this conversation as resolved.
Show resolved Hide resolved
double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
if (minThroughputThreshold > minimumThroughputToOffload) {
log.info("broker {} in RangeThresholdShedder is planning to shed throughput {} MByte/s less than "
315157973 marked this conversation as resolved.
Show resolved Hide resolved
+ "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
return;
}
filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
minimumThroughputToOffload);
}

private Pair<Boolean, String> getMaxUsageBroker(
LoadData loadData, double threshold, double avgUsage) {
String maxUsageBrokerName = "";
double maxUsage = -1;
boolean hasBrokerBelowLowerBound = false;
for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
String broker = entry.getKey();
double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
// Select the broker with the most resource usage.
if (currentUsage > maxUsage) {
315157973 marked this conversation as resolved.
Show resolved Hide resolved
maxUsage = currentUsage;
maxUsageBrokerName = broker;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}
// Whether any brokers with low usage in the cluster.
if (currentUsage < avgUsage - threshold) {
hasBrokerBelowLowerBound = true;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return Pair.of(hasBrokerBelowLowerBound, maxUsageBrokerName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public ThresholdShedderTest() {

// Per-test fixture: turns lower-boundary shedding back off on the shared conf (individual tests
// switch it on) and creates a fresh ThresholdShedder.
@BeforeMethod
public void setup() {
conf.setEnableLowerBoundaryShedding(false);
thresholdShedder = new ThresholdShedder();
}

Expand Down Expand Up @@ -224,4 +225,75 @@ public void testPrintResourceUsage() {
assertEquals(data.printResourceUsage(),
"cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn: 30.00%, bandwidthOut: 20.00%");
}

@Test
public void testRangeThroughput() {
315157973 marked this conversation as resolved.
Show resolved Hide resolved
int numBundles = 10;
int brokerNum = 11;
int lowLoadNode = 10;
LoadData loadData = new LoadData();
double throughput = 100 * 1024 * 1024;
//There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
//At this time, the average load is 80*10/11 = 72.73, and the threshold for rebalancing is 72.73 + 10 = 82.73.
//Since 80 < 82.73, rebalancing will not be trigger, and there is one Broker with load of 0.
for (int i = 0; i < brokerNum; i++) {
LocalBrokerData broker = new LocalBrokerData();
for (int j = 0; j < numBundles; j++) {
broker.getBundles().add("bundle-" + j);
BundleData bundle = new BundleData();
TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
bundle.setShortTermData(timeAverageMessageData);
String broker2BundleName = "broker-" + i + "-bundle-" + (numBundles + i);
loadData.getBundleData().put(broker2BundleName, bundle);
broker.getBundles().add(broker2BundleName);
}
broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
loadData.getBrokerData().put("broker-" + i, new BrokerData(broker));
}
ThresholdShedder shedder = new ThresholdShedder();
Multimap<String, String> bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf);
assertTrue(bundlesToUnload.isEmpty());
conf.setEnableLowerBoundaryShedding(true);
bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, conf);
assertFalse(bundlesToUnload.isEmpty());
}

@Test
public void testNoBrokerToOffload() {
315157973 marked this conversation as resolved.
Show resolved Hide resolved
int numBundles = 10;
int brokerNum = 11;
LoadData loadData = new LoadData();
double throughput = 80 * 1024 * 1024;
//Load of all Brokers are 80%, and no Broker needs to offload.
for (int i = 0; i < brokerNum; i++) {
LocalBrokerData broker = new LocalBrokerData();
for (int j = 0; j < numBundles; j++) {
broker.getBundles().add("bundle-" + j);
BundleData bundle = new BundleData();
TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
timeAverageMessageData.setMsgThroughputIn(throughput);
timeAverageMessageData.setMsgThroughputOut(throughput);
bundle.setShortTermData(timeAverageMessageData);
String broker2BundleName = "broker-" + i + "-bundle-" + (numBundles + i);
loadData.getBundleData().put(broker2BundleName, bundle);
broker.getBundles().add(broker2BundleName);
}
broker.setBandwidthIn(new ResourceUsage(80, 100));
broker.setBandwidthOut(new ResourceUsage(80, 100));
broker.setMsgThroughputIn(throughput);
broker.setMsgThroughputOut(throughput);
loadData.getBrokerData().put("broker-" + i, new BrokerData(broker));
}
ThresholdShedder shedder = new ThresholdShedder();
Multimap<String, String> bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf);
assertTrue(bundlesToUnload.isEmpty());
conf.setEnableLowerBoundaryShedding(true);
bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, conf);
assertTrue(bundlesToUnload.isEmpty());
}
}