Skip to content

Commit

Permalink
resolved comment regarding force and needBrokerDataUpdate order
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Feb 12, 2023
1 parent b1ce2d4 commit 76d004d
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -101,6 +102,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private TopBundleLoadDataReporter topBundleLoadDataReporter;

private ScheduledFuture brokerLoadDataReportTask;
private ScheduledFuture topBundlesLoadDataReportTask;

private boolean started = false;

private final AssignCounter assignCounter = new AssignCounter();
Expand Down Expand Up @@ -166,7 +170,7 @@ public void start() throws PulsarServerException {
new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore);

var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
this.pulsar.getLoadManagerExecutor()
this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
.scheduleAtFixedRate(() -> {
try {
brokerLoadDataReporter.reportAsync(false);
Expand All @@ -177,9 +181,11 @@ public void start() throws PulsarServerException {
},
interval,
interval, TimeUnit.MILLISECONDS);
this.pulsar.getLoadManagerExecutor()

this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
.scheduleAtFixedRate(() -> {
try {
// TODO: consider excluding the bundles that are in the process of split.
topBundleLoadDataReporter.reportAsync(false);
} catch (Throwable e) {
log.error("Failed to run the top bundles load manager executor job.", e);
Expand Down Expand Up @@ -303,6 +309,14 @@ public void close() throws PulsarServerException {
return;
}
try {
if (brokerLoadDataReportTask != null) {
brokerLoadDataReportTask.cancel(true);
}

if (topBundlesLoadDataReportTask != null) {
topBundlesLoadDataReportTask.cancel(true);
}

this.brokerLoadDataStore.close();
this.topBundlesLoadDataStore.close();
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public BrokerLoadData generateLoadData() {
@Override
public CompletableFuture<Void> reportAsync(boolean force) {
BrokerLoadData newLoadData = this.generateLoadData();
if (needBrokerDataUpdate() || force) {
if (force || needBrokerDataUpdate()) {
log.info("publishing load report:{}", localData.toString(conf));
CompletableFuture<Void> future =
this.brokerLoadDataStore.pushAsync(this.lookupServiceAddress, newLoadData);
Expand All @@ -103,7 +103,6 @@ public CompletableFuture<Void> reportAsync(boolean force) {
} else {
log.error("Failed to report the broker load data.", ex);
}
return;
});
return future;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.extensions.models;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -85,24 +86,32 @@ public void testPartitionSort() {
Random rand = new Random();
List<Map.Entry<String, ? extends Comparable>> actual = new ArrayList<>();
List<Map.Entry<String, ? extends Comparable>> expected = new ArrayList<>();
int max = 10;
for (int j = 0; j < 50; j++) {

for (int j = 0; j < 100; j++) {
Map<String, Integer> map = new HashMap<>();
int max = rand.nextInt(10) + 1;
for (int i = 0; i < max; i++) {
int val = rand.nextInt(max);
map.put("" + i, val);
}
actual.clear();
expected.clear();
for(var etr : map.entrySet()){
for (var etr : map.entrySet()) {
actual.add(etr);
expected.add(etr);
}
int topk = rand.nextInt(max) + 1;
TopKBundles.partitionSort(actual, topk);
Collections.sort(expected, (a, b) -> b.getValue().compareTo(a.getValue()));
String errorMsg = null;
for (int i = 0; i < topk; i++) {
assertEquals(actual.get(i).getValue(), expected.get(i).getValue());
Integer l = (Integer) actual.get(i).getValue();
Integer r = (Integer) expected.get(i).getValue();
if (!l.equals(r)) {
errorMsg = String.format("Diff found at i=%d, %d != %d, actual:%s, expected:%s",
i, l, r, actual, expected);
}
assertNull(errorMsg);
}
}
}
Expand Down

0 comments on commit 76d004d

Please sign in to comment.