Skip to content

Commit

Permalink
[improve][broker] PIP-192 Excluded bundles with isolation policy or a…
Browse files Browse the repository at this point in the history
…nti-affinity-group policy from topk load bundles
  • Loading branch information
heesung-sn committed Mar 14, 2023
1 parent da3cab5 commit 57f5bb7
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2600,6 +2600,16 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private long loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds = 604800;

@FieldContext(
category = CATEGORY_LOAD_BALANCER,
dynamic = true,
doc = "Option to automatically unload namespace bundles with affinity(isolation) "
+ "or anti-affinity group policies."
+ "Such bundles are not ideal targets to auto-unload as destination brokers are limited."
+ "(only used in load balancer extension logics)"
)
private boolean loadBalancerSheddingBundlesWithPoliciesEnabled = false;

/**** --- Replication. --- ****/
@FieldContext(
category = CATEGORY_REPLICATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;

/**
Expand All @@ -36,14 +43,23 @@
@Getter
@ToString
@EqualsAndHashCode
@NoArgsConstructor
@Slf4j
public class TopKBundles {

// temp array for sorting
private final List<Map.Entry<String, ? extends Comparable>> arr = new ArrayList<>();

private final TopBundlesLoadData loadData = new TopBundlesLoadData();

private final PulsarService pulsar;

private final SimpleResourceAllocationPolicies allocationPolicies;

public TopKBundles(PulsarService pulsar) {
this.pulsar = pulsar;
this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar);
}

/**
* Update the topK bundles from the input bundleStats.
*
Expand All @@ -52,26 +68,35 @@ public class TopKBundles {
*/
public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
arr.clear();
for (var etr : bundleStats.entrySet()) {
if (etr.getKey().startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
continue;
try {
var isLoadBalancerSheddingBundlesWithPoliciesEnabled =
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
for (var etr : bundleStats.entrySet()) {
String bundle = etr.getKey();
if (bundle.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
continue;
}
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled && hasPolicies(bundle)) {
continue;
}
arr.add(etr);
}
arr.add(etr);
}
var topKBundlesLoadData = loadData.getTopBundlesLoadData();
topKBundlesLoadData.clear();
if (arr.isEmpty()) {
return;
}
topk = Math.min(topk, arr.size());
partitionSort(arr, topk);
var topKBundlesLoadData = loadData.getTopBundlesLoadData();
topKBundlesLoadData.clear();
if (arr.isEmpty()) {
return;
}
topk = Math.min(topk, arr.size());
partitionSort(arr, topk);

for (int i = 0; i < topk; i++) {
var etr = arr.get(i);
topKBundlesLoadData.add(
new TopBundlesLoadData.BundleLoadData(etr.getKey(), (NamespaceBundleStats) etr.getValue()));
for (int i = 0; i < topk; i++) {
var etr = arr.get(i);
topKBundlesLoadData.add(
new TopBundlesLoadData.BundleLoadData(etr.getKey(), (NamespaceBundleStats) etr.getValue()));
}
} finally {
arr.clear();
}
arr.clear();
}

static void partitionSort(List<Map.Entry<String, ? extends Comparable>> arr, int k) {
Expand Down Expand Up @@ -109,4 +134,23 @@ static void partitionSort(List<Map.Entry<String, ? extends Comparable>> arr, int
}
Collections.sort(arr.subList(0, end), (a, b) -> b.getValue().compareTo(a.getValue()));
}

private boolean hasPolicies(String bundle) {
NamespaceName namespace = NamespaceName.get(LoadManagerShared.getNamespaceNameFromBundleName(bundle));
if (allocationPolicies.areIsolationPoliciesPresent(namespace)) {
return true;
}

try {
var antiAffinityGroupOptional =
LoadManagerShared.getNamespaceAntiAffinityGroup(pulsar, namespace.toString());
if (antiAffinityGroupOptional.isPresent()) {
return true;
}
} catch (MetadataStoreException e) {
log.error("Failed to get localPolicies for bundle:{}.", bundle, e);
throw new RuntimeException(e);
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public TopBundleLoadDataReporter(PulsarService pulsar,
this.lookupServiceAddress = lookupServiceAddress;
this.bundleLoadDataStore = bundleLoadDataStore;
this.lastBundleStatsUpdatedAt = 0;
this.topKBundles = new TopKBundles();
this.topKBundles = new TopKBundles(pulsar);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,17 +444,21 @@ private boolean isTransferable(LoadManagerContext context,
if (pulsar == null || allocationPolicies == null) {
return true;
}
var isLoadBalancerSheddingBundlesWithPoliciesEnabled =
context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
NamespaceBundle namespaceBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, bundleRange);

if (!canTransferWithIsolationPoliciesToBroker(
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled
&& !canTransferWithIsolationPoliciesToBroker(
context, availableBrokers, namespaceBundle, srcBroker, dstBroker)) {
return false;
}

if (!antiAffinityGroupPolicyHelper.canUnload(availableBrokers, bundle, srcBroker, dstBroker)) {
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled
&& !antiAffinityGroupPolicyHelper.canUnload(availableBrokers, bundle, srcBroker, dstBroker)) {
return false;
}

Expand Down
Loading

0 comments on commit 57f5bb7

Please sign in to comment.