Skip to content

Commit

Permalink
Filter the transfer dest broker
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Mar 29, 2023
1 parent 7a99e74 commit 9f2a551
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
Expand All @@ -75,6 +76,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -177,7 +179,6 @@ public enum Role {
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
Expand Down Expand Up @@ -224,6 +225,9 @@ public void start() throws PulsarServerException {
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
IsolationPoliciesHelper isolationPoliciesHelper = new IsolationPoliciesHelper(
new SimpleResourceAllocationPolicies(pulsar));
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));

try {
this.brokerLoadDataStore = LoadDataStoreFactory
Expand Down Expand Up @@ -275,7 +279,8 @@ public void start() throws PulsarServerException {

this.unloadScheduler = new UnloadScheduler(
pulsar, pulsar.getLoadManagerExecutor(), unloadManager,
context, serviceUnitStateChannel, antiAffinityGroupPolicyHelper, unloadCounter, unloadMetrics);
context, serviceUnitStateChannel, antiAffinityGroupPolicyHelper, brokerFilterPipeline,
unloadCounter, unloadMetrics);
this.unloadScheduler.start();
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
Expand All @@ -287,7 +292,6 @@ public void start() throws PulsarServerException {
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerFilterPipeline.forEach(brokerFilter -> brokerFilter.initialize(pulsar));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
Expand Down Expand Up @@ -50,9 +49,4 @@ public Map<String, BrokerLookupData> filter(
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
return;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
Expand All @@ -35,11 +34,6 @@ public interface BrokerFilter {
*/
String name();

/**
* Initialize this broker filter using the given pulsar service.
*/
void initialize(PulsarService pulsar);

/**
* Filter out unqualified brokers based on implementation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.ServiceUnitId;


Expand All @@ -35,16 +33,15 @@ public class BrokerIsolationPoliciesFilter implements BrokerFilter {

public static final String FILTER_NAME = "broker_isolation_policies_filter";

private IsolationPoliciesHelper isolationPoliciesHelper;
private final IsolationPoliciesHelper helper;

@Override
public String name() {
return FILTER_NAME;
public BrokerIsolationPoliciesFilter(IsolationPoliciesHelper helper) {
this.helper = helper;
}

@Override
public void initialize(PulsarService pulsar) {
this.isolationPoliciesHelper = new IsolationPoliciesHelper(new SimpleResourceAllocationPolicies(pulsar));
public String name() {
return FILTER_NAME;
}

@Override
Expand All @@ -53,7 +50,7 @@ public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> availa
LoadManagerContext context)
throws BrokerFilterException {
Set<String> brokerCandidateCache =
isolationPoliciesHelper.applyIsolationPolicies(availableBrokers, serviceUnit);
helper.applyIsolationPolicies(availableBrokers, serviceUnit);
availableBrokers.keySet().retainAll(brokerCandidateCache);
return availableBrokers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
Expand All @@ -36,11 +35,6 @@ public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}

@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
Expand Down Expand Up @@ -148,9 +147,4 @@ public Version getLatestVersionNumber(Map<String, BrokerLookupData> brokerMap)
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MinMaxPriorityQueue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -42,10 +45,12 @@
import lombok.experimental.Accessors;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
Expand Down Expand Up @@ -88,6 +93,7 @@ public class TransferShedder implements NamespaceUnloadStrategy {
private final SimpleResourceAllocationPolicies allocationPolicies;
private final IsolationPoliciesHelper isolationPoliciesHelper;
private final AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;
private final List<BrokerFilter> brokerFilterPipeline;

private final Set<UnloadDecision> decisionCache;
private final UnloadCounter counter;
Expand All @@ -100,12 +106,15 @@ public TransferShedder(UnloadCounter counter){
this.counter = counter;
this.isolationPoliciesHelper = null;
this.antiAffinityGroupPolicyHelper = null;
this.brokerFilterPipeline = new ArrayList<>();
}

public TransferShedder(PulsarService pulsar,
UnloadCounter counter,
List<BrokerFilter> brokerFilterPipeline,
AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper){
this.pulsar = pulsar;
this.brokerFilterPipeline = brokerFilterPipeline;
this.decisionCache = new HashSet<>();
this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar);
this.counter = counter;
Expand Down Expand Up @@ -462,15 +471,34 @@ private boolean isTransferable(LoadManagerContext context,
NamespaceBundle namespaceBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, bundleRange);

if (!canTransferWithIsolationPoliciesToBroker(
context, availableBrokers, namespaceBundle, srcBroker, dstBroker)) {
return false;
Map<String, BrokerLookupData> candidates = new HashMap<>(availableBrokers);
brokerFilterPipeline.forEach(filter -> {
try {
filter.filter(candidates, namespaceBundle, context);
} catch (BrokerFilterException e) {
log.error("Failed to filter brokers with filter: {}", filter.getClass().getName(), e);
}
});
if (dstBroker.isPresent()) {
if (!candidates.containsKey(dstBroker.get())) {
return false;
}
}

if (!antiAffinityGroupPolicyHelper.canUnload(availableBrokers, bundle, srcBroker, dstBroker)) {
if (!context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled()) {
return false;
}
return true;

// Remove the current bundle owner broker.
candidates.remove(srcBroker);
boolean transfer = context.brokerConfiguration().isLoadBalancerTransferEnabled();

// Unload: Check if there are any more candidates available for selection.
if (dstBroker.isEmpty() || !transfer) {
return !candidates.isEmpty();
}
// Transfer: Check if this broker is among the candidates.
return candidates.containsKey(dstBroker.get());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
Expand Down Expand Up @@ -82,10 +83,12 @@ public UnloadScheduler(PulsarService pulsar,
LoadManagerContext context,
ServiceUnitStateChannel channel,
AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper,
List<BrokerFilter> brokerFilterPipeline,
UnloadCounter counter,
AtomicReference<List<Metrics>> unloadMetrics) {
this(pulsar, loadManagerExecutor, unloadManager, context, channel,
createNamespaceUnloadStrategy(pulsar, antiAffinityGroupPolicyHelper, counter), counter, unloadMetrics);
createNamespaceUnloadStrategy(pulsar, brokerFilterPipeline, antiAffinityGroupPolicyHelper, counter),
counter, unloadMetrics);
}

@VisibleForTesting
Expand Down Expand Up @@ -213,6 +216,7 @@ public void close() {
}

private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(PulsarService pulsar,
List<BrokerFilter> brokerFilterPipeline,
AntiAffinityGroupPolicyHelper helper,
UnloadCounter counter) {
ServiceConfiguration conf = pulsar.getConfiguration();
Expand All @@ -224,7 +228,7 @@ private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(PulsarServi
conf.getLoadBalancerLoadPlacementStrategy(), e);
}
log.error("create namespace unload strategy failed. using TransferShedder instead.");
return new TransferShedder(pulsar, counter, helper);
return new TransferShedder(pulsar, counter, brokerFilterPipeline, helper);
}

private boolean isLoadBalancerSheddingEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,6 @@ public String name() {
return "Mock broker filter";
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}

@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
Expand Down Expand Up @@ -771,11 +766,6 @@ public String name() {
return "Mock-broker-filter";
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}

}

private void setPrimaryLoadManager() throws IllegalAccessException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
Expand Down Expand Up @@ -71,13 +70,13 @@ public void testFilterWithNamespaceIsolationPoliciesForPrimaryAndSecondaryBroker
var namespace = "my-tenant/my-ns";
NamespaceName namespaceName = NamespaceName.get(namespace);

BrokerIsolationPoliciesFilter filter = new BrokerIsolationPoliciesFilter();
var policies = mock(SimpleResourceAllocationPolicies.class);

// 1. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 1
setIsolationPolicies(policies, namespaceName, Set.of("broker1"), Set.of("broker2"), Set.of("broker3"), 1);
IsolationPoliciesHelper isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
FieldUtils.writeDeclaredField(filter, "isolationPoliciesHelper", isolationPoliciesHelper, true);

BrokerIsolationPoliciesFilter filter = new BrokerIsolationPoliciesFilter(isolationPoliciesHelper);

// a. available-brokers: broker1, broker2, broker3 => result: broker1
Map<String, BrokerLookupData> result = filter.filter(new HashMap<>(Map.of(
Expand Down Expand Up @@ -128,13 +127,14 @@ public void testFilterWithPersistentOrNonPersistentDisabled()
doReturn(true).when(namespaceBundle).hasNonPersistentTopic();
doReturn(namespaceName).when(namespaceBundle).getNamespaceObject();

BrokerIsolationPoliciesFilter filter = new BrokerIsolationPoliciesFilter();

var policies = mock(SimpleResourceAllocationPolicies.class);
doReturn(false).when(policies).areIsolationPoliciesPresent(eq(namespaceName));
doReturn(true).when(policies).isSharedBroker(any());
IsolationPoliciesHelper isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
FieldUtils.writeDeclaredField(filter, "isolationPoliciesHelper", isolationPoliciesHelper, true);

BrokerIsolationPoliciesFilter filter = new BrokerIsolationPoliciesFilter(isolationPoliciesHelper);



Map<String, BrokerLookupData> result = filter.filter(new HashMap<>(Map.of(
"broker1", getLookupData(),
Expand Down
Loading

0 comments on commit 9f2a551

Please sign in to comment.