Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] Support lower boundary shedding for ThresholdShedder #17456

Merged
merged 18 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.Multimap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;

/**
* On the basis of ThresholdShedder, RangeThresholdShedder adds the lower boundary judgment of the load.
* When 【current usage < average usage - threshold】, the broker with the highest load will be triggered to unload,
* avoiding the following scenarios:
* There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
* The average load is 80 * 10 / 11 = 72.73, and the threshold to unload is 72.73 + 10 = 82.73.
* Since 80 < 82.73, unload will not be trigger, and there is one idle Broker with load of 0%.
*/
@Slf4j
public class RangeThresholdShedder extends ThresholdShedder {
315157973 marked this conversation as resolved.
Show resolved Hide resolved

@Override
public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
super.findBundlesForUnloading(loadData, conf);
// Return if the bundle to unload has already been selected.
if (!selectedBundlesCache.isEmpty()) {
return selectedBundlesCache;
}
// Select the broker with the most resource usage.
final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
final double avgUsage = getBrokerAvgUsage(loadData, conf, super.canSampleLog());
Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
boolean hasBrokerBelowLowerBound = result.getLeft();
String maxUsageBroker = result.getRight();
BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
if (brokerData == null || brokerData.getLocalData() == null
|| brokerData.getLocalData().getBundles().size() <= 1) {
log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
return selectedBundlesCache;
}
if (!hasBrokerBelowLowerBound) {
log.info("No broker is below the lower bound, threshold is {}, "
+ "avgUsage usage is {}, max usage of Broker {} is {}",
threshold, avgUsage, maxUsageBroker,
brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
return selectedBundlesCache;
}
LocalBrokerData localData = brokerData.getLocalData();
double minimumThroughputToOffload = getMinimumThroughputToOffload(threshold, localData);
final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
if (minThroughputThreshold > minimumThroughputToOffload) {
log.info("broker {} in RangeThresholdShedder is planning to shed throughput {} MByte/s less than "
+ "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
return selectedBundlesCache;
}
super.filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
minimumThroughputToOffload);
return selectedBundlesCache;
}

private Pair<Boolean, String> getMaxUsageBroker(
LoadData loadData, double threshold, double avgUsage) {
String maxUsageBrokerName = "";
double maxUsage = -1;
boolean hasBrokerBelowLowerBound = false;
for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
String broker = entry.getKey();
double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
// Select the broker with the most resource usage.
if (currentUsage > maxUsage) {
maxUsage = currentUsage;
maxUsageBrokerName = broker;
}
// Whether any brokers with low usage in the cluster.
if (currentUsage < avgUsage - threshold) {
hasBrokerBelowLowerBound = true;
}
}
return Pair.of(hasBrokerBelowLowerBound, maxUsageBrokerName);
}

private double getMinimumThroughputToOffload(double threshold, LocalBrokerData localData) {
double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
return brokerCurrentThroughput * threshold;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@
*/
public class ThresholdShedder implements LoadSheddingStrategy {
private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class);
private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
private static final double MB = 1024 * 1024;
protected final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
public static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
public static final double MB = 1024 * 1024;

private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 5 * 60; // 5 mins
private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
protected final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
private long lastSampledLoadLogTS = 0;


private static int toPercentage(double usage) {
return (int) (usage * 100);
}

private boolean canSampleLog() {
protected boolean canSampleLog() {
long now = System.currentTimeMillis() / 1000;
boolean sampleLog = now - lastSampledLoadLogTS >= LOAD_LOG_SAMPLE_DELAY_IN_SEC;
if (sampleLog) {
Expand All @@ -80,8 +80,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;

final double avgUsage = getBrokerAvgUsage(
loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf, sampleLog);
final double avgUsage = getBrokerAvgUsage(loadData, conf, sampleLog);
if (sampleLog) {
log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
}
Expand Down Expand Up @@ -122,17 +121,32 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
broker, 100 * currentUsage, 100 * avgUsage, 100 * threshold, minimumThroughputToOffload / MB,
(brokerCurrentThroughput - minimumThroughputToOffload) / MB);

MutableDouble trafficMarkedToOffload = new MutableDouble(0);
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

if (localData.getBundles().size() > 1) {
loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(bundle, throughput);
filterAndSelectBundle(loadData, recentlyUnloadedBundles, broker, localData, minimumThroughputToOffload);
} else if (localData.getBundles().size() == 1) {
log.warn(
"HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
localData.getBundles().iterator().next(), broker);
} else {
log.warn("Broker {} is overloaded despite having no bundles", broker);
}
});

return selectedBundlesCache;
}

protected void filterAndSelectBundle(LoadData loadData, Map<String, Long> recentlyUnloadedBundles, String broker,
LocalBrokerData localData, double minimumThroughputToOffload) {
MutableDouble trafficMarkedToOffload = new MutableDouble(0);
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(bundle, throughput);
}).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft())
).filter(e ->
Expand All @@ -147,21 +161,11 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
atLeastOneBundleSelected.setTrue();
}
});
} else if (localData.getBundles().size() == 1) {
log.warn(
"HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
localData.getBundles().iterator().next(), broker);
} else {
log.warn("Broker {} is overloaded despite having no bundles", broker);
}
});

return selectedBundlesCache;
}

private double getBrokerAvgUsage(final LoadData loadData, final double historyPercentage,
protected double getBrokerAvgUsage(final LoadData loadData,
final ServiceConfiguration conf, boolean sampleLog) {
double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
double totalUsage = 0.0;
int totalBrokers = 0;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Multimap;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.RangeThresholdShedder;
import org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder;
import org.apache.pulsar.broker.loadbalance.impl.ThresholdShedderTest;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
@Slf4j
public class RangeThresholdShedderTest extends ThresholdShedderTest {
private final ServiceConfiguration conf = new ServiceConfiguration();

@BeforeMethod
public void setup() {
super.thresholdShedder = new RangeThresholdShedder();
}

@Test
public void testRangeThroughput() {
int numBundles = 10;
int brokerNum = 11;
int lowLoadNode = 10;
LoadData loadData = new LoadData();
double throughput = 50 * ThresholdShedder.MB;
//There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
//At this time, the average load is 80*10/11 = 72.73, and the threshold for rebalancing is 72.73 + 10 = 82.73.
//Since 80 < 82.73, rebalancing will not be trigger, and there is one Broker with load of 0.
for (int i = 0; i < brokerNum; i++) {
LocalBrokerData broker = new LocalBrokerData();
for (int j = 0; j < numBundles; j++) {
broker.getBundles().add("bundle-" + j);
BundleData bundle = new BundleData();
TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
bundle.setShortTermData(timeAverageMessageData);
String broker2BundleName = "broker-" + i + "-bundle-" + (numBundles + i);
loadData.getBundleData().put(broker2BundleName, bundle);
broker.getBundles().add(broker2BundleName);
}
broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
loadData.getBrokerData().put("broker-" + i, new BrokerData(broker));
}
ThresholdShedder shedder = new ThresholdShedder();
Multimap<String, String> bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf);
assertTrue(bundlesToUnload.isEmpty());
bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, conf);
assertFalse(bundlesToUnload.isEmpty());
}

@Test
public void testNoBrokerToOffload() {
int numBundles = 10;
int brokerNum = 11;
LoadData loadData = new LoadData();
double throughput = 80 * ThresholdShedder.MB;
//Load of all Brokers are 80%, and no Broker needs to offload.
for (int i = 0; i < brokerNum; i++) {
LocalBrokerData broker = new LocalBrokerData();
for (int j = 0; j < numBundles; j++) {
broker.getBundles().add("bundle-" + j);
BundleData bundle = new BundleData();
TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
timeAverageMessageData.setMsgThroughputIn(throughput);
timeAverageMessageData.setMsgThroughputOut(throughput);
bundle.setShortTermData(timeAverageMessageData);
String broker2BundleName = "broker-" + i + "-bundle-" + (numBundles + i);
loadData.getBundleData().put(broker2BundleName, bundle);
broker.getBundles().add(broker2BundleName);
}
broker.setBandwidthIn(new ResourceUsage(80, 100));
broker.setBandwidthOut(new ResourceUsage(80, 100));
broker.setMsgThroughputIn(throughput);
broker.setMsgThroughputOut(throughput);
loadData.getBrokerData().put("broker-" + i, new BrokerData(broker));
}
ThresholdShedder shedder = new ThresholdShedder();
Multimap<String, String> bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf);
assertTrue(bundlesToUnload.isEmpty());
bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, conf);
assertTrue(bundlesToUnload.isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@Test(groups = "broker")
@Slf4j
public class ThresholdShedderTest {
private ThresholdShedder thresholdShedder;
protected ThresholdShedder thresholdShedder;
private final ServiceConfiguration conf;

public ThresholdShedderTest() {
Expand Down