From abee92c47e3b90b4cf7e956ecdbed7731a30bf8d Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Thu, 30 Mar 2023 09:03:11 +0800 Subject: [PATCH] Filter the transfer dest broker --- .../extensions/ExtensibleLoadManagerImpl.java | 14 ++- .../filter/AntiAffinityGroupPolicyFilter.java | 6 -- .../extensions/filter/BrokerFilter.java | 6 -- .../filter/BrokerIsolationPoliciesFilter.java | 11 +- .../filter/BrokerMaxTopicCountFilter.java | 6 -- .../filter/BrokerVersionFilter.java | 6 -- .../AntiAffinityGroupPolicyHelper.java | 10 ++ .../policies/IsolationPoliciesHelper.java | 5 + .../extensions/scheduler/TransferShedder.java | 95 ++++++++-------- .../ExtensibleLoadManagerImplTest.java | 10 -- .../BrokerIsolationPoliciesFilterTest.java | 12 +-- .../scheduler/TransferShedderTest.java | 102 +++++++++++++----- 12 files changed, 155 insertions(+), 128 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 3e078d0a5eb20..7e84fa5969a6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -64,6 +64,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper; +import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper; import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler; @@ -75,6 +76,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; +import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceName; @@ -119,6 +121,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { @Getter private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper; + @Getter + private IsolationPoliciesHelper isolationPoliciesHelper; + private LoadDataStore brokerLoadDataStore; private LoadDataStore topBundlesLoadDataStore; @@ -185,7 +190,6 @@ public enum Role { public ExtensibleLoadManagerImpl() { this.brokerFilterPipeline = new ArrayList<>(); this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter()); - this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter()); this.brokerFilterPipeline.add(new BrokerVersionFilter()); // TODO: Make brokerSelectionStrategy configurable. this.brokerSelectionStrategy = new LeastResourceUsageWithWeight(); @@ -236,6 +240,9 @@ public void start() throws PulsarServerException { antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper); this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter); + SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar); + this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies); + this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper)); try { this.brokerLoadDataStore = LoadDataStoreFactory @@ -293,8 +300,8 @@ public void start() throws PulsarServerException { MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); this.unloadScheduler = new UnloadScheduler( - pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel, - unloadCounter, unloadMetrics); + pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, + serviceUnitStateChannel, unloadCounter, unloadMetrics); this.unloadScheduler.start(); this.splitScheduler = new SplitScheduler( pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); @@ -307,7 +314,6 @@ public void start() throws PulsarServerException { public void initialize(PulsarService pulsar) { this.pulsar = pulsar; this.conf = pulsar.getConfiguration(); - this.brokerFilterPipeline.forEach(brokerFilter -> brokerFilter.initialize(pulsar)); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java index 358f985f83e12..462f8f0e3597a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.loadbalance.extensions.filter; import java.util.Map; -import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper; @@ -50,9 +49,4 @@ public Map filter( public String name() { return FILTER_NAME; } - - @Override - public void initialize(PulsarService pulsar) { - return; - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java index 30d25f559b11e..d9cbfdc391ed4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.loadbalance.extensions.filter; import java.util.Map; -import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; @@ -35,11 +34,6 @@ public interface BrokerFilter { */ String name(); - /** - * Initialize this broker filter using the given pulsar service. - */ - void initialize(PulsarService pulsar); - /** * Filter out unqualified brokers based on implementation. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java index b28c77f76f3eb..eeb0d9d3a3309 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java @@ -21,12 +21,10 @@ import java.util.Map; import java.util.Set; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper; -import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -37,14 +35,13 @@ public class BrokerIsolationPoliciesFilter implements BrokerFilter { private IsolationPoliciesHelper isolationPoliciesHelper; - @Override - public String name() { - return FILTER_NAME; + public BrokerIsolationPoliciesFilter(IsolationPoliciesHelper helper) { + this.isolationPoliciesHelper = helper; } @Override - public void initialize(PulsarService pulsar) { - this.isolationPoliciesHelper = new IsolationPoliciesHelper(new SimpleResourceAllocationPolicies(pulsar)); + public String name() { + return FILTER_NAME; } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java index b98edd3d425e5..0bceae36bb8c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Optional; -import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; @@ -36,11 +35,6 @@ public String name() { return FILTER_NAME; } - @Override - public void initialize(PulsarService pulsar) { - // No-op - } - @Override public Map filter(Map brokers, ServiceUnitId serviceUnit, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java index b7332a5ff10a0..7420fcc211309 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java @@ -22,7 +22,6 @@ import java.util.Iterator; import java.util.Map; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; @@ -148,9 +147,4 @@ public Version getLatestVersionNumber(Map brokerMap) public String name() { return FILTER_NAME; } - - @Override - public void initialize(PulsarService pulsar) { - // No-op - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java index 69e3302bebd50..bde932b4df714 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java @@ -87,6 +87,16 @@ public boolean canUnload( } } + public boolean hasAntiAffinityGroupPolicy(String bundle) { + try { + return LoadManagerShared.getNamespaceAntiAffinityGroup( + pulsar, LoadManagerShared.getNamespaceNameFromBundleName(bundle)).isPresent(); + } catch (MetadataStoreException e) { + log.error("Failed to check unload candidates. Assumes that bundle:{} cannot unload ", bundle, e); + return false; + } + } + public void listenFailureDomainUpdate() { LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap); // register listeners for domain changes diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java index 4d7a5bf22d661..67dc702cc0c9f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java @@ -26,6 +26,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; @Slf4j @@ -65,4 +66,8 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) { return brokerCandidateCache; } + public boolean hasIsolationPolicy(NamespaceName namespaceName) { + return policies.areIsolationPoliciesPresent(namespaceName); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java index 669219daba273..d9125ff61b5cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java @@ -32,6 +32,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -46,6 +47,7 @@ import lombok.experimental.Accessors; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; @@ -53,6 +55,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; @@ -60,7 +63,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; -import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; import org.apache.pulsar.common.naming.NamespaceBundle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,9 +98,10 @@ public class TransferShedder implements NamespaceUnloadStrategy { private static final String CANNOT_UNLOAD_BUNDLE_MSG = "Can't unload bundle:%s."; private final LoadStats stats = new LoadStats(); private PulsarService pulsar; - private SimpleResourceAllocationPolicies allocationPolicies; private IsolationPoliciesHelper isolationPoliciesHelper; private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper; + private List brokerFilterPipeline; + private Set decisionCache; @Getter private UnloadCounter counter; @@ -109,7 +112,6 @@ public class TransferShedder implements NamespaceUnloadStrategy { public TransferShedder(UnloadCounter counter){ this.pulsar = null; this.decisionCache = new HashSet<>(); - this.allocationPolicies = null; this.counter = counter; this.isolationPoliciesHelper = null; this.antiAffinityGroupPolicyHelper = null; @@ -117,26 +119,28 @@ public TransferShedder(UnloadCounter counter){ public TransferShedder(PulsarService pulsar, UnloadCounter counter, + List brokerFilterPipeline, + IsolationPoliciesHelper isolationPoliciesHelper, AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper){ this.pulsar = pulsar; this.decisionCache = new HashSet<>(); - this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar); this.counter = counter; - this.isolationPoliciesHelper = new IsolationPoliciesHelper(allocationPolicies); - this.channel = ServiceUnitStateChannelImpl.get(pulsar); + this.isolationPoliciesHelper = isolationPoliciesHelper; this.antiAffinityGroupPolicyHelper = antiAffinityGroupPolicyHelper; + this.channel = ServiceUnitStateChannelImpl.get(pulsar); + this.brokerFilterPipeline = brokerFilterPipeline; } @Override public void initialize(PulsarService pulsar){ this.pulsar = pulsar; this.decisionCache = new HashSet<>(); - this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar); var manager = ExtensibleLoadManagerImpl.get(pulsar.getLoadManager().get()); this.counter = manager.getUnloadCounter(); - this.isolationPoliciesHelper = new IsolationPoliciesHelper(allocationPolicies); - this.channel = ServiceUnitStateChannelImpl.get(pulsar); + this.isolationPoliciesHelper = manager.getIsolationPoliciesHelper(); this.antiAffinityGroupPolicyHelper = manager.getAntiAffinityGroupPolicyHelper(); + this.channel = ServiceUnitStateChannelImpl.get(pulsar); + this.brokerFilterPipeline = manager.getBrokerFilterPipeline(); } @@ -673,7 +677,7 @@ private boolean isTransferable(LoadManagerContext context, String bundle, String srcBroker, Optional dstBroker) { - if (pulsar == null || allocationPolicies == null) { + if (pulsar == null) { return true; } @@ -682,54 +686,49 @@ private boolean isTransferable(LoadManagerContext context, NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, bundleRange); - if (!canTransferWithIsolationPoliciesToBroker( - context, availableBrokers, namespaceBundle, srcBroker, dstBroker)) { - return false; - } - - if (antiAffinityGroupPolicyHelper != null - && !antiAffinityGroupPolicyHelper.canUnload(availableBrokers, bundle, srcBroker, dstBroker)) { + if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled(context, namespaceBundle)) { return false; } - return true; - } - - /** - * Check the gave bundle and broker can be transfer or unload with isolation policies applied. - * - * @param context The load manager context. - * @param availableBrokers The available brokers. - * @param namespaceBundle The bundle try to unload or transfer. - * @param currentBroker The current broker. - * @param targetBroker The broker will be transfer to. - * @return Can be transfer/unload or not. - */ - private boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context, - Map availableBrokers, - NamespaceBundle namespaceBundle, - String currentBroker, - Optional targetBroker) { - if (isolationPoliciesHelper == null - || !allocationPolicies.areIsolationPoliciesPresent(namespaceBundle.getNamespaceObject())) { - return true; - } - // bundle has isolation policies. - if (!context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled()) { - return false; + Map candidates = new HashMap<>(availableBrokers); + brokerFilterPipeline.forEach(filter -> { + try { + filter.filter(candidates, namespaceBundle, context); + } catch (BrokerFilterException e) { + log.error("Failed to filter brokers with filter: {}", filter.getClass().getName(), e); + } + }); + if (dstBroker.isPresent()) { + if (!candidates.containsKey(dstBroker.get())) { + return false; + } } - boolean transfer = context.brokerConfiguration().isLoadBalancerTransferEnabled(); - Set candidates = isolationPoliciesHelper.applyIsolationPolicies(availableBrokers, namespaceBundle); - // Remove the current bundle owner broker. - candidates.remove(currentBroker); + candidates.remove(srcBroker); + boolean transfer = context.brokerConfiguration().isLoadBalancerTransferEnabled(); // Unload: Check if there are any more candidates available for selection. - if (targetBroker.isEmpty() || !transfer) { + if (dstBroker.isEmpty() || !transfer) { return !candidates.isEmpty(); } // Transfer: Check if this broker is among the candidates. - return candidates.contains(targetBroker.get()); + return candidates.containsKey(dstBroker.get()); + } + + protected boolean isLoadBalancerSheddingBundlesWithPoliciesEnabled(LoadManagerContext context, + NamespaceBundle namespaceBundle) { + if (isolationPoliciesHelper != null + && isolationPoliciesHelper.hasIsolationPolicy(namespaceBundle.getNamespaceObject())) { + return context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled(); + } + + String bundle = namespaceBundle.toString(); + if (antiAffinityGroupPolicyHelper != null + && antiAffinityGroupPolicyHelper.hasAntiAffinityGroupPolicy(namespaceBundle.toString())) { + return context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled(); + } + + return true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 4c57e6b93fa3c..502e02e465e64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -284,11 +284,6 @@ public String name() { return "Mock broker filter"; } - @Override - public void initialize(PulsarService pulsar) { - // No-op - } - @Override public Map filter(Map brokers, ServiceUnitId serviceUnit, @@ -821,11 +816,6 @@ public String name() { return "Mock-broker-filter"; } - @Override - public void initialize(PulsarService pulsar) { - // No-op - } - } private void setPrimaryLoadManager() throws IllegalAccessException { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java index a079a23bcea04..5bc1e436bae25 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java @@ -31,7 +31,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import org.apache.commons.lang.reflect.FieldUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; @@ -71,13 +70,13 @@ public void testFilterWithNamespaceIsolationPoliciesForPrimaryAndSecondaryBroker var namespace = "my-tenant/my-ns"; NamespaceName namespaceName = NamespaceName.get(namespace); - BrokerIsolationPoliciesFilter filter = new BrokerIsolationPoliciesFilter(); var policies = mock(SimpleResourceAllocationPolicies.class); // 1. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 1 setIsolationPolicies(policies, namespaceName, Set.of("broker1"), Set.of("broker2"), Set.of("broker3"), 1); IsolationPoliciesHelper isolationPoliciesHelper = new IsolationPoliciesHelper(policies); - FieldUtils.writeDeclaredField(filter, "isolationPoliciesHelper", isolationPoliciesHelper, true); + + BrokerIsolationPoliciesFilter filter = new BrokerIsolationPoliciesFilter(isolationPoliciesHelper); // a. available-brokers: broker1, broker2, broker3 => result: broker1 Map result = filter.filter(new HashMap<>(Map.of( @@ -128,13 +127,14 @@ public void testFilterWithPersistentOrNonPersistentDisabled() doReturn(true).when(namespaceBundle).hasNonPersistentTopic(); doReturn(namespaceName).when(namespaceBundle).getNamespaceObject(); - BrokerIsolationPoliciesFilter filter = new BrokerIsolationPoliciesFilter(); - var policies = mock(SimpleResourceAllocationPolicies.class); doReturn(false).when(policies).areIsolationPoliciesPresent(eq(namespaceName)); doReturn(true).when(policies).isSharedBroker(any()); IsolationPoliciesHelper isolationPoliciesHelper = new IsolationPoliciesHelper(policies); - FieldUtils.writeDeclaredField(filter, "isolationPoliciesHelper", isolationPoliciesHelper, true); + + BrokerIsolationPoliciesFilter filter = new BrokerIsolationPoliciesFilter(isolationPoliciesHelper); + + Map result = filter.filter(new HashMap<>(Map.of( "broker1", getLookupData(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index 8c8d18e202a00..8953c04176539 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -40,13 +40,16 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import com.google.common.collect.BoundType; import com.google.common.collect.Range; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -69,6 +72,9 @@ import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.filter.AntiAffinityGroupPolicyFilter; +import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter; +import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter; import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; @@ -108,6 +114,7 @@ public class TransferShedderTest { ExtensibleLoadManagerImpl loadManager; ServiceUnitStateChannel channel; ServiceConfiguration conf; + IsolationPoliciesHelper isolationPoliciesHelper; AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper; LocalPoliciesResources localPoliciesResources; String bundleD1 = "my-tenant/my-namespaceD/0x00000000_0x0FFFFFFF"; @@ -129,6 +136,7 @@ public void init() throws MetadataStoreException { var factory = mock(NamespaceBundleFactory.class); namespaceService = mock(NamespaceService.class); localPoliciesResources = mock(LocalPoliciesResources.class); + isolationPoliciesHelper = mock(IsolationPoliciesHelper.class); antiAffinityGroupPolicyHelper = mock(AntiAffinityGroupPolicyHelper.class); doReturn(conf).when(pulsar).getConfiguration(); doReturn(namespaceService).when(pulsar).getNamespaceService(); @@ -158,8 +166,6 @@ public void init() throws MetadataStoreException { public LoadManagerContext setupContext(){ var ctx = getContext(); - - var topBundlesLoadDataStore = ctx.topBundleLoadDataStore(); topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000)); topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 1000000, 3000000)); @@ -389,13 +395,12 @@ public void startTableView() throws LoadDataStoreException { BrokerRegistry brokerRegistry = mock(BrokerRegistry.class); doReturn(CompletableFuture.completedFuture(Map.of( - "broker1", mock(BrokerLookupData.class), - "broker2", mock(BrokerLookupData.class), - "broker3", mock(BrokerLookupData.class), - "broker4", mock(BrokerLookupData.class), - "broker5", mock(BrokerLookupData.class) - ))) - .when(brokerRegistry).getAvailableBrokerLookupDataAsync(); + "broker1", getMockBrokerLookupData(), + "broker2", getMockBrokerLookupData(), + "broker3", getMockBrokerLookupData(), + "broker4", getMockBrokerLookupData(), + "broker5", getMockBrokerLookupData() + ))).when(brokerRegistry).getAvailableBrokerLookupDataAsync(); doReturn(conf).when(ctx).brokerConfiguration(); doReturn(brokerLoadDataStore).when(ctx).brokerLoadDataStore(); doReturn(topBundleLoadDataStore).when(ctx).topBundleLoadDataStore(); @@ -403,6 +408,14 @@ public void startTableView() throws LoadDataStoreException { return ctx; } + + BrokerLookupData getMockBrokerLookupData() { + BrokerLookupData brokerLookupData = mock(BrokerLookupData.class); + doReturn(true).when(brokerLookupData).persistentTopicsEnabled(); + doReturn(true).when(brokerLookupData).nonPersistentTopicsEnabled(); + return brokerLookupData; + } + @Test public void testEmptyBrokerLoadData() { UnloadCounter counter = new UnloadCounter(); @@ -524,12 +537,12 @@ public void testRecentlyUnloadedBundles() { @Test public void testGetAvailableBrokersFailed() { UnloadCounter counter = new UnloadCounter(); - AntiAffinityGroupPolicyHelper affinityGroupPolicyHelper = mock(AntiAffinityGroupPolicyHelper.class); - TransferShedder transferShedder = new TransferShedder(pulsar, counter, affinityGroupPolicyHelper); + TransferShedder transferShedder = new TransferShedder(pulsar, counter, null, + isolationPoliciesHelper, antiAffinityGroupPolicyHelper); var ctx = setupContext(); BrokerRegistry registry = ctx.brokerRegistry(); doReturn(FutureUtil.failedFuture(new TimeoutException())).when(registry).getAvailableBrokerLookupDataAsync(); - var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); + transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); assertEquals(counter.getBreakdownCounters().get(Failure).get(Unknown).get(), 1); assertEquals(counter.getLoadAvg(), 0.0); assertEquals(counter.getLoadStd(), 0.0); @@ -537,18 +550,21 @@ public void testGetAvailableBrokersFailed() { @Test(timeOut = 30 * 1000) public void testBundlesWithIsolationPolicies() throws IllegalAccessException { - doReturn(true).when(antiAffinityGroupPolicyHelper).canUnload(any(), any(), any(), any()); + List filters = new ArrayList<>(); + var allocationPoliciesSpy = mock(SimpleResourceAllocationPolicies.class); + IsolationPoliciesHelper isolationPoliciesHelper = new IsolationPoliciesHelper(allocationPoliciesSpy); + BrokerIsolationPoliciesFilter filter = new BrokerIsolationPoliciesFilter(isolationPoliciesHelper); + filters.add(filter); UnloadCounter counter = new UnloadCounter(); - TransferShedder transferShedder = spy(new TransferShedder(pulsar, counter, antiAffinityGroupPolicyHelper)); - var allocationPoliciesSpy = (SimpleResourceAllocationPolicies) - spy(FieldUtils.readDeclaredField(transferShedder, "allocationPolicies", true)); + TransferShedder transferShedder = spy(new TransferShedder(pulsar, counter, null, + isolationPoliciesHelper, antiAffinityGroupPolicyHelper)); FieldUtils.writeDeclaredField(transferShedder, "allocationPolicies", allocationPoliciesSpy, true); - IsolationPoliciesHelper isolationPoliciesHelper = new IsolationPoliciesHelper(allocationPoliciesSpy); - FieldUtils.writeDeclaredField(transferShedder, "isolationPoliciesHelper", isolationPoliciesHelper, true); setIsolationPolicies(allocationPoliciesSpy, "my-tenant/my-namespaceE", Set.of("broker5"), Set.of(), Set.of(), 1); var ctx = setupContext(); + ctx.brokerConfiguration().setLoadBalancerSheddingBundlesWithPoliciesEnabled(true); + doReturn(ctx.brokerConfiguration()).when(pulsar).getConfiguration(); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); var expected = new HashSet(); expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker1")), @@ -641,22 +657,25 @@ private void setIsolationPolicies(SimpleResourceAllocationPolicies policies, @Test - public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, MetadataStoreException { + public void testBundlesWithAntiAffinityGroup() throws MetadataStoreException { + var filters = new ArrayList(); + AntiAffinityGroupPolicyFilter filter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper); + filters.add(filter); var counter = new UnloadCounter(); - TransferShedder transferShedder = new TransferShedder(pulsar, counter, antiAffinityGroupPolicyHelper); - var allocationPoliciesSpy = (SimpleResourceAllocationPolicies) - spy(FieldUtils.readDeclaredField(transferShedder, "allocationPolicies", true)); - doReturn(false).when(allocationPoliciesSpy).areIsolationPoliciesPresent(any()); - FieldUtils.writeDeclaredField(transferShedder, "allocationPolicies", allocationPoliciesSpy, true); + TransferShedder transferShedder = new TransferShedder(pulsar, counter, filters, + isolationPoliciesHelper, antiAffinityGroupPolicyHelper); LocalPolicies localPolicies = new LocalPolicies(null, null, "namespaceAntiAffinityGroup"); doReturn(Optional.of(localPolicies)).when(localPoliciesResources).getLocalPolicies(any()); var ctx = setupContext(); - var antiAffinityGroupPolicyHelperSpy = (AntiAffinityGroupPolicyHelper) - spy(FieldUtils.readDeclaredField(transferShedder, "antiAffinityGroupPolicyHelper", true)); - doReturn(false).when(antiAffinityGroupPolicyHelperSpy).canUnload(any(), any(), any(), any()); - FieldUtils.writeDeclaredField(transferShedder, "antiAffinityGroupPolicyHelper", antiAffinityGroupPolicyHelperSpy, true); + ctx.brokerConfiguration().setLoadBalancerSheddingBundlesWithPoliciesEnabled(true); + + doAnswer(invocationOnMock -> { + Map brokers = invocationOnMock.getArgument(0); + brokers.clear(); + return brokers; + }).when(antiAffinityGroupPolicyHelper).filter(any(), any()); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); assertTrue(res.isEmpty()); @@ -664,8 +683,16 @@ public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, Me assertEquals(counter.getLoadAvg(), setupLoadAvg); assertEquals(counter.getLoadStd(), setupLoadStd); + doAnswer(invocationOnMock -> { + Map brokers = invocationOnMock.getArgument(0); + String bundle = invocationOnMock.getArgument(1, String.class); - doReturn(true).when(antiAffinityGroupPolicyHelperSpy).canUnload(any(), eq(bundleE1), any(), any()); + if (bundle.equalsIgnoreCase(bundleE1)) { + return brokers; + } + brokers.clear(); + return brokers; + }).when(antiAffinityGroupPolicyHelper).filter(any(), any()); var res2 = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); var expected2 = new HashSet<>(); expected2.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")), @@ -675,6 +702,23 @@ public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, Me assertEquals(counter.getLoadStd(), setupLoadStd); } + @Test + public void testIsLoadBalancerSheddingBundlesWithPoliciesEnabled() { + var counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(pulsar, counter, new ArrayList<>(), + isolationPoliciesHelper, antiAffinityGroupPolicyHelper); + + var ctx = setupContext(); + + ctx.brokerConfiguration().setLoadBalancerSheddingBundlesWithPoliciesEnabled(false); + NamespaceBundle namespaceBundle = mock(NamespaceBundle.class); + doReturn("bundle").when(namespaceBundle).toString(); + assertFalse(transferShedder.isLoadBalancerSheddingBundlesWithPoliciesEnabled(ctx, namespaceBundle)); + + + + } + @Test public void testTargetStd() { UnloadCounter counter = new UnloadCounter();