Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] Support lower boundary shedding for ThresholdShedder #17456

Merged
merged 18 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2069,6 +2069,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";

@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "When [current usage < average usage - threshold], "
+ "the broker with the highest load will be triggered to unload"
)
private boolean lowerBoundarySheddingEnabled = false;

@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "load balance placement strategy"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class ThresholdShedder implements LoadSheddingStrategy {
private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class);
private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;

private static final double LOWER_BOUNDARY_THRESHOLD_MARGIN = 0.5;

private static final double MB = 1024 * 1024;

private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 5 * 60; // 5 mins
Expand Down Expand Up @@ -80,8 +83,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;

final double avgUsage = getBrokerAvgUsage(
loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf, sampleLog);
final double avgUsage = getBrokerAvgUsage(loadData, conf, sampleLog);
if (sampleLog) {
log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
}
Expand Down Expand Up @@ -122,17 +124,34 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
broker, 100 * currentUsage, 100 * avgUsage, 100 * threshold, minimumThroughputToOffload / MB,
(brokerCurrentThroughput - minimumThroughputToOffload) / MB);

MutableDouble trafficMarkedToOffload = new MutableDouble(0);
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

if (localData.getBundles().size() > 1) {
loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(bundle, throughput);
filterAndSelectBundle(loadData, recentlyUnloadedBundles, broker, localData, minimumThroughputToOffload);
} else if (localData.getBundles().size() == 1) {
log.warn(
"HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
localData.getBundles().iterator().next(), broker);
} else {
log.warn("Broker {} is overloaded despite having no bundles", broker);
}
});
if (selectedBundlesCache.isEmpty() && conf.isLowerBoundarySheddingEnabled()) {
tryLowerBoundaryShedding(loadData, conf);
}
return selectedBundlesCache;
}

private void filterAndSelectBundle(LoadData loadData, Map<String, Long> recentlyUnloadedBundles, String broker,
LocalBrokerData localData, double minimumThroughputToOffload) {
MutableDouble trafficMarkedToOffload = new MutableDouble(0);
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(bundle, throughput);
}).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft())
).filter(e ->
Expand All @@ -147,21 +166,11 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
atLeastOneBundleSelected.setTrue();
}
});
} else if (localData.getBundles().size() == 1) {
log.warn(
"HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
localData.getBundles().iterator().next(), broker);
} else {
log.warn("Broker {} is overloaded despite having no bundles", broker);
}
});

return selectedBundlesCache;
}

private double getBrokerAvgUsage(final LoadData loadData, final double historyPercentage,
private double getBrokerAvgUsage(final LoadData loadData,
final ServiceConfiguration conf, boolean sampleLog) {
double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
double totalUsage = 0.0;
int totalBrokers = 0;

Expand Down Expand Up @@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
return historyUsage;
}

private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
// Select the broker with the most resource usage.
final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
boolean hasBrokerBelowLowerBound = result.getLeft();
String maxUsageBroker = result.getRight();
BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
if (brokerData == null) {
log.info("Load data is null or bundle <=1, skipping bundle unload.");
return;
}
if (!hasBrokerBelowLowerBound) {
log.info("No broker is below the lower bound, threshold is {}, "
+ "avgUsage usage is {}, max usage of Broker {} is {}",
threshold, avgUsage, maxUsageBroker,
brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
return;
}
LocalBrokerData localData = brokerData.getLocalData();
315157973 marked this conversation as resolved.
Show resolved Hide resolved
double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
if (minThroughputThreshold > minimumThroughputToOffload) {
log.info("broker {} in lower boundary shedding is planning to shed throughput {} MByte/s less than "
+ "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
return;
}
filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
minimumThroughputToOffload);
}

private Pair<Boolean, String> getMaxUsageBroker(
LoadData loadData, double threshold, double avgUsage) {
String maxUsageBrokerName = "";
double maxUsage = avgUsage - threshold;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The max usage broker load: > (avgUsage - threshold)
The below lower broker load: < (avgUsage - threshold)

After the max usage broker unloads to the lower broker, the max usage broker might become the lower broker, the lower broker becomes the max usage broker. Can this will lead to frequent bundle unloading?

It looks like we need to change

double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;

to

double minimumThroughputToOffload = Math.min(brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN, brokerCurrentThroughput - avgUsage - threshold);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we should also add a test for this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not solve the problem.
Unless we know which Bundle is to be unloaded and the load of the Bundle, we can determine whether to select the current Broker. Currently, this Class is only responsible for selecting brokers and cannot know the information about bundles to be unloaded.

The best way to solve this problem is to split bundle.

boolean hasBrokerBelowLowerBound = false;
for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
String broker = entry.getKey();
BrokerData brokerData = entry.getValue();
double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
// Select the broker with the most resource usage.
if (currentUsage > maxUsage && brokerData.getLocalData() != null
&& brokerData.getLocalData().getBundles().size() > 1) {
maxUsage = currentUsage;
maxUsageBrokerName = broker;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}
// Whether any brokers with low usage in the cluster.
if (currentUsage < avgUsage - threshold) {
hasBrokerBelowLowerBound = true;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return Pair.of(hasBrokerBelowLowerBound, maxUsageBrokerName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public ThresholdShedderTest() {

@BeforeMethod
public void setup() {
conf.setLowerBoundarySheddingEnabled(false);
thresholdShedder = new ThresholdShedder();
}

Expand Down Expand Up @@ -224,4 +225,114 @@ public void testPrintResourceUsage() {
assertEquals(data.printResourceUsage(),
"cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn: 30.00%, bandwidthOut: 20.00%");
}

@Test
public void testLowerBoundaryShedding() {
int numBundles = 10;
int brokerNum = 11;
int lowLoadNode = 10;
LoadData loadData = new LoadData();
double throughput = 100 * 1024 * 1024;
//There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
//At this time, the average load is 80*10/11 = 72.73, and the threshold for rebalancing is 72.73 + 10 = 82.73.
//Since 80 < 82.73, rebalancing will not be trigger, and there is one Broker with load of 0.
for (int i = 0; i < brokerNum; i++) {
LocalBrokerData broker = new LocalBrokerData();
for (int j = 0; j < numBundles; j++) {
broker.getBundles().add("bundle-" + j);
BundleData bundle = new BundleData();
TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
bundle.setShortTermData(timeAverageMessageData);
String broker2BundleName = "broker-" + i + "-bundle-" + j;
loadData.getBundleData().put(broker2BundleName, bundle);
broker.getBundles().add(broker2BundleName);
}
broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
loadData.getBrokerData().put("broker-" + i, new BrokerData(broker));
}
ThresholdShedder shedder = new ThresholdShedder();
Multimap<String, String> bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf);
assertTrue(bundlesToUnload.isEmpty());
conf.setLowerBoundarySheddingEnabled(true);
bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, conf);
assertFalse(bundlesToUnload.isEmpty());
}

@Test
public void testLowerBoundarySheddingNoBrokerToOffload() {
int numBundles = 10;
int brokerNum = 11;
LoadData loadData = new LoadData();
double throughput = 80 * 1024 * 1024;
//Load of all Brokers are 80%, and no Broker needs to offload.
for (int i = 0; i < brokerNum; i++) {
LocalBrokerData broker = new LocalBrokerData();
for (int j = 0; j < numBundles; j++) {
broker.getBundles().add("bundle-" + j);
BundleData bundle = new BundleData();
TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
timeAverageMessageData.setMsgThroughputIn(throughput);
timeAverageMessageData.setMsgThroughputOut(throughput);
bundle.setShortTermData(timeAverageMessageData);
String broker2BundleName = "broker-" + i + "-bundle-" + j;
loadData.getBundleData().put(broker2BundleName, bundle);
broker.getBundles().add(broker2BundleName);
}
broker.setBandwidthIn(new ResourceUsage(80, 100));
broker.setBandwidthOut(new ResourceUsage(80, 100));
broker.setMsgThroughputIn(throughput);
broker.setMsgThroughputOut(throughput);
loadData.getBrokerData().put("broker-" + i, new BrokerData(broker));
}
ThresholdShedder shedder = new ThresholdShedder();
Multimap<String, String> bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf);
assertTrue(bundlesToUnload.isEmpty());
conf.setLowerBoundarySheddingEnabled(true);
bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, conf);
assertTrue(bundlesToUnload.isEmpty());
}

@Test
public void testLowerBoundarySheddingBrokerWithOneBundle() {
int brokerNum = 11;
int lowLoadNode = 5;
int brokerWithManyBundles = 3;
LoadData loadData = new LoadData();
double throughput = 100 * 1024 * 1024;
//There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
//Only broker3 has 10 bundles.
for (int i = 0; i < brokerNum; i++) {
LocalBrokerData broker = new LocalBrokerData();
//Broker3 has 10 bundles
int numBundles = i == brokerWithManyBundles ? 10 : 1;
for (int j = 0; j < numBundles; j++) {
BundleData bundle = new BundleData();
TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
bundle.setShortTermData(timeAverageMessageData);
String broker2BundleName = "broker-" + i + "-bundle-" + j;
loadData.getBundleData().put(broker2BundleName, bundle);
broker.getBundles().add(broker2BundleName);
}
broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
loadData.getBrokerData().put("broker-" + i, new BrokerData(broker));
}
ThresholdShedder shedder = new ThresholdShedder();
Multimap<String, String> bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf);
assertTrue(bundlesToUnload.isEmpty());
conf.setLowerBoundarySheddingEnabled(true);
bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, conf);
assertFalse(bundlesToUnload.isEmpty());
assertEquals(bundlesToUnload.size(), 1);
assertTrue(bundlesToUnload.containsKey("broker-3"));
}
}