Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192: Filter the transfer dest broker #19958

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
Expand All @@ -75,6 +76,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -119,6 +121,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@Getter
private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;

@Getter
private IsolationPoliciesHelper isolationPoliciesHelper;

private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;

Expand Down Expand Up @@ -185,7 +190,6 @@ public enum Role {
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
Expand Down Expand Up @@ -236,6 +240,9 @@ public void start() throws PulsarServerException {
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));

try {
this.brokerLoadDataStore = LoadDataStoreFactory
Expand Down Expand Up @@ -293,8 +300,8 @@ public void start() throws PulsarServerException {
MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);

this.unloadScheduler = new UnloadScheduler(
pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel,
unloadCounter, unloadMetrics);
pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
serviceUnitStateChannel, unloadCounter, unloadMetrics);
this.unloadScheduler.start();
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
Expand All @@ -307,7 +314,6 @@ public void start() throws PulsarServerException {
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerFilterPipeline.forEach(brokerFilter -> brokerFilter.initialize(pulsar));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
Expand Down Expand Up @@ -50,9 +49,4 @@ public Map<String, BrokerLookupData> filter(
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
return;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
Expand All @@ -35,11 +34,6 @@ public interface BrokerFilter {
*/
String name();

/**
* Initialize this broker filter using the given pulsar service.
*/
void initialize(PulsarService pulsar);

/**
* Filter out unqualified brokers based on implementation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.ServiceUnitId;


Expand All @@ -37,14 +35,13 @@ public class BrokerIsolationPoliciesFilter implements BrokerFilter {

private IsolationPoliciesHelper isolationPoliciesHelper;

@Override
public String name() {
return FILTER_NAME;
public BrokerIsolationPoliciesFilter(IsolationPoliciesHelper helper) {
this.isolationPoliciesHelper = helper;
}

@Override
public void initialize(PulsarService pulsar) {
this.isolationPoliciesHelper = new IsolationPoliciesHelper(new SimpleResourceAllocationPolicies(pulsar));
public String name() {
return FILTER_NAME;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
Expand All @@ -36,11 +35,6 @@ public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}

@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
Expand Down Expand Up @@ -148,9 +147,4 @@ public Version getLatestVersionNumber(Map<String, BrokerLookupData> brokerMap)
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
Expand Down Expand Up @@ -49,38 +48,10 @@ public void filter(
channel.getOwnershipEntrySet(), brokerToFailureDomainMap);
}

public boolean canUnload(
Map<String, BrokerLookupData> brokers,
String bundle,
String srcBroker,
Optional<String> dstBroker) {

public boolean hasAntiAffinityGroupPolicy(String bundle) {
try {
var antiAffinityGroupOptional = LoadManagerShared.getNamespaceAntiAffinityGroup(
pulsar, LoadManagerShared.getNamespaceNameFromBundleName(bundle));
if (antiAffinityGroupOptional.isEmpty()) {
return true;
}

// bundle has anti-affinityGroup
if (!pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled()) {
return false;
}

// copy to retain the input brokers
Map<String, BrokerLookupData> candidates = new HashMap<>(brokers);

filter(candidates, bundle);

candidates.remove(srcBroker);

// unload case
if (dstBroker.isEmpty()) {
return !candidates.isEmpty();
}

// transfer case
return candidates.containsKey(dstBroker.get());
return LoadManagerShared.getNamespaceAntiAffinityGroup(
pulsar, LoadManagerShared.getNamespaceNameFromBundleName(bundle)).isPresent();
} catch (MetadataStoreException e) {
log.error("Failed to check unload candidates. Assumes that bundle:{} cannot unload ", bundle, e);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;

@Slf4j
Expand Down Expand Up @@ -65,4 +66,8 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
return brokerCandidateCache;
}

public boolean hasIsolationPolicy(NamespaceName namespaceName) {
return policies.areIsolationPoliciesPresent(namespaceName);
}

}
Loading