Skip to content

Commit

Permalink
Filter the transfer dest broker
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Mar 30, 2023
1 parent 68c10ee commit abee92c
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
Expand All @@ -75,6 +76,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -119,6 +121,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@Getter
private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;

@Getter
private IsolationPoliciesHelper isolationPoliciesHelper;

private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;

Expand Down Expand Up @@ -185,7 +190,6 @@ public enum Role {
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
Expand Down Expand Up @@ -236,6 +240,9 @@ public void start() throws PulsarServerException {
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));

try {
this.brokerLoadDataStore = LoadDataStoreFactory
Expand Down Expand Up @@ -293,8 +300,8 @@ public void start() throws PulsarServerException {
MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);

this.unloadScheduler = new UnloadScheduler(
pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel,
unloadCounter, unloadMetrics);
pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
serviceUnitStateChannel, unloadCounter, unloadMetrics);
this.unloadScheduler.start();
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
Expand All @@ -307,7 +314,6 @@ public void start() throws PulsarServerException {
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerFilterPipeline.forEach(brokerFilter -> brokerFilter.initialize(pulsar));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
Expand Down Expand Up @@ -50,9 +49,4 @@ public Map<String, BrokerLookupData> filter(
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
return;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
Expand All @@ -35,11 +34,6 @@ public interface BrokerFilter {
*/
String name();

/**
* Initialize this broker filter using the given pulsar service.
*/
void initialize(PulsarService pulsar);

/**
* Filter out unqualified brokers based on implementation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.ServiceUnitId;


Expand All @@ -37,14 +35,13 @@ public class BrokerIsolationPoliciesFilter implements BrokerFilter {

private IsolationPoliciesHelper isolationPoliciesHelper;

@Override
public String name() {
return FILTER_NAME;
public BrokerIsolationPoliciesFilter(IsolationPoliciesHelper helper) {
this.isolationPoliciesHelper = helper;
}

@Override
public void initialize(PulsarService pulsar) {
this.isolationPoliciesHelper = new IsolationPoliciesHelper(new SimpleResourceAllocationPolicies(pulsar));
public String name() {
return FILTER_NAME;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
Expand All @@ -36,11 +35,6 @@ public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}

@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
Expand Down Expand Up @@ -148,9 +147,4 @@ public Version getLatestVersionNumber(Map<String, BrokerLookupData> brokerMap)
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ public boolean canUnload(
}
}

public boolean hasAntiAffinityGroupPolicy(String bundle) {
try {
return LoadManagerShared.getNamespaceAntiAffinityGroup(
pulsar, LoadManagerShared.getNamespaceNameFromBundleName(bundle)).isPresent();
} catch (MetadataStoreException e) {
log.error("Failed to check unload candidates. Assumes that bundle:{} cannot unload ", bundle, e);
return false;
}
}

public void listenFailureDomainUpdate() {
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap);
// register listeners for domain changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;

@Slf4j
Expand Down Expand Up @@ -65,4 +66,8 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
return brokerCandidateCache;
}

public boolean hasIsolationPolicy(NamespaceName namespaceName) {
return policies.areIsolationPoliciesPresent(namespaceName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -46,21 +47,22 @@
import lombok.experimental.Accessors;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -96,9 +98,10 @@ public class TransferShedder implements NamespaceUnloadStrategy {
private static final String CANNOT_UNLOAD_BUNDLE_MSG = "Can't unload bundle:%s.";
private final LoadStats stats = new LoadStats();
private PulsarService pulsar;
private SimpleResourceAllocationPolicies allocationPolicies;
private IsolationPoliciesHelper isolationPoliciesHelper;
private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;
private List<BrokerFilter> brokerFilterPipeline;

private Set<UnloadDecision> decisionCache;
@Getter
private UnloadCounter counter;
Expand All @@ -109,34 +112,35 @@ public class TransferShedder implements NamespaceUnloadStrategy {
public TransferShedder(UnloadCounter counter){
this.pulsar = null;
this.decisionCache = new HashSet<>();
this.allocationPolicies = null;
this.counter = counter;
this.isolationPoliciesHelper = null;
this.antiAffinityGroupPolicyHelper = null;
}

public TransferShedder(PulsarService pulsar,
UnloadCounter counter,
List<BrokerFilter> brokerFilterPipeline,
IsolationPoliciesHelper isolationPoliciesHelper,
AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper){
this.pulsar = pulsar;
this.decisionCache = new HashSet<>();
this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar);
this.counter = counter;
this.isolationPoliciesHelper = new IsolationPoliciesHelper(allocationPolicies);
this.channel = ServiceUnitStateChannelImpl.get(pulsar);
this.isolationPoliciesHelper = isolationPoliciesHelper;
this.antiAffinityGroupPolicyHelper = antiAffinityGroupPolicyHelper;
this.channel = ServiceUnitStateChannelImpl.get(pulsar);
this.brokerFilterPipeline = brokerFilterPipeline;
}

@Override
public void initialize(PulsarService pulsar){
this.pulsar = pulsar;
this.decisionCache = new HashSet<>();
this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar);
var manager = ExtensibleLoadManagerImpl.get(pulsar.getLoadManager().get());
this.counter = manager.getUnloadCounter();
this.isolationPoliciesHelper = new IsolationPoliciesHelper(allocationPolicies);
this.channel = ServiceUnitStateChannelImpl.get(pulsar);
this.isolationPoliciesHelper = manager.getIsolationPoliciesHelper();
this.antiAffinityGroupPolicyHelper = manager.getAntiAffinityGroupPolicyHelper();
this.channel = ServiceUnitStateChannelImpl.get(pulsar);
this.brokerFilterPipeline = manager.getBrokerFilterPipeline();
}


Expand Down Expand Up @@ -673,7 +677,7 @@ private boolean isTransferable(LoadManagerContext context,
String bundle,
String srcBroker,
Optional<String> dstBroker) {
if (pulsar == null || allocationPolicies == null) {
if (pulsar == null) {
return true;
}

Expand All @@ -682,54 +686,49 @@ private boolean isTransferable(LoadManagerContext context,
NamespaceBundle namespaceBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, bundleRange);

if (!canTransferWithIsolationPoliciesToBroker(
context, availableBrokers, namespaceBundle, srcBroker, dstBroker)) {
return false;
}

if (antiAffinityGroupPolicyHelper != null
&& !antiAffinityGroupPolicyHelper.canUnload(availableBrokers, bundle, srcBroker, dstBroker)) {
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled(context, namespaceBundle)) {
return false;
}
return true;
}

/**
* Check the gave bundle and broker can be transfer or unload with isolation policies applied.
*
* @param context The load manager context.
* @param availableBrokers The available brokers.
* @param namespaceBundle The bundle try to unload or transfer.
* @param currentBroker The current broker.
* @param targetBroker The broker will be transfer to.
* @return Can be transfer/unload or not.
*/
private boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context,
Map<String, BrokerLookupData> availableBrokers,
NamespaceBundle namespaceBundle,
String currentBroker,
Optional<String> targetBroker) {
if (isolationPoliciesHelper == null
|| !allocationPolicies.areIsolationPoliciesPresent(namespaceBundle.getNamespaceObject())) {
return true;
}

// bundle has isolation policies.
if (!context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled()) {
return false;
Map<String, BrokerLookupData> candidates = new HashMap<>(availableBrokers);
brokerFilterPipeline.forEach(filter -> {
try {
filter.filter(candidates, namespaceBundle, context);
} catch (BrokerFilterException e) {
log.error("Failed to filter brokers with filter: {}", filter.getClass().getName(), e);
}
});
if (dstBroker.isPresent()) {
if (!candidates.containsKey(dstBroker.get())) {
return false;
}
}

boolean transfer = context.brokerConfiguration().isLoadBalancerTransferEnabled();
Set<String> candidates = isolationPoliciesHelper.applyIsolationPolicies(availableBrokers, namespaceBundle);

// Remove the current bundle owner broker.
candidates.remove(currentBroker);
candidates.remove(srcBroker);
boolean transfer = context.brokerConfiguration().isLoadBalancerTransferEnabled();

// Unload: Check if there are any more candidates available for selection.
if (targetBroker.isEmpty() || !transfer) {
if (dstBroker.isEmpty() || !transfer) {
return !candidates.isEmpty();
}
// Transfer: Check if this broker is among the candidates.
return candidates.contains(targetBroker.get());
return candidates.containsKey(dstBroker.get());
}

protected boolean isLoadBalancerSheddingBundlesWithPoliciesEnabled(LoadManagerContext context,
NamespaceBundle namespaceBundle) {
if (isolationPoliciesHelper != null
&& isolationPoliciesHelper.hasIsolationPolicy(namespaceBundle.getNamespaceObject())) {
return context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
}

String bundle = namespaceBundle.toString();
if (antiAffinityGroupPolicyHelper != null
&& antiAffinityGroupPolicyHelper.hasAntiAffinityGroupPolicy(namespaceBundle.toString())) {
return context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
}

return true;
}
}
Loading

0 comments on commit abee92c

Please sign in to comment.