Skip to content

Commit

Permalink
resolved comments
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Jan 9, 2023
1 parent 3fb8583 commit 9528960
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,62 +33,52 @@
@Data
public class BrokerLoadData {

private static final double MAX_RESOURCE_USAGE = 1.0d;

// Most recently available system resource usage.
private ResourceUsage cpu;
private ResourceUsage memory;
private ResourceUsage directMemory;

private ResourceUsage bandwidthIn;
private ResourceUsage bandwidthOut;

// Message data from the most recent namespace bundle stats.
private double msgThroughputIn;
private ResourceUsage msgThroughputInUsage;
private double msgThroughputOut;
private ResourceUsage msgThroughputOutUsage;
private double msgRateIn;
private double msgRateOut;

// load data features
private Double weightedMaxEMA; // exponential moving average of max of weighted resource usages
private double weightedMaxEMA; // exponential moving average of max of weighted resource usages
private long updatedAt;

public BrokerLoadData() {
cpu = new ResourceUsage();
memory = new ResourceUsage();
directMemory = new ResourceUsage();
bandwidthIn = new ResourceUsage();
bandwidthOut = new ResourceUsage();
msgThroughputInUsage = new ResourceUsage();
msgThroughputOutUsage = new ResourceUsage();
weightedMaxEMA = null;
weightedMaxEMA = MAX_RESOURCE_USAGE;
}

/**
* Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
* Using the system resource usage from the Pulsar client, update this BrokerLoadData.
*
* @param systemResourceUsage
* @param usage
* System resource usage (cpu, memory, and direct memory).
* @param conf
* Service configuration to compute load data features.
*/
public void update(final SystemResourceUsage systemResourceUsage, ServiceConfiguration conf) {
updateSystemResourceUsage(systemResourceUsage);
public void update(final SystemResourceUsage usage, ServiceConfiguration conf) {
updateSystemResourceUsage(usage.cpu, usage.memory, usage.directMemory, usage.bandwidthIn, usage.bandwidthOut);
updateFeatures(conf);
updatedAt = System.currentTimeMillis();
}

/**
* Using another LocalBrokerData, update this.
* Using another BrokerLoadData, update this.
*
* @param other
* LocalBrokerData to update from.
* BrokerLoadData to update from.
*/
public void update(final BrokerLoadData other, ServiceConfiguration conf) {
public void update(final BrokerLoadData other) {
updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
updateFeatures(conf);
}

// Set the cpu, memory, and direct memory to that of the new system resource usage data.
private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
weightedMaxEMA = other.weightedMaxEMA;
updatedAt = other.updatedAt;
}

// Update resource usage given each individual usage.
Expand Down Expand Up @@ -119,14 +109,14 @@ private void updateFeatures(ServiceConfiguration conf) {
updateWeightedMaxEMA(conf);
}

public void updateWeightedMaxEMA(ServiceConfiguration conf) {
private void updateWeightedMaxEMA(ServiceConfiguration conf) {
var historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
var weightedMax = getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());
weightedMaxEMA = weightedMaxEMA == null ? weightedMax :
weightedMaxEMA = updatedAt == 0 ? weightedMax :
weightedMaxEMA * historyPercentage + (1 - historyPercentage) * weightedMax;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
*/
@Slf4j
public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy {
private static final double MAX_RESOURCE_USAGE = 1.0d;
// Maintain this list to reduce object creation.
private final ArrayList<String> bestBrokers;
private final Set<String> noLoadDataBrokers;
Expand All @@ -52,6 +51,7 @@ private double getMaxResourceUsageWithWeight(final String broker, final BrokerLo
final ServiceConfiguration conf, boolean debugMode) {
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final var maxUsageWithWeight = brokerLoadData.getWeightedMaxEMA();

if (maxUsageWithWeight > overloadThreshold) {
log.warn(
"Broker {} is overloaded, max resource usage with weight percentage: {}%, "
Expand Down Expand Up @@ -128,8 +128,7 @@ public Optional<String> select(List<String> candidates, ServiceUnitId bundleToAs
log.warn("There is no broker load data for broker:{}. Skipping this broker. Phase two", broker);
continue;
}
Double avgResUsageObj = brokerLoadDataOptional.get().getWeightedMaxEMA();
double avgResUsage = avgResUsageObj == null ? MAX_RESOURCE_USAGE : avgResUsageObj.doubleValue();
double avgResUsage = brokerLoadDataOptional.get().getWeightedMaxEMA();
if ((avgResUsage + diffThreshold <= avgUsage && !noLoadDataBrokers.contains(broker))) {
bestBrokers.add(broker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.data;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.testng.Assert.assertEquals;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.testng.annotations.Test;

/**
Expand All @@ -48,15 +51,14 @@ public void testMaxResourceUsage() {
assertEquals(
data.getMaxResourceUsageWithWeight(
weight, weight, weight, weight, weight), 2.0, epsilon);
assertEquals(
data.getWeightedMaxEMA(), null);
}

@Test
public void testWeightedMaxEMA() {
BrokerLoadData data = new BrokerLoadData();
assertEquals(
data.getWeightedMaxEMA(), null);
long now = System.currentTimeMillis();
assertEquals(data.getUpdatedAt(), 0);
assertEquals(data.getWeightedMaxEMA(), 1.0);
ServiceConfiguration conf = new ServiceConfiguration();
conf.setLoadBalancerCPUResourceWeight(0.5);
conf.setLoadBalancerMemoryResourceWeight(0.5);
Expand All @@ -65,25 +67,27 @@ public void testWeightedMaxEMA() {
conf.setLoadBalancerBandwithOutResourceWeight(0.5);
conf.setLoadBalancerHistoryResourcePercentage(0.75);

BrokerLoadData data2 = new BrokerLoadData();
data2.setCpu(new ResourceUsage(1.0, 100.0));
data2.setMemory(new ResourceUsage(800.0, 200.0));
data2.setDirectMemory(new ResourceUsage(2.0, 100.0));
data2.setBandwidthIn(new ResourceUsage(3.0, 100.0));
data2.setBandwidthOut(new ResourceUsage(4.0, 100.0));
data.update(data2, conf);
assertEquals(
data.getWeightedMaxEMA(), 2);
SystemResourceUsage usage1 = new SystemResourceUsage();
usage1.setCpu(new ResourceUsage(1.0, 100.0));
usage1.setMemory(new ResourceUsage(800.0, 200.0));
usage1.setDirectMemory(new ResourceUsage(2.0, 100.0));
usage1.setBandwidthIn(new ResourceUsage(3.0, 100.0));
usage1.setBandwidthOut(new ResourceUsage(4.0, 100.0));
data.update(usage1, conf);
assertEquals(data.getWeightedMaxEMA(), 2);

BrokerLoadData data3 = new BrokerLoadData();
data3.setCpu(new ResourceUsage(300.0, 100.0));
data3.setMemory(new ResourceUsage(200.0, 200.0));
data3.setDirectMemory(new ResourceUsage(2.0, 100.0));
data3.setBandwidthIn(new ResourceUsage(3.0, 100.0));
data3.setBandwidthOut(new ResourceUsage(4.0, 100.0));
data.update(data3, conf);
assertEquals(
data.getWeightedMaxEMA(), 1.875);
assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));

now = System.currentTimeMillis();
SystemResourceUsage usage2 = new SystemResourceUsage();
usage2.setCpu(new ResourceUsage(300.0, 100.0));
usage2.setMemory(new ResourceUsage(200.0, 200.0));
usage2.setDirectMemory(new ResourceUsage(2.0, 100.0));
usage2.setBandwidthIn(new ResourceUsage(3.0, 100.0));
usage2.setBandwidthOut(new ResourceUsage(4.0, 100.0));
data.update(usage2, conf);
assertEquals(data.getWeightedMaxEMA(), 1.875);
assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.testng.annotations.Test;

@Test(groups = "broker")
Expand Down Expand Up @@ -102,21 +103,21 @@ public void testSelect() {

assertEquals(strategy.select(candidates, bundleData, ctx), Optional.of("broker1"));

brokerLoadDataStore.get("broker1").get().update(createBrokerData(30, 100), conf);
brokerLoadDataStore.get("broker2").get().update(createBrokerData(30, 100), conf);
brokerLoadDataStore.get("broker3").get().update(createBrokerData(40, 100), conf);
brokerLoadDataStore.get("broker1").get().update(createUsage(30, 100), conf);
brokerLoadDataStore.get("broker2").get().update(createUsage(30, 100), conf);
brokerLoadDataStore.get("broker3").get().update(createUsage(40, 100), conf);

assertEquals(strategy.select(candidates, bundleData, ctx), Optional.of("broker1"));

brokerLoadDataStore.get("broker1").get().update(createBrokerData(30, 100), conf);
brokerLoadDataStore.get("broker2").get().update(createBrokerData(30, 100), conf);
brokerLoadDataStore.get("broker3").get().update(createBrokerData(40, 100), conf);
brokerLoadDataStore.get("broker1").get().update(createUsage(30, 100), conf);
brokerLoadDataStore.get("broker2").get().update(createUsage(30, 100), conf);
brokerLoadDataStore.get("broker3").get().update(createUsage(40, 100), conf);

assertEquals(strategy.select(candidates, bundleData, ctx), Optional.of("broker1"));

brokerLoadDataStore.get("broker1").get().update(createBrokerData(35, 100), conf);
brokerLoadDataStore.get("broker2").get().update(createBrokerData(20, 100), conf);
brokerLoadDataStore.get("broker3").get().update(createBrokerData(45, 100), conf);
brokerLoadDataStore.get("broker1").get().update(createUsage(35, 100), conf);
brokerLoadDataStore.get("broker2").get().update(createUsage(20, 100), conf);
brokerLoadDataStore.get("broker3").get().update(createUsage(45, 100), conf);

assertEquals(strategy.select(candidates, bundleData, ctx), Optional.of("broker2"));
}
Expand Down Expand Up @@ -171,15 +172,21 @@ public void testNoLoadDataBrokers() {

private BrokerLoadData createBrokerData(double usage, double limit) {
var brokerLoadData = new BrokerLoadData();
brokerLoadData.setCpu(new ResourceUsage(usage, limit));
brokerLoadData.setMemory(new ResourceUsage(usage, limit));
brokerLoadData.setDirectMemory(new ResourceUsage(usage, limit));
brokerLoadData.setBandwidthIn(new ResourceUsage(usage, limit));
brokerLoadData.setBandwidthOut(new ResourceUsage(usage, limit));
brokerLoadData.updateWeightedMaxEMA(conf);
SystemResourceUsage usages = createUsage(usage, limit);
brokerLoadData.update(usages, conf);
return brokerLoadData;
}

private SystemResourceUsage createUsage(double usage, double limit) {
SystemResourceUsage usages = new SystemResourceUsage();
usages.setCpu(new ResourceUsage(usage, limit));
usages.setMemory(new ResourceUsage(usage, limit));
usages.setDirectMemory(new ResourceUsage(usage, limit));
usages.setBandwidthIn(new ResourceUsage(usage, limit));
usages.setBandwidthOut(new ResourceUsage(usage, limit));
return usages;
}

public LoadManagerContext getContext() {
var ctx = mock(LoadManagerContext.class);
var conf = new ServiceConfiguration();
Expand Down

0 comments on commit 9528960

Please sign in to comment.