Skip to content

Commit

Permalink
[improve][broker] PIP-192: Filter the transfer dest broker (#19958)
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 authored Apr 5, 2023
1 parent 0d1fe18 commit 7813dab
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
Expand All @@ -75,6 +76,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -119,6 +121,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@Getter
private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;

@Getter
private IsolationPoliciesHelper isolationPoliciesHelper;

private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;

Expand Down Expand Up @@ -185,7 +190,6 @@ public enum Role {
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
Expand Down Expand Up @@ -236,6 +240,9 @@ public void start() throws PulsarServerException {
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));

try {
this.brokerLoadDataStore = LoadDataStoreFactory
Expand Down Expand Up @@ -293,8 +300,8 @@ public void start() throws PulsarServerException {
MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);

this.unloadScheduler = new UnloadScheduler(
pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel,
unloadCounter, unloadMetrics);
pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
serviceUnitStateChannel, unloadCounter, unloadMetrics);
this.unloadScheduler.start();
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
Expand All @@ -307,7 +314,6 @@ public void start() throws PulsarServerException {
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerFilterPipeline.forEach(brokerFilter -> brokerFilter.initialize(pulsar));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
Expand Down Expand Up @@ -50,9 +49,4 @@ public Map<String, BrokerLookupData> filter(
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
return;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
Expand All @@ -35,11 +34,6 @@ public interface BrokerFilter {
*/
String name();

/**
* Initialize this broker filter using the given pulsar service.
*/
void initialize(PulsarService pulsar);

/**
* Filter out unqualified brokers based on implementation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.ServiceUnitId;


Expand All @@ -37,14 +35,13 @@ public class BrokerIsolationPoliciesFilter implements BrokerFilter {

private IsolationPoliciesHelper isolationPoliciesHelper;

@Override
public String name() {
return FILTER_NAME;
public BrokerIsolationPoliciesFilter(IsolationPoliciesHelper helper) {
this.isolationPoliciesHelper = helper;
}

@Override
public void initialize(PulsarService pulsar) {
this.isolationPoliciesHelper = new IsolationPoliciesHelper(new SimpleResourceAllocationPolicies(pulsar));
public String name() {
return FILTER_NAME;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
Expand All @@ -36,11 +35,6 @@ public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}

@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
Expand Down Expand Up @@ -148,9 +147,4 @@ public Version getLatestVersionNumber(Map<String, BrokerLookupData> brokerMap)
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
Expand Down Expand Up @@ -49,38 +48,10 @@ public void filter(
channel.getOwnershipEntrySet(), brokerToFailureDomainMap);
}

public boolean canUnload(
Map<String, BrokerLookupData> brokers,
String bundle,
String srcBroker,
Optional<String> dstBroker) {

public boolean hasAntiAffinityGroupPolicy(String bundle) {
try {
var antiAffinityGroupOptional = LoadManagerShared.getNamespaceAntiAffinityGroup(
pulsar, LoadManagerShared.getNamespaceNameFromBundleName(bundle));
if (antiAffinityGroupOptional.isEmpty()) {
return true;
}

// bundle has anti-affinityGroup
if (!pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled()) {
return false;
}

// copy to retain the input brokers
Map<String, BrokerLookupData> candidates = new HashMap<>(brokers);

filter(candidates, bundle);

candidates.remove(srcBroker);

// unload case
if (dstBroker.isEmpty()) {
return !candidates.isEmpty();
}

// transfer case
return candidates.containsKey(dstBroker.get());
return LoadManagerShared.getNamespaceAntiAffinityGroup(
pulsar, LoadManagerShared.getNamespaceNameFromBundleName(bundle)).isPresent();
} catch (MetadataStoreException e) {
log.error("Failed to check unload candidates. Assumes that bundle:{} cannot unload ", bundle, e);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;

@Slf4j
Expand Down Expand Up @@ -65,4 +66,8 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
return brokerCandidateCache;
}

public boolean hasIsolationPolicy(NamespaceName namespaceName) {
return policies.areIsolationPoliciesPresent(namespaceName);
}

}
Loading

0 comments on commit 7813dab

Please sign in to comment.