From 3ec363045af710dbe110cc57c0268880b68f0e71 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Thu, 2 Mar 2023 10:33:50 +0800 Subject: [PATCH 1/9] Add metrics --- .../extensions/ExtensibleLoadManagerImpl.java | 18 +- .../extensions/manager/UnloadManager.java | 25 +- .../extensions/models/UnloadCounter.java | 72 ++++-- .../extensions/models/UnloadDecision.java | 62 +---- .../scheduler/NamespaceUnloadStrategy.java | 7 +- .../extensions/scheduler/TransferShedder.java | 92 ++++---- .../extensions/scheduler/UnloadScheduler.java | 124 ++++++---- .../ExtensibleLoadManagerImplTest.java | 20 +- .../extensions/manager/UnloadManagerTest.java | 51 ++++- .../scheduler/TransferShedderTest.java | 216 +++++++++--------- .../scheduler/UnloadSchedulerTest.java | 51 +++-- 11 files changed, 422 insertions(+), 316 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 716be3718bf19..b9c8c1f632d91 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.extensions; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Admin; import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower; import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader; import com.google.common.annotations.VisibleForTesting; @@ -204,8 +206,8 @@ public void start() throws PulsarServerException { }); this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); this.brokerRegistry.start(); - this.unloadManager = new UnloadManager(); this.splitManager = new SplitManager(splitCounter); + this.unloadManager = new UnloadManager(unloadCounter); this.serviceUnitStateChannel.listen(unloadManager); this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); @@ -265,7 +267,8 @@ public void start() throws PulsarServerException { interval, TimeUnit.MILLISECONDS); this.unloadScheduler = new UnloadScheduler( - pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel); + pulsar, pulsar.getLoadManagerExecutor(), unloadManager, + context, serviceUnitStateChannel, unloadCounter, unloadMetrics); this.unloadScheduler.start(); this.splitScheduler = new SplitScheduler( pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); @@ -401,16 +404,21 @@ public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, log.warn(msg); throw new IllegalArgumentException(msg); } - return unloadAsync(new Unload(sourceBroker, bundle.toString(), destinationBroker), + Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker); + UnloadDecision unloadDecision = + new UnloadDecision(unload, Success, Admin); + return unloadAsync(unloadDecision, conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); }); } - private CompletableFuture unloadAsync(Unload unload, + private CompletableFuture unloadAsync(UnloadDecision unloadDecision, long timeout, TimeUnit timeoutUnit) { + Unload unload = unloadDecision.getUnload(); CompletableFuture future = serviceUnitStateChannel.publishUnloadEventAsync(unload); - return unloadManager.waitAsync(future, unload.serviceUnit(), timeout, timeoutUnit); + return unloadManager.waitAsync(future, unload.serviceUnit(), unloadDecision, timeout, timeoutUnit) + .thenRun(() -> unloadCounter.updateUnloadBrokerCount(1)); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java index ead6384daba8d..2dde0c4708e41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.manager; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -25,6 +27,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; /** * Unload manager. @@ -32,9 +36,11 @@ @Slf4j public class UnloadManager implements StateChangeListener { + private final UnloadCounter counter; private final Map> inFlightUnloadRequest; - public UnloadManager() { + public UnloadManager(UnloadCounter counter) { + this.counter = counter; this.inFlightUnloadRequest = new ConcurrentHashMap<>(); } @@ -43,14 +49,8 @@ private void complete(String serviceUnit, Throwable ex) { if (!future.isDone()) { if (ex != null) { future.completeExceptionally(ex); - if (log.isDebugEnabled()) { - log.debug("Complete exceptionally unload bundle: {}", serviceUnit, ex); - } } else { future.complete(null); - if (log.isDebugEnabled()) { - log.debug("Complete unload bundle: {}", serviceUnit); - } } } return null; @@ -59,6 +59,7 @@ private void complete(String serviceUnit, Throwable ex) { public CompletableFuture waitAsync(CompletableFuture eventPubFuture, String bundle, + UnloadDecision decision, long timeout, TimeUnit timeoutUnit) { @@ -74,7 +75,15 @@ public CompletableFuture waitAsync(CompletableFuture eventPubFuture, } }); return future; - })); + })).whenComplete((__, ex) -> { + if (ex != null) { + counter.update(Failure, Unknown); + log.warn("Failed to unload bundle: {}", bundle, ex); + return; + } + log.info("Complete unload bundle: {}", bundle); + counter.update(decision); + }); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java index e2a51b1248967..37483b58b53e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Admin; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.CoolDown; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBrokers; @@ -30,11 +31,13 @@ import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.mutable.MutableLong; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; import org.apache.pulsar.common.stats.Metrics; /** @@ -45,36 +48,63 @@ public class UnloadCounter { long unloadBrokerCount = 0; long unloadBundleCount = 0; - final Map> breakdownCounters; + @Getter + @VisibleForTesting + final Map> breakdownCounters; + @Getter + @VisibleForTesting double loadAvg; + @Getter + @VisibleForTesting double loadStd; + private volatile long updatedAt = 0; + public UnloadCounter() { breakdownCounters = Map.of( Success, Map.of( - Overloaded, new MutableLong(), - Underloaded, new MutableLong()), + Overloaded, new AtomicLong(), + Underloaded, new AtomicLong(), + Admin, new AtomicLong()), Skip, Map.of( - Balanced, new MutableLong(), - NoBundles, new MutableLong(), - CoolDown, new MutableLong(), - OutDatedData, new MutableLong(), - NoLoadData, new MutableLong(), - NoBrokers, new MutableLong(), - Unknown, new MutableLong()), + Balanced, new AtomicLong(), + NoBundles, new AtomicLong(), + CoolDown, new AtomicLong(), + OutDatedData, new AtomicLong(), + NoLoadData, new AtomicLong(), + NoBrokers, new AtomicLong(), + Unknown, new AtomicLong()), Failure, Map.of( - Unknown, new MutableLong()) + Unknown, new AtomicLong()) ); } public void update(UnloadDecision decision) { - var unloads = decision.getUnloads(); - unloadBrokerCount += unloads.keySet().size(); - unloadBundleCount += unloads.values().size(); - breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment(); - loadAvg = decision.loadAvg; - loadStd = decision.loadStd; + if (decision.getLabel() == Success) { + unloadBundleCount++; + } + breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet(); + updatedAt = System.currentTimeMillis(); + } + + public void update(UnloadDecision.Label label, UnloadDecision.Reason reason) { + if (label == Success) { + unloadBundleCount++; + } + breakdownCounters.get(label).get(reason).incrementAndGet(); + updatedAt = System.currentTimeMillis(); + } + + public void updateLoadData(double loadAvg, double loadStd) { + this.loadAvg = loadAvg; + this.loadStd = loadStd; + updatedAt = System.currentTimeMillis(); + } + + public void updateUnloadBrokerCount(int unloadBrokerCount) { + this.unloadBrokerCount += unloadBrokerCount; + updatedAt = System.currentTimeMillis(); } public List toMetrics(String advertisedBrokerAddress) { @@ -125,4 +155,8 @@ public List toMetrics(String advertisedBrokerAddress) { return metrics; } -} \ No newline at end of file + + public long updatedAt() { + return updatedAt; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java index 67503db34eee7..e1087ab6e53ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java @@ -18,29 +18,21 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.models; -import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success; -import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced; -import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBundles; -import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoLoadData; -import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded; -import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded; -import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; +import lombok.AllArgsConstructor; import lombok.Data; /** * Defines the information required to unload or transfer a service unit(e.g. bundle). */ @Data +@AllArgsConstructor public class UnloadDecision { - Multimap unloads; + Unload unload; Label label; Reason reason; - Double loadAvg; - Double loadStd; + public enum Label { Success, Skip, @@ -55,39 +47,20 @@ public enum Reason { OutDatedData, NoLoadData, NoBrokers, + Admin, Unknown } public UnloadDecision() { - unloads = ArrayListMultimap.create(); + unload = null; label = null; reason = null; - loadAvg = null; - loadStd = null; } public void clear() { - unloads.clear(); + unload = null; label = null; reason = null; - loadAvg = null; - loadStd = null; - } - - public void skip(int numOfOverloadedBrokers, - int numOfUnderloadedBrokers, - int numOfBrokersWithEmptyLoadData, - int numOfBrokersWithFewBundles) { - label = Skip; - if (numOfOverloadedBrokers == 0 && numOfUnderloadedBrokers == 0) { - reason = Balanced; - } else if (numOfBrokersWithEmptyLoadData > 0) { - reason = NoLoadData; - } else if (numOfBrokersWithFewBundles > 0) { - reason = NoBundles; - } else { - reason = Unknown; - } } public void skip(Reason reason) { @@ -95,22 +68,9 @@ public void skip(Reason reason) { this.reason = reason; } - public void succeed( - int numOfOverloadedBrokers, - int numOfUnderloadedBrokers) { - - label = Success; - if (numOfOverloadedBrokers > numOfUnderloadedBrokers) { - reason = Overloaded; - } else { - reason = Underloaded; - } - } - - - public void fail() { - label = Failure; - reason = Unknown; + public void succeed(Reason reason) { + this.label = Success; + this.reason = reason; } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java index b4dc92d92187d..42af396abcad4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.scheduler; import java.util.Map; +import java.util.Set; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; @@ -38,8 +39,8 @@ public interface NamespaceUnloadStrategy { * @param recentlyUnloadedBrokers The recently unloaded brokers. * @return unloadDecision containing a list of the bundles that should be unloaded. */ - UnloadDecision findBundlesForUnloading(LoadManagerContext context, - Map recentlyUnloadedBundles, - Map recentlyUnloadedBrokers); + Set findBundlesForUnloading(LoadManagerContext context, + Map recentlyUnloadedBundles, + Map recentlyUnloadedBrokers); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java index 9f9582df2cc28..f1a9bf56699b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java @@ -18,12 +18,20 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.scheduler; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.CoolDown; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBrokers; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBundles; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoLoadData; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.OutDatedData; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.MinMaxPriorityQueue; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -39,6 +47,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; +import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper; import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper; @@ -80,19 +89,26 @@ public class TransferShedder implements NamespaceUnloadStrategy { private final IsolationPoliciesHelper isolationPoliciesHelper; private final AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper; - private final UnloadDecision decision = new UnloadDecision(); + private final Set decisionCache; + private final UnloadCounter counter; @VisibleForTesting - public TransferShedder(){ + public TransferShedder(UnloadCounter counter){ this.pulsar = null; + this.decisionCache = new HashSet<>(); this.allocationPolicies = null; + this.counter = counter; this.isolationPoliciesHelper = null; this.antiAffinityGroupPolicyHelper = null; } - public TransferShedder(PulsarService pulsar, AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper) { + public TransferShedder(PulsarService pulsar, + UnloadCounter counter, + AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper){ this.pulsar = pulsar; + this.decisionCache = new HashSet<>(); this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar); + this.counter = counter; this.isolationPoliciesHelper = new IsolationPoliciesHelper(allocationPolicies); this.antiAffinityGroupPolicyHelper = antiAffinityGroupPolicyHelper; } @@ -241,13 +257,12 @@ public String toString() { @Override - public UnloadDecision findBundlesForUnloading(LoadManagerContext context, + public Set findBundlesForUnloading(LoadManagerContext context, Map recentlyUnloadedBundles, Map recentlyUnloadedBrokers) { final var conf = context.brokerConfiguration(); - decision.clear(); + decisionCache.clear(); stats.clear(); - var selectedBundlesCache = decision.getUnloads(); try { final var loadStore = context.brokerLoadDataStore(); @@ -255,22 +270,17 @@ public UnloadDecision findBundlesForUnloading(LoadManagerContext context, boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled(); var skipReason = stats.update(context.brokerLoadDataStore(), recentlyUnloadedBrokers, conf); - if (!skipReason.isEmpty()) { - decision.skip(skipReason.get()); - log.warn("Failed to update load stat. Reason:{}. Stop unloading.", decision.getReason()); - return decision; + if (skipReason.isPresent()) { + log.warn("Failed to update load stat. Reason:{}. Stop unloading.", skipReason.get()); + counter.update(Skip, skipReason.get()); + return decisionCache; } - decision.setLoadAvg(stats.avg); - decision.setLoadStd(stats.std); + counter.updateLoadData(stats.avg, stats.std); if (debugMode) { log.info("brokers' load stats:{}", stats); } - // success metrics - int numOfOverloadedBrokers = 0; - int numOfUnderloadedBrokers = 0; - // skip metrics int numOfBrokersWithEmptyLoadData = 0; int numOfBrokersWithFewBundles = 0; @@ -283,9 +293,9 @@ public UnloadDecision findBundlesForUnloading(LoadManagerContext context, availableBrokers = context.brokerRegistry().getAvailableBrokerLookupDataAsync() .get(context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); } catch (ExecutionException | InterruptedException | TimeoutException e) { - decision.skip(Unknown); - log.warn("Failed to fetch available brokers. Reason:{}. Stop unloading.", decision.getReason(), e); - return decision; + counter.update(Skip, Unknown); + log.warn("Failed to fetch available brokers. Reason: Unknown. Stop unloading.", e); + return decisionCache; } while (true) { @@ -295,6 +305,7 @@ public UnloadDecision findBundlesForUnloading(LoadManagerContext context, } break; } + UnloadDecision.Reason reason; if (stats.std() <= targetStd) { if (hasMsgThroughput(context, stats.minBrokers.peekLast())) { if (debugMode) { @@ -303,10 +314,10 @@ public UnloadDecision findBundlesForUnloading(LoadManagerContext context, } break; } else { - numOfUnderloadedBrokers++; + reason = Underloaded; } } else { - numOfOverloadedBrokers++; + reason = Overloaded; } String maxBroker = stats.maxBrokers().pollLast(); @@ -365,13 +376,16 @@ && isTransferable(context, availableBrokers, if (remainingTopBundles > 1 && (trafficMarkedToOffload < offloadThroughput || !atLeastOneBundleSelected)) { + Unload unload; if (transfer) { - selectedBundlesCache.put(maxBroker, - new Unload(maxBroker, bundle, Optional.of(minBroker))); + unload = new Unload(maxBroker, bundle, Optional.of(minBroker)); } else { - selectedBundlesCache.put(maxBroker, - new Unload(maxBroker, bundle)); + unload = new Unload(maxBroker, bundle); } + var decision = new UnloadDecision(); + decision.setUnload(unload); + decision.succeed(reason); + decisionCache.add(decision); trafficMarkedToOffload += throughput; atLeastOneBundleSelected = true; remainingTopBundles--; @@ -403,26 +417,24 @@ && isTransferable(context, availableBrokers, } if (debugMode) { - log.info("selectedBundlesCache:{}", selectedBundlesCache); + log.info("decisionCache:{}", decisionCache); } - - if (decision.getUnloads().isEmpty()) { - decision.skip( - numOfOverloadedBrokers, - numOfUnderloadedBrokers, - numOfBrokersWithEmptyLoadData, - numOfBrokersWithFewBundles); - } else { - decision.succeed( - numOfOverloadedBrokers, - numOfUnderloadedBrokers); + if (decisionCache.isEmpty()) { + UnloadDecision.Reason reason; + if (numOfBrokersWithEmptyLoadData > 0) { + reason = NoLoadData; + } else if (numOfBrokersWithFewBundles > 0) { + reason = NoBundles; + } else { + reason = Balanced; + } + counter.update(Skip, reason); } } catch (Throwable e) { log.error("Failed to process unloading. ", e); - decision.fail(); + this.counter.update(Failure, Unknown); } - - return decision; + return decisionCache; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java index bc3c8eb6a94fd..b1f1afa9da098 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java @@ -18,21 +18,29 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.scheduler; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager; +import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; +import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; +import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.Reflections; @@ -43,6 +51,8 @@ public class UnloadScheduler implements LoadManagerScheduler { private final ScheduledExecutorService loadManagerExecutor; + private final PulsarService pulsar; + private final UnloadManager unloadManager; private final LoadManagerContext context; @@ -51,32 +61,45 @@ public class UnloadScheduler implements LoadManagerScheduler { private final ServiceConfiguration conf; + private final UnloadCounter counter; + + private final AtomicReference> unloadMetrics; + + private long counterLastUpdatedAt = 0; + private volatile ScheduledFuture task; private final Map recentlyUnloadedBundles; private final Map recentlyUnloadedBrokers; - private volatile CompletableFuture currentRunningFuture = null; - - public UnloadScheduler(ScheduledExecutorService loadManagerExecutor, + public UnloadScheduler(PulsarService pulsar, + ScheduledExecutorService loadManagerExecutor, UnloadManager unloadManager, LoadManagerContext context, - ServiceUnitStateChannel channel) { - this(loadManagerExecutor, unloadManager, context, - channel, createNamespaceUnloadStrategy(context.brokerConfiguration())); + ServiceUnitStateChannel channel, + UnloadCounter counter, + AtomicReference> unloadMetrics) { + this(pulsar, loadManagerExecutor, unloadManager, context, channel, + createNamespaceUnloadStrategy(pulsar, counter), counter, unloadMetrics); } @VisibleForTesting - protected UnloadScheduler(ScheduledExecutorService loadManagerExecutor, + protected UnloadScheduler(PulsarService pulsar, + ScheduledExecutorService loadManagerExecutor, UnloadManager unloadManager, LoadManagerContext context, ServiceUnitStateChannel channel, - NamespaceUnloadStrategy strategy) { + NamespaceUnloadStrategy strategy, + UnloadCounter counter, + AtomicReference> unloadMetrics) { + this.pulsar = pulsar; this.namespaceUnloadStrategy = strategy; this.recentlyUnloadedBundles = new HashMap<>(); this.recentlyUnloadedBrokers = new HashMap<>(); this.loadManagerExecutor = loadManagerExecutor; + this.counter = counter; + this.unloadMetrics = unloadMetrics; this.unloadManager = unloadManager; this.context = context; this.conf = context.brokerConfiguration(); @@ -96,62 +119,73 @@ public synchronized void execute() { } return; } - if (currentRunningFuture != null && !currentRunningFuture.isDone()) { - if (debugMode) { - log.info("Auto namespace unload is running. Skipping."); - } - return; - } // Remove bundles who have been unloaded for longer than the grace period from the recently unloaded map. final long timeout = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes()); recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout); - this.currentRunningFuture = channel.isChannelOwnerAsync().thenCompose(isChannelOwner -> { - if (!isChannelOwner) { - if (debugMode) { - log.info("Current broker is not channel owner. Skipping."); + long asyncOpTimeoutMs = conf.getNamespaceBundleUnloadingTimeoutMs(); + synchronized (namespaceUnloadStrategy) { + try { + Boolean isChannelOwner = channel.isChannelOwnerAsync().get(asyncOpTimeoutMs, TimeUnit.MILLISECONDS); + if (!isChannelOwner) { + if (debugMode) { + log.info("Current broker is not channel owner. Skipping."); + } + return; } - return CompletableFuture.completedFuture(null); - } - return context.brokerRegistry().getAvailableBrokersAsync().thenCompose(availableBrokers -> { + List availableBrokers = context.brokerRegistry().getAvailableBrokersAsync() + .get(asyncOpTimeoutMs, TimeUnit.MILLISECONDS); if (debugMode) { - log.info("Available brokers: {}", availableBrokers); + log.info("Available brokers: {}", availableBrokers); } if (availableBrokers.size() <= 1) { log.info("Only 1 broker available: no load shedding will be performed. Skipping."); - return CompletableFuture.completedFuture(null); + return; } - final UnloadDecision unloadDecision = namespaceUnloadStrategy + final Set decisions = namespaceUnloadStrategy .findBundlesForUnloading(context, recentlyUnloadedBundles, recentlyUnloadedBrokers); if (debugMode) { log.info("[{}] Unload decision result: {}", - namespaceUnloadStrategy.getClass().getSimpleName(), unloadDecision.toString()); + namespaceUnloadStrategy.getClass().getSimpleName(), decisions); } - if (unloadDecision.getUnloads().isEmpty()) { + if (decisions.isEmpty()) { if (debugMode) { log.info("[{}] Unload decision unloads is empty. Skipping.", namespaceUnloadStrategy.getClass().getSimpleName()); } - return CompletableFuture.completedFuture(null); + return; } List> futures = new ArrayList<>(); - unloadDecision.getUnloads().forEach((broker, unload) -> { - log.info("[{}] Unloading bundle: {}", namespaceUnloadStrategy.getClass().getSimpleName(), unload); - futures.add(unloadManager.waitAsync(channel.publishUnloadEventAsync(unload), unload.serviceUnit(), - conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS) - .thenAccept(__ -> { - recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis()); - recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis()); - })); - }); - return FutureUtil.waitForAll(futures).exceptionally(ex -> { - log.error("[{}] Namespace unload has exception.", - namespaceUnloadStrategy.getClass().getSimpleName(), ex); - return null; + + Set brokers = new HashSet<>(); + decisions.forEach(decision -> { + if (decision.getLabel() == Success) { + Unload unload = decision.getUnload(); + log.info("[{}] Unloading bundle: {}", + namespaceUnloadStrategy.getClass().getSimpleName(), unload); + futures.add(unloadManager.waitAsync(channel.publishUnloadEventAsync(unload), + unload.serviceUnit(), decision, asyncOpTimeoutMs, TimeUnit.MILLISECONDS) + .thenAccept(__ -> { + brokers.add(unload.sourceBroker()); + recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis()); + recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis()); + })); + } }); - }); - }); + FutureUtil.waitForAll(futures) + .whenComplete((__, ex) -> counter.updateUnloadBrokerCount(brokers.size())) + .get(asyncOpTimeoutMs, TimeUnit.MICROSECONDS); + } catch (Exception ex) { + log.error("[{}] Namespace unload has exception.", + namespaceUnloadStrategy.getClass().getSimpleName(), ex); + } finally { + if (counter.updatedAt() > counterLastUpdatedAt) { + unloadMetrics.set(counter.toMetrics(pulsar.getAdvertisedAddress())); + counterLastUpdatedAt = counter.updatedAt(); + } + } + } } @Override @@ -174,7 +208,9 @@ public void close() { this.recentlyUnloadedBrokers.clear(); } - private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(ServiceConfiguration conf) { + private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(PulsarService pulsar, + UnloadCounter counter) { + ServiceConfiguration conf = pulsar.getConfiguration(); try { return Reflections.createInstance(conf.getLoadBalancerLoadSheddingStrategy(), NamespaceUnloadStrategy.class, Thread.currentThread().getContextClassLoader()); @@ -183,7 +219,7 @@ private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(ServiceConf conf.getLoadBalancerLoadPlacementStrategy(), e); } log.error("create namespace unload strategy failed. using TransferShedder instead."); - return new TransferShedder(); + return new TransferShedder(pulsar, counter); } private boolean isLoadBalancerSheddingEnabled() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index aa8583c6b57b6..062df13408a95 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -561,20 +561,20 @@ public void testGetMetrics() throws Exception { FieldUtils.writeDeclaredField(unloadCounter, "loadStd", 0.3, true); FieldUtils.writeDeclaredField(unloadCounter, "breakdownCounters", Map.of( Success, new LinkedHashMap<>() {{ - put(Overloaded, new MutableLong(1)); - put(Underloaded, new MutableLong(2)); + put(Overloaded, new AtomicLong(1)); + put(Underloaded, new AtomicLong(2)); }}, Skip, new LinkedHashMap<>() {{ - put(Balanced, new MutableLong(3)); - put(NoBundles, new MutableLong(4)); - put(CoolDown, new MutableLong(5)); - put(OutDatedData, new MutableLong(6)); - put(NoLoadData, new MutableLong(7)); - put(NoBrokers, new MutableLong(8)); - put(Unknown, new MutableLong(9)); + put(Balanced, new AtomicLong(3)); + put(NoBundles, new AtomicLong(4)); + put(CoolDown, new AtomicLong(5)); + put(OutDatedData, new AtomicLong(6)); + put(NoLoadData, new AtomicLong(7)); + put(NoBrokers, new AtomicLong(8)); + put(Unknown, new AtomicLong(9)); }}, Failure, Map.of( - Unknown, new MutableLong(10)) + Unknown, new AtomicLong(10)) ), true); unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress())); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java index 75ef913b8a851..6a2ae1cc562cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java @@ -19,6 +19,10 @@ package org.apache.pulsar.broker.loadbalance.extensions.manager; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Admin; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -32,6 +36,9 @@ import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; +import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.common.util.FutureUtil; import org.testng.annotations.Test; @@ -41,10 +48,13 @@ public class UnloadManagerTest { @Test public void testEventPubFutureHasException() { - UnloadManager manager = new UnloadManager(); + UnloadCounter counter = new UnloadCounter(); + UnloadManager manager = new UnloadManager(counter); + var unloadDecision = + new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin); CompletableFuture future = manager.waitAsync(FutureUtil.failedFuture(new Exception("test")), - "bundle-1", 10, TimeUnit.SECONDS); + "bundle-1", unloadDecision, 10, TimeUnit.SECONDS); assertTrue(future.isCompletedExceptionally()); try { @@ -53,14 +63,18 @@ public void testEventPubFutureHasException() { } catch (Exception ex) { assertEquals(ex.getCause().getMessage(), "test"); } + assertEquals(counter.getBreakdownCounters().get(Failure).get(Unknown).get(), 1); } @Test public void testTimeout() throws IllegalAccessException { - UnloadManager manager = new UnloadManager(); + UnloadCounter counter = new UnloadCounter(); + UnloadManager manager = new UnloadManager(counter); + var unloadDecision = + new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin); CompletableFuture future = manager.waitAsync(CompletableFuture.completedFuture(null), - "bundle-1", 3, TimeUnit.SECONDS); + "bundle-1", unloadDecision, 3, TimeUnit.SECONDS); Map> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); assertEquals(inFlightUnloadRequestMap.size(), 1); @@ -73,14 +87,18 @@ public void testTimeout() throws IllegalAccessException { } assertEquals(inFlightUnloadRequestMap.size(), 0); + assertEquals(counter.getBreakdownCounters().get(Failure).get(Unknown).get(), 1); } @Test public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException { - UnloadManager manager = new UnloadManager(); + UnloadCounter counter = new UnloadCounter(); + UnloadManager manager = new UnloadManager(counter); + var unloadDecision = + new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin); CompletableFuture future = manager.waitAsync(CompletableFuture.completedFuture(null), - "bundle-1", 5, TimeUnit.SECONDS); + "bundle-1", unloadDecision, 5, TimeUnit.SECONDS); Map> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); assertEquals(inFlightUnloadRequestMap.size(), 1); @@ -109,10 +127,11 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int new ServiceUnitStateData(ServiceUnitState.Free, "broker-1", VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); future.get(); + assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1); // Success with Owned state. future = manager.waitAsync(CompletableFuture.completedFuture(null), - "bundle-1", 5, TimeUnit.SECONDS); + "bundle-1", unloadDecision, 5, TimeUnit.SECONDS); inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); assertEquals(inFlightUnloadRequestMap.size(), 1); @@ -121,14 +140,19 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); future.get(); + + assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2); } @Test public void testFailedStage() throws IllegalAccessException { - UnloadManager manager = new UnloadManager(); + UnloadCounter counter = new UnloadCounter(); + UnloadManager manager = new UnloadManager(counter); + var unloadDecision = + new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin); CompletableFuture future = manager.waitAsync(CompletableFuture.completedFuture(null), - "bundle-1", 5, TimeUnit.SECONDS); + "bundle-1", unloadDecision, 5, TimeUnit.SECONDS); Map> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); assertEquals(inFlightUnloadRequestMap.size(), 1); @@ -146,14 +170,18 @@ public void testFailedStage() throws IllegalAccessException { } assertEquals(inFlightUnloadRequestMap.size(), 0); + assertEquals(counter.getBreakdownCounters().get(Failure).get(Unknown).get(), 1); } @Test public void testClose() throws IllegalAccessException { - UnloadManager manager = new UnloadManager(); + UnloadCounter counter = new UnloadCounter(); + UnloadManager manager = new UnloadManager(counter); + var unloadDecision = + new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin); CompletableFuture future = manager.waitAsync(CompletableFuture.completedFuture(null), - "bundle-1", 5, TimeUnit.SECONDS); + "bundle-1", unloadDecision,5, TimeUnit.SECONDS); Map> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); assertEquals(inFlightUnloadRequestMap.size(), 1); manager.close(); @@ -165,6 +193,7 @@ public void testClose() throws IllegalAccessException { } catch (Exception ex) { assertTrue(ex.getCause() instanceof IllegalStateException); } + assertEquals(counter.getBreakdownCounters().get(Failure).get(Unknown).get(), 1); } private Map> getInFlightUnloadRequestMap(UnloadManager manager) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index 3955f1ed9af2e..38588cdef9439 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -64,6 +65,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; +import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper; import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper; @@ -351,19 +353,19 @@ public void startTableView() throws LoadDataStoreException { @Test public void testEmptyBrokerLoadData() { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = getContext(); ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected = new UnloadDecision(); - expected.setLabel(Skip); - expected.setReason(NoBrokers); - assertEquals(res, expected); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBrokers).get(), 1); } @Test public void testEmptyTopBundlesLoadData() { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = getContext(); ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true); var brokerLoadDataStore = ctx.brokerLoadDataStore(); @@ -373,21 +375,20 @@ public void testEmptyTopBundlesLoadData() { brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx, 20)); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected = new UnloadDecision(); - expected.setLabel(Skip); - expected.setReason(NoLoadData); - expected.setLoadAvg(0.39999999999999997); - expected.setLoadStd(0.35590260840104376); - assertEquals(res, expected); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(NoLoadData).get(), 1); + assertEquals(counter.getLoadAvg(), 0.39999999999999997); + assertEquals(counter.getLoadStd(), 0.35590260840104376); } @Test public void testOutDatedLoadData() throws IllegalAccessException { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = setupContext(); var brokerLoadDataStore = ctx.brokerLoadDataStore(); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - assertEquals(res.getUnloads().size(), 2); + assertEquals(res.size(), 2); FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker1").get(), "updatedAt", 0, true); @@ -398,16 +399,14 @@ public void testOutDatedLoadData() throws IllegalAccessException { res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - - var expected = new UnloadDecision(); - expected.setLabel(Skip); - expected.setReason(OutDatedData); - assertEquals(res, expected); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(OutDatedData).get(), 1); } @Test public void testRecentlyUnloadedBrokers() { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = setupContext(); Map recentlyUnloadedBrokers = new HashMap<>(); @@ -416,32 +415,27 @@ public void testRecentlyUnloadedBrokers() { recentlyUnloadedBrokers.put("broker1", oldTS); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), recentlyUnloadedBrokers); - var expected = new UnloadDecision(); - var unloads = expected.getUnloads(); - unloads.put("broker5", - new Unload("broker5", bundleE1, Optional.of("broker1"))); - unloads.put("broker4", - new Unload("broker4", bundleD1, Optional.of("broker2"))); - - expected.setLabel(Success); - expected.setReason(Overloaded); - expected.setLoadAvg(setupLoadAvg); - expected.setLoadStd(setupLoadStd); + var expected = new HashSet(); + expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")), + Success, Overloaded)); + expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")), + Success, Overloaded)); assertEquals(res, expected); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); var now = System.currentTimeMillis(); recentlyUnloadedBrokers.put("broker1", now); res = transferShedder.findBundlesForUnloading(ctx, Map.of(), recentlyUnloadedBrokers); - expected = new UnloadDecision(); - expected.setLabel(Skip); - expected.setReason(CoolDown); - assertEquals(res, expected); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(CoolDown).get(), 1); } @Test public void testRecentlyUnloadedBundles() { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = setupContext(); Map recentlyUnloadedBundles = new HashMap<>(); var now = System.currentTimeMillis(); @@ -451,35 +445,39 @@ public void testRecentlyUnloadedBundles() { recentlyUnloadedBundles.put(bundleD2, now); var res = transferShedder.findBundlesForUnloading(ctx, recentlyUnloadedBundles, Map.of()); - var expected = new UnloadDecision(); - expected.setLabel(Skip); - expected.skip(NoBundles); - expected.setLoadAvg(setupLoadAvg); - expected.setLoadStd(setupLoadStd); - assertEquals(res, expected); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); } @Test public void testGetAvailableBrokersFailed() { - TransferShedder transferShedder = new TransferShedder(); + var pulsar = getMockPulsar(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(pulsar, counter); var ctx = setupContext(); BrokerRegistry registry = ctx.brokerRegistry(); doReturn(FutureUtil.failedFuture(new TimeoutException())).when(registry).getAvailableBrokerLookupDataAsync(); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected = new UnloadDecision(); - expected.setLabel(Skip); - expected.skip(Unknown); - expected.setLoadAvg(setupLoadAvg); - expected.setLoadStd(setupLoadStd); - assertEquals(res, expected); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(Unknown).get(), 1); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); } @Test(timeOut = 30 * 1000) - public void testBundlesWithIsolationPolicies() throws IllegalAccessException { - + public void testBundlesWithIsolationPolicies() throws IllegalAccessException, MetadataStoreException { + var pulsar = getMockPulsar(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(pulsar, counter); - TransferShedder transferShedder = new TransferShedder(pulsar, antiAffinityGroupPolicyHelper); + var pulsarResourcesMock = mock(PulsarResources.class); + var localPoliciesResourcesMock = mock(LocalPoliciesResources.class); + doReturn(pulsarResourcesMock).when(pulsar).getPulsarResources(); + doReturn(localPoliciesResourcesMock).when(pulsarResourcesMock).getLocalPolicies(); + doReturn(Optional.empty()).when(localPoliciesResourcesMock).getLocalPolicies(any()); var allocationPoliciesSpy = (SimpleResourceAllocationPolicies) spy(FieldUtils.readDeclaredField(transferShedder, "allocationPolicies", true)); @@ -505,6 +503,11 @@ public void testBundlesWithIsolationPolicies() throws IllegalAccessException { // Test unload a has isolation policies broker. ctx.brokerConfiguration().setLoadBalancerTransferEnabled(false); res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); + + expected = new HashSet(); + expected.add(new UnloadDecision(new Unload("broker4", + "my-tenant/my-namespaceD/0x7FFFFFF_0xFFFFFFF", Optional.empty()), + Success, Overloaded)); expected = new UnloadDecision(); unloads = expected.getUnloads(); unloads.put("broker4", @@ -514,6 +517,9 @@ public void testBundlesWithIsolationPolicies() throws IllegalAccessException { expected.setLoadAvg(setupLoadAvg); expected.setLoadStd(setupLoadStd); assertEquals(res, expected); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); + } public BrokerLookupData getLookupData() { @@ -625,7 +631,8 @@ public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, Me @Test public void testTargetStd() { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = getContext(); ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true); var brokerLoadDataStore = ctx.brokerLoadDataStore(); @@ -641,17 +648,16 @@ public void testTargetStd() { var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected = new UnloadDecision(); - expected.setLabel(Skip); - expected.skip(Balanced); - expected.setLoadAvg(0.2000000063578288); - expected.setLoadStd(0.08164966587949089); - assertEquals(res, expected); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(Balanced).get(), 1); + assertEquals(counter.getLoadAvg(), 0.2000000063578288); + assertEquals(counter.getLoadStd(), 0.08164966587949089); } @Test public void testSingleTopBundlesLoadData() { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = setupContext(); var topBundlesLoadDataStore = ctx.topBundleLoadDataStore(); topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 1)); @@ -661,38 +667,35 @@ public void testSingleTopBundlesLoadData() { topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE", 70)); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected = new UnloadDecision(); - expected.setLabel(Skip); - expected.skip(NoBundles); - expected.setLoadAvg(setupLoadAvg); - expected.setLoadStd(setupLoadStd); - assertEquals(res, expected); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); } @Test public void testTargetStdAfterTransfer() { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = setupContext(); var brokerLoadDataStore = ctx.brokerLoadDataStore(); brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 55)); brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 65)); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected = new UnloadDecision(); - var unloads = expected.getUnloads(); - unloads.put("broker5", - new Unload("broker5", bundleE1, Optional.of("broker1"))); - expected.setLabel(Success); - expected.setReason(Overloaded); - expected.setLoadAvg(0.26400000000000007); - expected.setLoadStd(0.27644891028904417); + var expected = new HashSet(); + expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")), + Success, Overloaded)); assertEquals(res, expected); + assertEquals(counter.getLoadAvg(), 0.26400000000000007); + assertEquals(counter.getLoadStd(), 0.27644891028904417); } @Test public void testMinBrokerWithZeroTraffic() throws IllegalAccessException { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = setupContext(); var brokerLoadDataStore = ctx.brokerLoadDataStore(); brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 55)); @@ -705,64 +708,57 @@ public void testMinBrokerWithZeroTraffic() throws IllegalAccessException { var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected = new UnloadDecision(); - var unloads = expected.getUnloads(); - unloads.put("broker5", - new Unload("broker5", bundleE1, Optional.of("broker1"))); - unloads.put("broker4", - new Unload("broker4", bundleD1, Optional.of("broker2"))); - expected.setLabel(Success); - expected.setReason(Underloaded); - expected.setLoadAvg(0.26400000000000007); - expected.setLoadStd(0.27644891028904417); + var expected = new HashSet(); + expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")), + Success, Overloaded)); + expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")), + Success, Underloaded)); assertEquals(res, expected); + assertEquals(counter.getLoadAvg(), 0.26400000000000007); + assertEquals(counter.getLoadStd(), 0.27644891028904417); } @Test public void testMaxNumberOfTransfersPerShedderCycle() { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = setupContext(); ctx.brokerConfiguration() .setLoadBalancerMaxNumberOfBrokerTransfersPerCycle(10); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - - var expected = new UnloadDecision(); - var unloads = expected.getUnloads(); - unloads.put("broker5", - new Unload("broker5", bundleE1, Optional.of("broker1"))); - unloads.put("broker4", - new Unload("broker4", bundleD1, Optional.of("broker2"))); - expected.setLabel(Success); - expected.setReason(Overloaded); - expected.setLoadAvg(setupLoadAvg); - expected.setLoadStd(setupLoadStd); + var expected = new HashSet(); + expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")), + Success, Overloaded)); + expected.add(new UnloadDecision(new Unload("broker4", bundleE1, Optional.of("broker2")), + Success, Overloaded)); assertEquals(res, expected); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); } @Test public void testRemainingTopBundles() { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); var ctx = setupContext(); var topBundlesLoadDataStore = ctx.topBundleLoadDataStore(); topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE", 3000000, 2000000)); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected = new UnloadDecision(); - var unloads = expected.getUnloads(); - unloads.put("broker5", - new Unload("broker5", bundleE1, Optional.of("broker1"))); - unloads.put("broker4", - new Unload("broker4", bundleD1, Optional.of("broker2"))); - expected.setLabel(Success); - expected.setReason(Overloaded); - expected.setLoadAvg(setupLoadAvg); - expected.setLoadStd(setupLoadStd); + var expected = new HashSet(); + expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")), + Success, Overloaded)); + expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")), + Success, Overloaded)); assertEquals(res, expected); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); } @Test public void testRandomLoad() throws IllegalAccessException { - TransferShedder transferShedder = new TransferShedder(); + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); for (int i = 0; i < 5; i++) { var ctx = setupContext(10); var conf = ctx.brokerConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java index 73d4eb1f18bfb..58e5024b9517b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java @@ -28,23 +28,29 @@ import static org.mockito.Mockito.verify; import com.google.common.collect.Lists; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; +import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.stats.Metrics; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; @Test(groups = "broker") public class UnloadSchedulerTest { @@ -70,23 +76,28 @@ public void tearDown() { @Test(timeOut = 30 * 1000) public void testExecuteSuccess() { + AtomicReference> reference = new AtomicReference(); + UnloadCounter counter = new UnloadCounter(); LoadManagerContext context = setupContext(); BrokerRegistry registry = context.brokerRegistry(); ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class); UnloadManager unloadManager = mock(UnloadManager.class); + PulsarService pulsar = mock(PulsarService.class); NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class); doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync(); doReturn(CompletableFuture.completedFuture(Lists.newArrayList("broker-1", "broker-2"))) .when(registry).getAvailableBrokersAsync(); doReturn(CompletableFuture.completedFuture(null)).when(channel).publishUnloadEventAsync(any()); doReturn(CompletableFuture.completedFuture(null)).when(unloadManager) - .waitAsync(any(), any(), anyLong(), any()); + .waitAsync(any(), any(), any(), anyLong(), any()); UnloadDecision decision = new UnloadDecision(); Unload unload = new Unload("broker-1", "bundle-1"); - decision.getUnloads().put("broker-1", unload); - doReturn(decision).when(unloadStrategy).findBundlesForUnloading(any(), any(), any()); + decision.setUnload(unload); + decision.setLabel(UnloadDecision.Label.Success); + doReturn(Set.of(decision)).when(unloadStrategy).findBundlesForUnloading(any(), any(), any()); - UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, unloadManager, context, channel, unloadStrategy); + UnloadScheduler scheduler = new UnloadScheduler(pulsar, loadManagerExecutor, unloadManager, context, + channel, unloadStrategy, counter, reference); scheduler.execute(); @@ -94,7 +105,7 @@ public void testExecuteSuccess() { // Test empty unload. UnloadDecision emptyUnload = new UnloadDecision(); - doReturn(emptyUnload).when(unloadStrategy).findBundlesForUnloading(any(), any(), any()); + doReturn(Set.of(emptyUnload)).when(unloadStrategy).findBundlesForUnloading(any(), any(), any()); scheduler.execute(); @@ -103,26 +114,29 @@ public void testExecuteSuccess() { @Test(timeOut = 30 * 1000) public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedException { + AtomicReference> reference = new AtomicReference(); + UnloadCounter counter = new UnloadCounter(); LoadManagerContext context = setupContext(); BrokerRegistry registry = context.brokerRegistry(); ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class); UnloadManager unloadManager = mock(UnloadManager.class); + PulsarService pulsar = mock(PulsarService.class); NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class); doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync(); doAnswer(__ -> CompletableFuture.supplyAsync(() -> { try { // Delay 5 seconds to finish. - TimeUnit.SECONDS.sleep(5); + TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } return Lists.newArrayList("broker-1", "broker-2"); }, Executors.newFixedThreadPool(1))).when(registry).getAvailableBrokersAsync(); - UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, unloadManager, context, channel, unloadStrategy); - - ExecutorService executorService = Executors.newFixedThreadPool(10); - CountDownLatch latch = new CountDownLatch(10); - for (int i = 0; i < 10; i++) { + UnloadScheduler scheduler = new UnloadScheduler(pulsar, loadManagerExecutor, unloadManager, context, + channel, unloadStrategy, counter, reference); + ExecutorService executorService = Executors.newFixedThreadPool(5); + CountDownLatch latch = new CountDownLatch(5); + for (int i = 0; i < 5; i++) { executorService.execute(() -> { scheduler.execute(); latch.countDown(); @@ -130,18 +144,21 @@ public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedExceptio } latch.await(); - verify(registry, times(1)).getAvailableBrokersAsync(); + verify(registry, times(5)).getAvailableBrokersAsync(); } @Test(timeOut = 30 * 1000) public void testDisableLoadBalancer() { + AtomicReference> reference = new AtomicReference(); + UnloadCounter counter = new UnloadCounter(); LoadManagerContext context = setupContext(); context.brokerConfiguration().setLoadBalancerEnabled(false); ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class); NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class); UnloadManager unloadManager = mock(UnloadManager.class); - UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, unloadManager, context, channel, unloadStrategy); - + PulsarService pulsar = mock(PulsarService.class); + UnloadScheduler scheduler = new UnloadScheduler(pulsar, loadManagerExecutor, unloadManager, context, + channel, unloadStrategy, counter, reference); scheduler.execute(); verify(channel, times(0)).isChannelOwnerAsync(); @@ -155,12 +172,16 @@ public void testDisableLoadBalancer() { @Test(timeOut = 30 * 1000) public void testNotChannelOwner() { + AtomicReference> reference = new AtomicReference(); + UnloadCounter counter = new UnloadCounter(); LoadManagerContext context = setupContext(); context.brokerConfiguration().setLoadBalancerEnabled(false); ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class); NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class); UnloadManager unloadManager = mock(UnloadManager.class); - UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, unloadManager, context, channel, unloadStrategy); + PulsarService pulsar = mock(PulsarService.class); + UnloadScheduler scheduler = new UnloadScheduler(pulsar, loadManagerExecutor, unloadManager, context, + channel, unloadStrategy, counter, reference); doReturn(CompletableFuture.completedFuture(false)).when(channel).isChannelOwnerAsync(); scheduler.execute(); From ad1e2c51d4aeed96767649985d23f218beb1577a Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Sun, 12 Mar 2023 16:16:48 +0800 Subject: [PATCH 2/9] Address comments --- .../extensions/scheduler/UnloadScheduler.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java index b1f1afa9da098..0e180f9c872e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java @@ -69,6 +69,8 @@ public class UnloadScheduler implements LoadManagerScheduler { private volatile ScheduledFuture task; + private final Set unloadBrokers; + private final Map recentlyUnloadedBundles; private final Map recentlyUnloadedBrokers; @@ -97,6 +99,7 @@ protected UnloadScheduler(PulsarService pulsar, this.namespaceUnloadStrategy = strategy; this.recentlyUnloadedBundles = new HashMap<>(); this.recentlyUnloadedBrokers = new HashMap<>(); + this.unloadBrokers = new HashSet<>(); this.loadManagerExecutor = loadManagerExecutor; this.counter = counter; this.unloadMetrics = unloadMetrics; @@ -157,8 +160,7 @@ public synchronized void execute() { return; } List> futures = new ArrayList<>(); - - Set brokers = new HashSet<>(); + unloadBrokers.clear(); decisions.forEach(decision -> { if (decision.getLabel() == Success) { Unload unload = decision.getUnload(); @@ -167,15 +169,15 @@ public synchronized void execute() { futures.add(unloadManager.waitAsync(channel.publishUnloadEventAsync(unload), unload.serviceUnit(), decision, asyncOpTimeoutMs, TimeUnit.MILLISECONDS) .thenAccept(__ -> { - brokers.add(unload.sourceBroker()); + unloadBrokers.add(unload.sourceBroker()); recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis()); recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis()); })); } }); FutureUtil.waitForAll(futures) - .whenComplete((__, ex) -> counter.updateUnloadBrokerCount(brokers.size())) - .get(asyncOpTimeoutMs, TimeUnit.MICROSECONDS); + .whenComplete((__, ex) -> counter.updateUnloadBrokerCount(unloadBrokers.size())) + .get(asyncOpTimeoutMs, TimeUnit.MILLISECONDS); } catch (Exception ex) { log.error("[{}] Namespace unload has exception.", namespaceUnloadStrategy.getClass().getSimpleName(), ex); From 7044678d62cff3adae4ebeedc0c9c67090fd344e Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 14 Mar 2023 09:38:20 +0800 Subject: [PATCH 3/9] Unload bundle instead of clean tableview --- .../extensions/ExtensibleLoadManagerImplTest.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 062df13408a95..5dfde7edf7deb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -722,13 +722,23 @@ public void initialize(PulsarService pulsar) { } - private static void cleanTableView(ServiceUnitStateChannel channel) + private void cleanTableView(ServiceUnitStateChannel channel) throws IllegalAccessException { var tv = (TableViewImpl) FieldUtils.readField(channel, "tableview", true); var cache = (ConcurrentMap) FieldUtils.readField(tv, "data", true); - cache.clear(); + cache.forEach((k, v) -> { + try { + int i = k.lastIndexOf("/"); + String namespace = k.substring(0, i); + String bundle = k.substring(i + 1); + admin.namespaces().unloadNamespaceBundle(namespace, bundle); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); +// cache.clear(); } private void setPrimaryLoadManager() throws IllegalAccessException { From cc8c56b111d32120833a05dbf3bdf2cbf1483862 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 14 Mar 2023 09:45:02 +0800 Subject: [PATCH 4/9] Rebase onto master --- .../loadbalance/extensions/ExtensibleLoadManagerImplTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 5dfde7edf7deb..6d542510f7f02 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -65,7 +65,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; -import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; From 7cae712ee8e52e082fce4aa82cdedc920637c664 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 14 Mar 2023 16:18:19 +0800 Subject: [PATCH 5/9] rebase --- .../extensions/ExtensibleLoadManagerImpl.java | 2 +- .../AntiAffinityGroupPolicyHelper.java | 3 - .../extensions/scheduler/TransferShedder.java | 4 +- .../extensions/scheduler/UnloadScheduler.java | 7 +- .../scheduler/TransferShedderTest.java | 72 +++++++------------ 5 files changed, 33 insertions(+), 55 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index b9c8c1f632d91..f01171ab066b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -268,7 +268,7 @@ public void start() throws PulsarServerException { this.unloadScheduler = new UnloadScheduler( pulsar, pulsar.getLoadManagerExecutor(), unloadManager, - context, serviceUnitStateChannel, unloadCounter, unloadMetrics); + context, serviceUnitStateChannel, antiAffinityGroupPolicyHelper, unloadCounter, unloadMetrics); this.unloadScheduler.start(); this.splitScheduler = new SplitScheduler( pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java index 28acf5fba0ea1..c8332a1d7b5e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java @@ -54,9 +54,6 @@ public boolean canUnload( String bundle, String srcBroker, Optional dstBroker) { - - - try { var antiAffinityGroupOptional = LoadManagerShared.getNamespaceAntiAffinityGroup( pulsar, LoadManagerShared.getNamespaceNameFromBundleName(bundle)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java index f1a9bf56699b1..6885be9c09102 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java @@ -469,7 +469,6 @@ private boolean isTransferable(LoadManagerContext context, if (!antiAffinityGroupPolicyHelper.canUnload(availableBrokers, bundle, srcBroker, dstBroker)) { return false; } - return true; } @@ -483,7 +482,8 @@ private boolean isTransferable(LoadManagerContext context, * @param targetBroker The broker will be transfer to. * @return Can be transfer/unload or not. */ - private boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context, + @VisibleForTesting + protected boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context, Map availableBrokers, NamespaceBundle namespaceBundle, String currentBroker, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java index 0e180f9c872e9..31310c5c9cc80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java @@ -40,6 +40,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; +import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.Reflections; @@ -80,10 +81,11 @@ public UnloadScheduler(PulsarService pulsar, UnloadManager unloadManager, LoadManagerContext context, ServiceUnitStateChannel channel, + AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper, UnloadCounter counter, AtomicReference> unloadMetrics) { this(pulsar, loadManagerExecutor, unloadManager, context, channel, - createNamespaceUnloadStrategy(pulsar, counter), counter, unloadMetrics); + createNamespaceUnloadStrategy(pulsar, antiAffinityGroupPolicyHelper, counter), counter, unloadMetrics); } @VisibleForTesting @@ -211,6 +213,7 @@ public void close() { } private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(PulsarService pulsar, + AntiAffinityGroupPolicyHelper helper, UnloadCounter counter) { ServiceConfiguration conf = pulsar.getConfiguration(); try { @@ -221,7 +224,7 @@ private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(PulsarServi conf.getLoadBalancerLoadPlacementStrategy(), e); } log.error("create namespace unload strategy failed. using TransferShedder instead."); - return new TransferShedder(pulsar, counter); + return new TransferShedder(pulsar, counter, helper); } private boolean isLoadBalancerSheddingEnabled() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index 38588cdef9439..a89f33bb98737 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -455,7 +455,8 @@ public void testRecentlyUnloadedBundles() { public void testGetAvailableBrokersFailed() { var pulsar = getMockPulsar(); UnloadCounter counter = new UnloadCounter(); - TransferShedder transferShedder = new TransferShedder(pulsar, counter); + AntiAffinityGroupPolicyHelper affinityGroupPolicyHelper = mock(AntiAffinityGroupPolicyHelper.class); + TransferShedder transferShedder = new TransferShedder(pulsar, counter, affinityGroupPolicyHelper); var ctx = setupContext(); BrokerRegistry registry = ctx.brokerRegistry(); doReturn(FutureUtil.failedFuture(new TimeoutException())).when(registry).getAvailableBrokerLookupDataAsync(); @@ -468,16 +469,10 @@ public void testGetAvailableBrokersFailed() { } @Test(timeOut = 30 * 1000) - public void testBundlesWithIsolationPolicies() throws IllegalAccessException, MetadataStoreException { + public void testBundlesWithIsolationPolicies() throws IllegalAccessException { var pulsar = getMockPulsar(); UnloadCounter counter = new UnloadCounter(); - TransferShedder transferShedder = new TransferShedder(pulsar, counter); - - var pulsarResourcesMock = mock(PulsarResources.class); - var localPoliciesResourcesMock = mock(LocalPoliciesResources.class); - doReturn(pulsarResourcesMock).when(pulsar).getPulsarResources(); - doReturn(localPoliciesResourcesMock).when(pulsarResourcesMock).getLocalPolicies(); - doReturn(Optional.empty()).when(localPoliciesResourcesMock).getLocalPolicies(any()); + TransferShedder transferShedder = spy(new TransferShedder(pulsar, counter, antiAffinityGroupPolicyHelper)); var allocationPoliciesSpy = (SimpleResourceAllocationPolicies) spy(FieldUtils.readDeclaredField(transferShedder, "allocationPolicies", true)); @@ -485,41 +480,27 @@ public void testBundlesWithIsolationPolicies() throws IllegalAccessException, Me IsolationPoliciesHelper isolationPoliciesHelper = new IsolationPoliciesHelper(allocationPoliciesSpy); FieldUtils.writeDeclaredField(transferShedder, "isolationPoliciesHelper", isolationPoliciesHelper, true); - // Test transfer to a has isolation policies broker. setIsolationPolicies(allocationPoliciesSpy, "my-tenant/my-namespaceE", Set.of("broker5"), Set.of(), Set.of(), 1); var ctx = setupContext(); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected = new UnloadDecision(); - var unloads = expected.getUnloads(); - unloads.put("broker4", - new Unload("broker4", bundleD1, Optional.of("broker2"))); - expected.setLabel(Success); - expected.setReason(Overloaded); - expected.setLoadAvg(setupLoadAvg); - expected.setLoadStd(setupLoadStd); + var expected = new HashSet(); + expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")), + Success, Overloaded)); assertEquals(res, expected); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); // Test unload a has isolation policies broker. ctx.brokerConfiguration().setLoadBalancerTransferEnabled(false); res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - expected = new HashSet(); - expected.add(new UnloadDecision(new Unload("broker4", - "my-tenant/my-namespaceD/0x7FFFFFF_0xFFFFFFF", Optional.empty()), + expected = new HashSet<>(); + expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.empty()), Success, Overloaded)); - expected = new UnloadDecision(); - unloads = expected.getUnloads(); - unloads.put("broker4", - new Unload("broker4", bundleD1, Optional.empty())); - expected.setLabel(Success); - expected.setReason(Overloaded); - expected.setLoadAvg(setupLoadAvg); - expected.setLoadStd(setupLoadStd); assertEquals(res, expected); assertEquals(counter.getLoadAvg(), setupLoadAvg); assertEquals(counter.getLoadStd(), setupLoadStd); - } public BrokerLookupData getLookupData() { @@ -596,7 +577,8 @@ private PulsarService getMockPulsar() { @Test public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, MetadataStoreException { var pulsar = getMockPulsar(); - TransferShedder transferShedder = new TransferShedder(pulsar, antiAffinityGroupPolicyHelper); + var counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(pulsar, counter, antiAffinityGroupPolicyHelper); var allocationPoliciesSpy = (SimpleResourceAllocationPolicies) spy(FieldUtils.readDeclaredField(transferShedder, "allocationPolicies", true)); doReturn(false).when(allocationPoliciesSpy).areIsolationPoliciesPresent(any()); @@ -609,24 +591,20 @@ public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, Me doReturn(false).when(antiAffinityGroupPolicyHelper).canUnload(any(), any(), any(), any()); var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected = new UnloadDecision(); - expected.setLabel(Skip); - expected.skip(NoBundles); - expected.setLoadAvg(setupLoadAvg); - expected.setLoadStd(setupLoadStd); - assertEquals(res, expected); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); + doReturn(true).when(antiAffinityGroupPolicyHelper).canUnload(any(), eq(bundleE1), any(), any()); var res2 = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); - var expected2 = new UnloadDecision(); - var unloads = expected2.getUnloads(); - unloads.put("broker5", - new Unload("broker5", bundleE1, Optional.of("broker1"))); - expected2.setLabel(Success); - expected2.setReason(Overloaded); - expected2.setLoadAvg(setupLoadAvg); - expected2.setLoadStd(setupLoadStd); - assertEquals(res2, expected2); + var expected2 = new HashSet<>(); + expected2.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")), + Success, Overloaded)); + assertEquals(res, expected2); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); } @Test @@ -729,7 +707,7 @@ public void testMaxNumberOfTransfersPerShedderCycle() { var expected = new HashSet(); expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")), Success, Overloaded)); - expected.add(new UnloadDecision(new Unload("broker4", bundleE1, Optional.of("broker2")), + expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")), Success, Overloaded)); assertEquals(res, expected); assertEquals(counter.getLoadAvg(), setupLoadAvg); From bb9e642ad57fa684b7dfb4ce511310c465518029 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 14 Mar 2023 16:21:16 +0800 Subject: [PATCH 6/9] revert useless change --- .../loadbalance/extensions/scheduler/TransferShedder.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java index 6885be9c09102..3c67479bcc7cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java @@ -482,8 +482,7 @@ private boolean isTransferable(LoadManagerContext context, * @param targetBroker The broker will be transfer to. * @return Can be transfer/unload or not. */ - @VisibleForTesting - protected boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context, + private boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context, Map availableBrokers, NamespaceBundle namespaceBundle, String currentBroker, From e4d2e6d3864a11d2088a810178c51b9e039650f5 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 14 Mar 2023 16:22:29 +0800 Subject: [PATCH 7/9] Fix code style --- .../extensions/scheduler/UnloadSchedulerTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java index 58e5024b9517b..7d9cf556360e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java @@ -76,7 +76,7 @@ public void tearDown() { @Test(timeOut = 30 * 1000) public void testExecuteSuccess() { - AtomicReference> reference = new AtomicReference(); + AtomicReference> reference = new AtomicReference<>(); UnloadCounter counter = new UnloadCounter(); LoadManagerContext context = setupContext(); BrokerRegistry registry = context.brokerRegistry(); @@ -114,7 +114,7 @@ public void testExecuteSuccess() { @Test(timeOut = 30 * 1000) public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedException { - AtomicReference> reference = new AtomicReference(); + AtomicReference> reference = new AtomicReference<>(); UnloadCounter counter = new UnloadCounter(); LoadManagerContext context = setupContext(); BrokerRegistry registry = context.brokerRegistry(); @@ -149,7 +149,7 @@ public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedExceptio @Test(timeOut = 30 * 1000) public void testDisableLoadBalancer() { - AtomicReference> reference = new AtomicReference(); + AtomicReference> reference = new AtomicReference<>(); UnloadCounter counter = new UnloadCounter(); LoadManagerContext context = setupContext(); context.brokerConfiguration().setLoadBalancerEnabled(false); @@ -172,7 +172,7 @@ public void testDisableLoadBalancer() { @Test(timeOut = 30 * 1000) public void testNotChannelOwner() { - AtomicReference> reference = new AtomicReference(); + AtomicReference> reference = new AtomicReference<>(); UnloadCounter counter = new UnloadCounter(); LoadManagerContext context = setupContext(); context.brokerConfiguration().setLoadBalancerEnabled(false); From 431d6336dec87e382de0687957eceff2ba8e8f86 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Wed, 15 Mar 2023 11:12:27 +0800 Subject: [PATCH 8/9] Fix test --- .../ExtensibleLoadManagerImplTest.java | 28 ++----------------- 1 file changed, 2 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 6d542510f7f02..e69833536e214 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -64,7 +64,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -73,9 +72,7 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; -import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; -import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; @@ -93,7 +90,6 @@ import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -205,10 +201,9 @@ protected void cleanup() throws Exception { } @BeforeMethod - protected void initializeState() throws IllegalAccessException { + protected void initializeState() throws PulsarAdminException { + admin.namespaces().unload("public/default"); reset(primaryLoadManager, secondaryLoadManager); - cleanTableView(channel1); - cleanTableView(channel2); } @Test @@ -721,25 +716,6 @@ public void initialize(PulsarService pulsar) { } - private void cleanTableView(ServiceUnitStateChannel channel) - throws IllegalAccessException { - var tv = (TableViewImpl) - FieldUtils.readField(channel, "tableview", true); - var cache = (ConcurrentMap) - FieldUtils.readField(tv, "data", true); - cache.forEach((k, v) -> { - try { - int i = k.lastIndexOf("/"); - String namespace = k.substring(0, i); - String bundle = k.substring(i + 1); - admin.namespaces().unloadNamespaceBundle(namespace, bundle); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); -// cache.clear(); - } - private void setPrimaryLoadManager() throws IllegalAccessException { ExtensibleLoadManagerWrapper wrapper = (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get(); From 3f89bc6427e654608d9f9921f64bbf6f66ba4864 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Thu, 16 Mar 2023 08:32:55 +0800 Subject: [PATCH 9/9] Rebase onto master --- .../loadbalance/extensions/ExtensibleLoadManagerImpl.java | 4 ++-- .../loadbalance/extensions/ExtensibleLoadManagerImplTest.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index f01171ab066b0..486c32153589f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -18,10 +18,10 @@ */ package org.apache.pulsar.broker.loadbalance.extensions; -import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success; -import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Admin; import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower; import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success; +import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Admin; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index e69833536e214..354a27a63c02a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -90,6 +90,7 @@ import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId;