Skip to content

Commit

Permalink
removed redundant try-catch in reportAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Feb 9, 2023
1 parent cc9203c commit b096091
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2523,7 +2523,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
+ "The bigger value will increase the overhead of reporting many bundles in load data. "
+ "(only used in load balancer extension logics)"
)
private int loadBalancerBundleLoadReportPercentage = 10;
private double loadBalancerBundleLoadReportPercentage = 10;

/**** --- Replication. --- ****/
@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,24 @@ public void start() throws PulsarServerException {

var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
this.pulsar.getLoadManagerExecutor()
.scheduleAtFixedRate(() -> brokerLoadDataReporter.reportAsync(false),
.scheduleAtFixedRate(() -> {
try {
brokerLoadDataReporter.reportAsync(false);
// TODO: update broker load metrics using getLocalData
} catch (Throwable e) {
log.error("Failed to run the broker load manager executor job.", e);
}
},
interval,
interval, TimeUnit.MILLISECONDS);
this.pulsar.getLoadManagerExecutor()
.scheduleAtFixedRate(() -> topBundleLoadDataReporter.reportAsync(false),
.scheduleAtFixedRate(() -> {
try {
topBundleLoadDataReporter.reportAsync(false);
} catch (Throwable e) {
log.error("Failed to run the top bundles load manager executor job.", e);
}
},
interval,
interval, TimeUnit.MILLISECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;

/**
* Defines the information of top bundles load data.
* Defines the information of top k highest-loaded bundles.
*/
@Getter
@ToString
Expand All @@ -45,7 +45,7 @@ public class TopKBundles {
private final TopBundlesLoadData loadData = new TopBundlesLoadData();

/**
* Return TopBundlesLoadData with the given bundleStats.
* Update the topK bundles from the input bundleStats.
*
* @param bundleStats bundle stats.
* @param topk top k bundle stats to select.
Expand All @@ -58,7 +58,7 @@ public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
}
arr.add(etr);
}
List<TopBundlesLoadData.BundleLoadData> topKBundlesLoadData = loadData.getTopBundlesLoadData();
var topKBundlesLoadData = loadData.getTopBundlesLoadData();
topKBundlesLoadData.clear();
if (arr.isEmpty()) {
return;
Expand All @@ -71,6 +71,7 @@ public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
topKBundlesLoadData.add(
new TopBundlesLoadData.BundleLoadData(etr.getKey(), (NamespaceBundleStats) etr.getValue()));
}
arr.clear();
}

static void partitionSort(List<Map.Entry<String, ? extends Comparable>> arr, int k) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>

private final String lookupServiceAddress;

@Getter
private final BrokerLoadData localData;

private final BrokerLoadData lastData;
Expand Down Expand Up @@ -89,31 +91,25 @@ public BrokerLoadData generateLoadData() {

@Override
public CompletableFuture<Void> reportAsync(boolean force) {
try {
BrokerLoadData newLoadData = this.generateLoadData();
if (needBrokerDataUpdate() || force) {
log.info("publishing load report:{}", localData.toString(conf));
CompletableFuture<Void> future =
this.brokerLoadDataStore.pushAsync(this.lookupServiceAddress, newLoadData);
future.whenComplete((__, ex) -> {
if (ex == null) {
localData.setReportedAt(System.currentTimeMillis());
lastData.update(localData);
} else {
log.error("Failed to report the broker load data.", ex);
}
return;
});
return future;
} else {
log.info("skipping load report:{}", localData.toString(conf));
}
return CompletableFuture.completedFuture(null);
} catch (Throwable e) {
log.error("Failed to report the broker load data.", e);
return CompletableFuture.failedFuture(e);
BrokerLoadData newLoadData = this.generateLoadData();
if (needBrokerDataUpdate() || force) {
log.info("publishing load report:{}", localData.toString(conf));
CompletableFuture<Void> future =
this.brokerLoadDataStore.pushAsync(this.lookupServiceAddress, newLoadData);
future.whenComplete((__, ex) -> {
if (ex == null) {
localData.setReportedAt(System.currentTimeMillis());
lastData.update(localData);
} else {
log.error("Failed to report the broker load data.", ex);
}
return;
});
return future;
} else {
log.info("skipping load report:{}", localData.toString(conf));
}

return CompletableFuture.completedFuture(null);
}

private boolean needBrokerDataUpdate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;

/**
* The top k highest-loaded bundles' load data reporter.
*/
@Slf4j
public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoadData> {

Expand Down Expand Up @@ -69,20 +72,15 @@ public TopBundlesLoadData generateLoadData() {

@Override
public CompletableFuture<Void> reportAsync(boolean force) {
try {
var topBundlesLoadData = generateLoadData();
if (topBundlesLoadData != null || force) {
return this.bundleLoadDataStore.pushAsync(lookupServiceAddress, topKBundles.getLoadData())
.exceptionally(e -> {
log.error("Failed to report top-bundles load data.", e);
return null;
});
} else {
return CompletableFuture.completedFuture(null);
}
} catch (Throwable e) {
log.error("Failed to report top-bundles load data.", e);
return CompletableFuture.failedFuture(e);
var topBundlesLoadData = generateLoadData();
if (topBundlesLoadData != null || force) {
return this.bundleLoadDataStore.pushAsync(lookupServiceAddress, topKBundles.getLoadData())
.exceptionally(e -> {
log.error("Failed to report top-bundles load data.", e);
return null;
});
} else {
return CompletableFuture.completedFuture(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand All @@ -54,17 +53,6 @@ public class BrokerLoadDataReporterTest {
BrokerStats brokerStats;
SystemResourceUsage usage;

@BeforeClass
void classSetup() {
MockedStatic<LoadManagerShared> mockLoadManagerShared = Mockito.mockStatic(LoadManagerShared.class);
usage = new SystemResourceUsage();
usage.setCpu(new ResourceUsage(1.0, 100.0));
usage.setMemory(new ResourceUsage(800.0, 200.0));
usage.setDirectMemory(new ResourceUsage(2.0, 100.0));
usage.setBandwidthIn(new ResourceUsage(3.0, 100.0));
usage.setBandwidthOut(new ResourceUsage(4.0, 100.0));
mockLoadManagerShared.when(() -> LoadManagerShared.getSystemResourceUsage(any())).thenReturn(usage);
}
@BeforeMethod
void setup() {
config = new ServiceConfiguration();
Expand All @@ -85,41 +73,54 @@ void setup() {
doReturn(pulsarStats).when(brokerService).getPulsarStats();
doReturn(brokerStats).when(pulsarStats).getBrokerStats();
doReturn(CompletableFuture.completedFuture(null)).when(store).pushAsync(any(), any());

usage = new SystemResourceUsage();
usage.setCpu(new ResourceUsage(1.0, 100.0));
usage.setMemory(new ResourceUsage(800.0, 200.0));
usage.setDirectMemory(new ResourceUsage(2.0, 100.0));
usage.setBandwidthIn(new ResourceUsage(3.0, 100.0));
usage.setBandwidthOut(new ResourceUsage(4.0, 100.0));
}

public void testGenerate() throws IllegalAccessException {
doReturn(0l).when(pulsarStats).getUpdatedAt();
var target = new BrokerLoadDataReporter(pulsar, "", store);
var expected = new BrokerLoadData();
expected.update(usage, 1, 2, 3, 4, 5, config);
FieldUtils.writeDeclaredField(expected, "updatedAt", 0l, true);
var actual = target.generateLoadData();
FieldUtils.writeDeclaredField(actual, "updatedAt", 0l, true);
assertEquals(actual, expected);
try (MockedStatic<LoadManagerShared> mockLoadManagerShared = Mockito.mockStatic(LoadManagerShared.class)) {
mockLoadManagerShared.when(() -> LoadManagerShared.getSystemResourceUsage(any())).thenReturn(usage);
doReturn(0l).when(pulsarStats).getUpdatedAt();
var target = new BrokerLoadDataReporter(pulsar, "", store);
var expected = new BrokerLoadData();
expected.update(usage, 1, 2, 3, 4, 5, config);
FieldUtils.writeDeclaredField(expected, "updatedAt", 0l, true);
var actual = target.generateLoadData();
FieldUtils.writeDeclaredField(actual, "updatedAt", 0l, true);
assertEquals(actual, expected);
}
}

public void testReport() throws IllegalAccessException {
var target = new BrokerLoadDataReporter(pulsar, "broker-1", store);
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
localData.setReportedAt(System.currentTimeMillis());
var lastData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "lastData", true);
lastData.update(usage, 1, 2, 3, 4, 5, config);
target.reportAsync(false);
verify(store, times(0)).pushAsync(any(), any());
try (MockedStatic<LoadManagerShared> mockLoadManagerShared = Mockito.mockStatic(LoadManagerShared.class)) {
mockLoadManagerShared.when(() -> LoadManagerShared.getSystemResourceUsage(any())).thenReturn(usage);
var target = new BrokerLoadDataReporter(pulsar, "broker-1", store);
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
localData.setReportedAt(System.currentTimeMillis());
var lastData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "lastData", true);
lastData.update(usage, 1, 2, 3, 4, 5, config);
target.reportAsync(false);
verify(store, times(0)).pushAsync(any(), any());

target.reportAsync(true);
verify(store, times(1)).pushAsync(eq("broker-1"), any());
target.reportAsync(true);
verify(store, times(1)).pushAsync(eq("broker-1"), any());

target.reportAsync(false);
verify(store, times(1)).pushAsync(eq("broker-1"), any());
target.reportAsync(false);
verify(store, times(1)).pushAsync(eq("broker-1"), any());

localData.setReportedAt(0l);
target.reportAsync(false);
verify(store, times(2)).pushAsync(eq("broker-1"), any());
localData.setReportedAt(0l);
target.reportAsync(false);
verify(store, times(2)).pushAsync(eq("broker-1"), any());

lastData.update(usage, 10000, 2, 3, 4, 5, config);
target.reportAsync(false);
verify(store, times(3)).pushAsync(eq("broker-1"), any());
lastData.update(usage, 10000, 2, 3, 4, 5, config);
target.reportAsync(false);
verify(store, times(3)).pushAsync(eq("broker-1"), any());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.testng.Assert.assertNull;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -58,6 +59,7 @@ void setup() {
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(config).when(pulsar).getConfiguration();
doReturn(pulsarStats).when(brokerService).getPulsarStats();
doReturn(CompletableFuture.completedFuture(null)).when(store).pushAsync(any(), any());

bundleStats = new HashMap<>();
NamespaceBundleStats stats1 = new NamespaceBundleStats();
Expand Down

0 comments on commit b096091

Please sign in to comment.