Skip to content

Commit

Permalink
disable by default.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed May 22, 2024
1 parent 4658f4e commit 6ead458
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1139,13 +1139,18 @@ private CompletableFuture<Integer> selectTopKBundle() {
bundleArr.addAll((Collection<? extends Map.Entry<String, ? extends Comparable>>)
loadData.getBundleData().entrySet());

// select topK bundle for each broker, so select topK * brokerCount bundle in total
int brokerCount = Math.max(1, loadData.getBrokerData().size());
int updateBundleCount = Math.min(pulsar.getConfiguration()
.getLoadBalancerMaxNumberOfBundlesInBundleLoadReport() * brokerCount, bundleArr.size());

TopKBundles.partitionSort(bundleArr, updateBundleCount);
completableFuture.complete(updateBundleCount);
int maxNumberOfBundlesInBundleLoadReport = pulsar.getConfiguration()
.getLoadBalancerMaxNumberOfBundlesInBundleLoadReport();
if (maxNumberOfBundlesInBundleLoadReport <= 0) {
// select all bundle
completableFuture.complete(bundleArr.size());
} else {
// select topK bundle for each broker, so select topK * brokerCount bundle in total
int brokerCount = Math.max(1, loadData.getBrokerData().size());
int updateBundleCount = Math.min(maxNumberOfBundlesInBundleLoadReport * brokerCount, bundleArr.size());
TopKBundles.partitionSort(bundleArr, updateBundleCount);
completableFuture.complete(updateBundleCount);
}
});
return completableFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,8 @@ public void testFilterBundlesWhileWritingToMetadataStore() throws Exception {
pulsarServices.put(pulsar1.getWebServiceAddress(), pulsar1);
pulsarServices.put(pulsar2.getWebServiceAddress(), pulsar2);
MetadataCache<BundleData> metadataCache = pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class);
PulsarService leaderBroker = pulsarServices.get("http://" + pulsar1.getLeaderElectionService().getCurrentLeader().get().getBrokerId());
String protocol = "http://";
PulsarService leaderBroker = pulsarServices.get(protocol + pulsar1.getLeaderElectionService().getCurrentLeader().get().getBrokerId());
ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) getField(
leaderBroker.getLoadManager().get(), "loadManager");
int topK = 1;
Expand All @@ -448,7 +449,6 @@ public void testFilterBundlesWhileWritingToMetadataStore() throws Exception {
final NamespaceBundle[] bundles = LoadBalancerTestingUtils.makeBundles(
nsFactory, "test", "test", "test", totalBundles);
LoadData loadData = (LoadData) getField(loadManager, "loadData");
String protocol = "http://";
for (int i = 0; i < totalBundles; i++) {
final BundleData bundleData = new BundleData(10, 1000);
final String bundleDataPath = String.format("%s/%s", BUNDLE_DATA_BASE_PATH, bundles[i]);
Expand Down

0 comments on commit 6ead458

Please sign in to comment.