From 42c23e6da5a366f66a8ea97838881a106ad4f51a Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Thu, 23 Feb 2023 18:18:44 -0800 Subject: [PATCH 1/6] [improve][broker] PIP-192 Added Split Scheduler --- .../pulsar/broker/ServiceConfiguration.java | 24 ++ .../extensions/ExtensibleLoadManagerImpl.java | 17 +- .../channel/ServiceUnitStateChannelImpl.java | 1 - .../extensions/models/SplitCounter.java | 46 +-- .../extensions/models/SplitDecision.java | 9 - .../extensions/scheduler/SplitScheduler.java | 172 +++++++++++ ...faultNamespaceBundleSplitStrategyImpl.java | 178 +++++++++++ .../NamespaceBundleSplitStrategy.java | 7 +- .../ExtensibleLoadManagerImplTest.java | 31 +- .../scheduler/SplitSchedulerTest.java | 148 +++++++++ ...faultNamespaceBundleSplitStrategyTest.java | 284 ++++++++++++++++++ 11 files changed, 854 insertions(+), 63 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java rename pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/{scheduler => strategy}/NamespaceBundleSplitStrategy.java (86%) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index ec5b0d4042bcd..3c00e905ac723 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2557,6 +2557,30 @@ The delayed 
message index bucket time step(in seconds) in per bucket snapshot se + "(only used in load balancer extension logics)" ) private double loadBalancerBundleLoadReportPercentage = 10; + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "Service units'(bundles) split interval. Broker periodically checks whether " + + "some service units(e.g. bundles) should split if they become hot-spots. " + + "(only used in load balancer extension logics)" + ) + private int loadBalancerSplitIntervalMinutes = 1; + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + dynamic = true, + doc = "Max number of bundles to split to per cycle. " + + "(only used in load balancer extension logics)" + ) + private int loadBalancerMaxNumberOfBundlesToSplitPerCycle = 10; + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + dynamic = true, + doc = "Threshold to the consecutive count of fulfilled split conditions. " + + "If the split scheduler consecutively finds bundles that meet split conditions " + + "many times bigger than this threshold, the scheduler will trigger splits on the bundles " + + "(if the number of bundles is less than loadBalancerNamespaceMaximumBundles). 
" + + "(only used in load balancer extension logics)" + ) + private int loadBalancerNamespaceBundleSplitConditionThreshold = 5; @FieldContext( category = CATEGORY_LOAD_BALANCER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 2bebe203d8750..44a49cb4cd416 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -47,13 +47,13 @@ import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager; import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; -import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler; +import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException; @@ -102,7 +102,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { @Getter private final List brokerFilterPipeline; - /** * The load data reporter. 
*/ @@ -112,6 +111,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private ScheduledFuture brokerLoadDataReportTask; private ScheduledFuture topBundlesLoadDataReportTask; + private SplitScheduler splitScheduler; private UnloadManager unloadManager; @@ -182,7 +182,6 @@ public void start() throws PulsarServerException { .brokerLoadDataStore(brokerLoadDataStore) .topBundleLoadDataStore(topBundlesLoadDataStore).build(); - this.brokerLoadDataReporter = new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore); @@ -214,10 +213,12 @@ public void start() throws PulsarServerException { interval, interval, TimeUnit.MILLISECONDS); - // TODO: Start bundle split scheduler. this.unloadScheduler = new UnloadScheduler( pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel); this.unloadScheduler.start(); + this.splitScheduler = new SplitScheduler( + pulsar, serviceUnitStateChannel, splitCounter, splitMetrics, context); + this.splitScheduler.start(); this.started = true; } @@ -376,6 +377,7 @@ public void close() throws PulsarServerException { this.brokerLoadDataStore.close(); this.topBundlesLoadDataStore.close(); this.unloadScheduler.close(); + this.splitScheduler.close(); } catch (IOException ex) { throw new PulsarServerException(ex); } finally { @@ -407,13 +409,6 @@ private void updateUnloadMetrics(UnloadDecision decision) { this.unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress())); } - private void updateSplitMetrics(List decisions) { - for (var decision : decisions) { - splitCounter.update(decision); - } - this.splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress())); - } - public List getMetrics() { List metricsCollection = new ArrayList<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 5f24e41dda931..50f821f9f2c27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -882,7 +882,6 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, return null; }); } - public void handleMetadataSessionEvent(SessionEvent e) { if (e == SessionReestablished || e == SessionLost) { lastMetadataSessionEvent = e; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java index 99406412cee2b..ed72b5f586331 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java @@ -19,10 +19,8 @@ package org.apache.pulsar.broker.loadbalance.extensions.models; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; -import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin; -import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate; import static 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions; @@ -32,7 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.mutable.MutableLong; +import java.util.concurrent.atomic.AtomicLong; import org.apache.pulsar.common.stats.Metrics; /** @@ -40,23 +38,20 @@ */ public class SplitCounter { - long splitCount = 0; - - final Map> breakdownCounters; + private long splitCount = 0; + private final Map> breakdownCounters; + private volatile long updatedAt = 0; public SplitCounter() { breakdownCounters = Map.of( Success, Map.of( - Topics, new MutableLong(), - Sessions, new MutableLong(), - MsgRate, new MutableLong(), - Bandwidth, new MutableLong(), - Admin, new MutableLong()), - Skip, Map.of( - Balanced, new MutableLong() - ), + Topics, new AtomicLong(), + Sessions, new AtomicLong(), + MsgRate, new AtomicLong(), + Bandwidth, new AtomicLong(), + Admin, new AtomicLong()), Failure, Map.of( - Unknown, new MutableLong()) + Unknown, new AtomicLong()) ); } @@ -64,7 +59,16 @@ public void update(SplitDecision decision) { if (decision.label == Success) { splitCount++; } - breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment(); + breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet(); + updatedAt = System.currentTimeMillis(); + } + + public void update(SplitDecision.Label label, SplitDecision.Reason reason) { + if (label == Success) { + splitCount++; + } + breakdownCounters.get(label).get(reason).incrementAndGet(); + updatedAt = System.currentTimeMillis(); } public List toMetrics(String advertisedBrokerAddress) { @@ -77,17 +81,18 @@ public List toMetrics(String advertisedBrokerAddress) { m.put("brk_lb_bundles_split_total", splitCount); metrics.add(m); - for (Map.Entry> etr + + for (Map.Entry> etr : breakdownCounters.entrySet()) { var result = etr.getKey(); - for (Map.Entry counter : etr.getValue().entrySet()) { + for (Map.Entry counter : 
etr.getValue().entrySet()) { var reason = counter.getKey(); var count = counter.getValue(); Map breakdownDims = new HashMap<>(dimensions); breakdownDims.put("result", result.toString()); breakdownDims.put("reason", reason.toString()); Metrics breakdownMetric = Metrics.create(breakdownDims); - breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count); + breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count.get()); metrics.add(breakdownMetric); } } @@ -95,4 +100,7 @@ public List toMetrics(String advertisedBrokerAddress) { return metrics; } + public long updatedAt() { + return updatedAt; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java index a3dede50c1cd8..433d21a5a613e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java @@ -19,9 +19,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.models; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; -import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success; -import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; import lombok.Data; @@ -36,7 +34,6 @@ public class SplitDecision { public enum Label { Success, - Skip, Failure } @@ -46,7 +43,6 @@ public enum Reason { MsgRate, Bandwidth, Admin, - Balanced, Unknown } @@ -62,11 +58,6 @@ public void clear() { reason = null; } - public void skip() { - label = Skip; - reason = Balanced; - } - 
public void succeed(Reason reason) { label = Success; this.reason = reason; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java new file mode 100644 index 0000000000000..d948c5d693f16 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.loadbalance.extensions.scheduler; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.DefaultNamespaceBundleSplitStrategyImpl; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.NamespaceBundleSplitStrategy; +import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.FutureUtil; + +/** + * Service Unit(e.g. bundles) Split scheduler. 
+ */
+@Slf4j
+public class SplitScheduler implements LoadManagerScheduler {
+
+    private final PulsarService pulsar;
+
+    private final ScheduledExecutorService loadManagerExecutor;
+
+    private final LoadManagerContext context;
+
+    private final ServiceConfiguration conf;
+
+    private final ServiceUnitStateChannel serviceUnitStateChannel;
+
+    private final NamespaceBundleSplitStrategy bundleSplitStrategy;
+
+    private final SplitCounter counter;
+
+    private final AtomicReference<List<Metrics>> splitMetrics;
+
+    private volatile ScheduledFuture<?> task;
+
+    private long counterLastUpdatedAt = 0;
+
+    public SplitScheduler(PulsarService pulsar,
+                          ServiceUnitStateChannel serviceUnitStateChannel,
+                          SplitCounter counter,
+                          AtomicReference<List<Metrics>> splitMetrics,
+                          LoadManagerContext context,
+                          NamespaceBundleSplitStrategy bundleSplitStrategy) {
+        this.pulsar = pulsar;
+        this.loadManagerExecutor = pulsar.getLoadManagerExecutor();
+        this.counter = counter;
+        this.splitMetrics = splitMetrics;
+        this.context = context;
+        this.conf = pulsar.getConfiguration();
+        this.bundleSplitStrategy = bundleSplitStrategy;
+        this.serviceUnitStateChannel = serviceUnitStateChannel;
+    }
+
+    public SplitScheduler(PulsarService pulsar,
+                          ServiceUnitStateChannel serviceUnitStateChannel,
+                          SplitCounter counter,
+                          AtomicReference<List<Metrics>> splitMetrics,
+                          LoadManagerContext context) {
+        this(pulsar, serviceUnitStateChannel, counter, splitMetrics, context,
+                new DefaultNamespaceBundleSplitStrategyImpl(counter));
+    }
+
+    @Override
+    public void execute() {
+        boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+        if (debugMode) {
+            log.info("Load balancer enabled: {}, Split enabled: {}.",
+                    conf.isLoadBalancerEnabled(), conf.isLoadBalancerAutoBundleSplitEnabled());
+        }
+
+        if (!isLoadBalancerAutoBundleSplitEnabled()) {
+            if (debugMode) {
+                log.info("The load balancer or load balancer split already disabled. Skipping.");
+            }
+            return;
+        }
+
+        synchronized (bundleSplitStrategy) {
+            final Set<SplitDecision> decisions = bundleSplitStrategy.findBundlesToSplit(context, pulsar);
+            if (!decisions.isEmpty()) {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                for (SplitDecision decision : decisions) {
+                    if (decision.getLabel() == Success) {
+                        var split = decision.getSplit();
+                        futures.add(serviceUnitStateChannel.publishSplitEventAsync(split)
+                                .whenComplete((__, e) -> {
+                                    if (e == null) {
+                                        counter.update(decision);
+                                        log.info("Published Split Event for {}", split);
+                                    } else {
+                                        counter.update(Failure, Unknown);
+                                        log.error("Failed to publish Split Event for {}", split, e);
+                                    }
+                                }));
+                    }
+                }
+                FutureUtil.waitForAll(futures).exceptionally(ex -> {
+                    log.error("Failed to wait for split events to persist.", ex);
+                    counter.update(Failure, Unknown);
+                    return null;
+                });
+            } else {
+                if (debugMode) {
+                    log.info("BundleSplitStrategy returned no bundles to split.");
+                }
+            }
+        }
+        final long counterUpdatedAt = counter.updatedAt(); // read once: async callbacks may bump it meanwhile
+        if (counterUpdatedAt > counterLastUpdatedAt) {
+            splitMetrics.set(counter.toMetrics(pulsar.getAdvertisedAddress()));
+            counterLastUpdatedAt = counterUpdatedAt;
+        }
+    }
+
+    @Override
+    public void start() {
+        long interval = TimeUnit.MINUTES
+                .toMillis(conf.getLoadBalancerSplitIntervalMinutes());
+        task = loadManagerExecutor.scheduleAtFixedRate(() -> {
+            try {
+                execute();
+            } catch (Throwable e) {
+                log.error("Failed to run the split job.", e);
+            }
+        }, interval, interval, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        if (task != null) {
+            task.cancel(false);
+            task = null;
+        }
+    }
+
+    private boolean isLoadBalancerAutoBundleSplitEnabled() {
+        return conf.isLoadBalancerEnabled() && conf.isLoadBalancerAutoBundleSplitEnabled();
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
new file mode 100644 index 0000000000000..3de585d4b194c --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.loadbalance.extensions.strategy; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Topics; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.models.Split; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; +import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; + +/** + * Determines which bundles should be split based on various thresholds. 
+ * + * Migrate from {@link org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTask} + */ +@Slf4j +public class DefaultNamespaceBundleSplitStrategyImpl implements NamespaceBundleSplitStrategy { + private final Set decisionCache; + private final Map namespaceBundleCount; + private final Map bundleHighTrafficFrequency; + private final SplitCounter counter; + + public DefaultNamespaceBundleSplitStrategyImpl(SplitCounter counter) { + decisionCache = new HashSet<>(); + namespaceBundleCount = new HashMap<>(); + bundleHighTrafficFrequency = new HashMap<>(); + this.counter = counter; + + } + + @Override + public Set findBundlesToSplit(LoadManagerContext context, PulsarService pulsar) { + decisionCache.clear(); + namespaceBundleCount.clear(); + final ServiceConfiguration conf = pulsar.getConfiguration(); + int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles(); + long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics(); + long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions(); + long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate(); + long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI; + long maxSplitCount = conf.getLoadBalancerMaxNumberOfBundlesToSplitPerCycle(); + long splitConditionThreshold = conf.getLoadBalancerNamespaceBundleSplitConditionThreshold(); + boolean debug = log.isDebugEnabled() || conf.isLoadBalancerDebugModeEnabled(); + + Map bundleStatsMap = pulsar.getBrokerService().getBundleStats(); + NamespaceBundleFactory namespaceBundleFactory = + pulsar.getNamespaceService().getNamespaceBundleFactory(); + + // clean bundleHighTrafficFrequency + var bundleHighTrafficIterator = + bundleHighTrafficFrequency.entrySet().iterator(); + while (bundleHighTrafficIterator.hasNext()) { + String bundle = bundleHighTrafficIterator.next().getKey(); + if (!bundleStatsMap.containsKey(bundle)) { + bundleHighTrafficIterator.remove(); + } + } + + for (var entry : 
bundleStatsMap.entrySet()) { + final String bundle = entry.getKey(); + final NamespaceBundleStats stats = entry.getValue(); + if (stats.topics < 2) { + if (debug) { + log.info("The count of topics on the bundle {} is less than 2, skip split!", bundle); + } + continue; + } + + final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); + if (!namespaceBundleFactory + .canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) { + if (debug) { + log.info("Can't split the bundle:{}. invalid bundle range:{}. ", bundle, bundleRange); + } + counter.update(Failure, Unknown); + continue; + } + + double totalMessageRate = stats.msgRateIn + stats.msgRateOut; + double totalMessageThroughput = stats.msgThroughputIn + stats.msgThroughputOut; + int totalSessionCount = stats.consumerCount + stats.producerCount; + SplitDecision.Reason reason = Unknown; + if (stats.topics > maxBundleTopics) { + reason = Topics; + } else if (maxBundleSessions > 0 && (totalSessionCount > maxBundleSessions)) { + reason = Sessions; + } else if (totalMessageRate > maxBundleMsgRate) { + reason = MsgRate; + } else if (totalMessageThroughput > maxBundleBandwidth) { + reason = Bandwidth; + } + + if (reason != Unknown) { + bundleHighTrafficFrequency.put(bundle, bundleHighTrafficFrequency.getOrDefault(bundle, 0) + 1); + } else { + bundleHighTrafficFrequency.remove(bundle); + } + + if (bundleHighTrafficFrequency.getOrDefault(bundle, 0) > splitConditionThreshold) { + final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + try { + final int bundleCount = pulsar.getNamespaceService() + .getBundleCount(NamespaceName.get(namespace)); + if ((bundleCount + namespaceBundleCount.getOrDefault(namespace, 0)) + < maxBundleCount) { + if (debug) { + log.info("The bundle {} is considered to split. 
Topics: {}/{}, Sessions: ({}+{})/{}, " + + "Message Rate: {}/{} (msgs/s), Message Throughput: {}/{} (MB/s)", + bundle, stats.topics, maxBundleTopics, stats.producerCount, stats.consumerCount, + maxBundleSessions, totalMessageRate, maxBundleMsgRate, + totalMessageThroughput / LoadManagerShared.MIBI, + maxBundleBandwidth / LoadManagerShared.MIBI); + } + var decision = new SplitDecision(); + decision.setSplit(new Split(bundle, context.brokerRegistry().getBrokerId(), new HashMap<>())); + decision.succeed(reason); + decisionCache.add(decision); + int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0); + namespaceBundleCount.put(namespace, bundleNum + 1); + bundleHighTrafficFrequency.remove(bundle); + // Clear namespace bundle-cache + namespaceBundleFactory.invalidateBundleCache(NamespaceName.get(namespaceName)); + if (decisionCache.size() == maxSplitCount) { + if (debug) { + log.info("Too many bundles to split in this split cycle {} / {}. Stop.", + decisionCache.size(), maxSplitCount); + } + break; + } + } else { + if (debug) { + log.info( + "Could not split namespace bundle {} because namespace {} has too many bundles:" + + "{}", bundle, namespace, bundleCount); + } + } + } catch (Exception e) { + counter.update(Failure, Unknown); + log.warn("Error while computing bundle splits for namespace {}", namespace, e); + } + } + } + return decisionCache; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java similarity index 86% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java index 88bd7f0b08780..14023f1b5b01d 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java @@ -16,11 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.extensions.scheduler; +package org.apache.pulsar.broker.loadbalance.extensions.strategy; import java.util.Set; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; -import org.apache.pulsar.broker.loadbalance.extensions.models.Split; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; /** * Determines which bundles should be split based on various thresholds. @@ -35,5 +36,5 @@ public interface NamespaceBundleSplitStrategy { * @param context The context used for decisions. * @return A set of the bundles that should be split. 
*/ - Set findBundlesToSplit(LoadManagerContext context); + Set findBundlesToSplit(LoadManagerContext context, PulsarService pulsar); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index ec82f5c383e2e..5273e28208df8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -431,24 +431,16 @@ Unknown, new MutableLong(10)) FieldUtils.readDeclaredField(primaryLoadManager, "splitMetrics", true); SplitCounter splitCounter = new SplitCounter(); FieldUtils.writeDeclaredField(splitCounter, "splitCount", 35l, true); - FieldUtils.writeDeclaredField(splitCounter, "breakdownCounters", new LinkedHashMap<>() { - { - put(SplitDecision.Label.Success, new LinkedHashMap<>() { - { - put(Topics, new MutableLong(1)); - put(Sessions, new MutableLong(2)); - put(MsgRate, new MutableLong(3)); - put(Bandwidth, new MutableLong(4)); - put(Admin, new MutableLong(5)); - } - }); - put(SplitDecision.Label.Skip, Map.of( - SplitDecision.Reason.Balanced, new MutableLong(6) - )); - put(SplitDecision.Label.Failure, Map.of( - SplitDecision.Reason.Unknown, new MutableLong(7))); - } - }, true); + FieldUtils.writeDeclaredField(splitCounter, "breakdownCounters", Map.of( + SplitDecision.Label.Success, Map.of( + Topics, new AtomicLong(1), + Sessions, new AtomicLong(2), + MsgRate, new AtomicLong(3), + Bandwidth, new AtomicLong(4), + Admin, new AtomicLong(5)), + SplitDecision.Label.Failure, Map.of( + SplitDecision.Reason.Unknown, new AtomicLong(6)) + ), true); splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress())); } @@ -523,8 +515,7 @@ SplitDecision.Reason.Balanced, new MutableLong(6) dimensions=[{broker=localhost, 
metric=bundlesSplit, reason=MsgRate, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=3}] dimensions=[{broker=localhost, metric=bundlesSplit, reason=Bandwidth, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=4}] dimensions=[{broker=localhost, metric=bundlesSplit, reason=Admin, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=5}] - dimensions=[{broker=localhost, metric=bundlesSplit, reason=Balanced, result=Skip}], metrics=[{brk_lb_bundles_split_breakdown_total=6}] - dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=7}] + dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=6}] dimensions=[{broker=localhost, metric=assign, result=Empty}], metrics=[{brk_lb_assign_broker_breakdown_total=2}] dimensions=[{broker=localhost, metric=assign, result=Skip}], metrics=[{brk_lb_assign_broker_breakdown_total=3}] dimensions=[{broker=localhost, metric=assign, result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}] diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java new file mode 100644 index 0000000000000..4f245fe3626ba --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.scheduler; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.models.Split; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.NamespaceBundleSplitStrategy; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.stats.Metrics; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class SplitSchedulerTest { + + PulsarService pulsar; + ServiceConfiguration config; + NamespaceBundleFactory namespaceBundleFactory; + LoadManagerContext context; + ServiceUnitStateChannel 
channel; + NamespaceBundleSplitStrategy strategy; + String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF"; + String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF"; + String broker = "broker-1"; + SplitDecision decision1; + SplitDecision decision2; + + @BeforeMethod + public void setUp() { + + config = new ServiceConfiguration(); + config.setLoadBalancerDebugModeEnabled(true); + + pulsar = mock(PulsarService.class); + namespaceBundleFactory = mock(NamespaceBundleFactory.class); + context = mock(LoadManagerContext.class); + channel = mock(ServiceUnitStateChannel.class); + strategy = mock(NamespaceBundleSplitStrategy.class); + + doReturn(config).when(pulsar).getConfiguration(); + doReturn(true).when(namespaceBundleFactory).canSplitBundle(any()); + doReturn(CompletableFuture.completedFuture(null)).when(channel).publishSplitEventAsync(any()); + + decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle1, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.MsgRate); + + decision2 = new SplitDecision(); + decision2.setSplit(new Split(bundle2, broker, new HashMap<>())); + decision2.succeed(SplitDecision.Reason.Sessions); + Set decisions = Set.of(decision1, decision2); + doReturn(decisions).when(strategy).findBundlesToSplit(any(), any()); + } + + @Test(timeOut = 30 * 1000) + public void testExecuteSuccess() { + AtomicReference> reference = new AtomicReference(); + SplitCounter counter = new SplitCounter(); + SplitScheduler scheduler = new SplitScheduler(pulsar, channel, counter, reference, context, strategy); + + scheduler.execute(); + + var counterExpected = new SplitCounter(); + counterExpected.update(decision1); + counterExpected.update(decision2); + verify(channel, times(1)).publishSplitEventAsync(eq(decision1.getSplit())); + verify(channel, times(1)).publishSplitEventAsync(eq(decision2.getSplit())); + + assertEquals(reference.get(), counterExpected.toMetrics(pulsar.getAdvertisedAddress())); + + // Test empty splits. 
+ Set emptyUnload = Set.of(); + doReturn(emptyUnload).when(strategy).findBundlesToSplit(any(), any()); + + scheduler.execute(); + verify(channel, times(2)).publishSplitEventAsync(any()); + assertEquals(reference.get(), counterExpected.toMetrics(pulsar.getAdvertisedAddress())); + } + + @Test(timeOut = 30 * 1000) + public void testExecuteFailure() { + AtomicReference> reference = new AtomicReference(); + SplitCounter counter = new SplitCounter(); + SplitScheduler scheduler = new SplitScheduler(pulsar, channel, counter, reference, context, strategy); + + scheduler.execute(); + + doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(channel).publishSplitEventAsync(any()); + var counterExpected = new SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, SplitDecision.Reason.Unknown); + counterExpected.update(SplitDecision.Label.Failure, SplitDecision.Reason.Unknown); + verify(channel, times(1)).publishSplitEventAsync(eq(decision1.getSplit())); + verify(channel, times(1)).publishSplitEventAsync(eq(decision2.getSplit())); + + assertEquals(reference.get(), counterExpected.toMetrics(pulsar.getAdvertisedAddress())); + } + + + @Test(timeOut = 30 * 1000) + public void testDisableLoadBalancer() { + + config.setLoadBalancerEnabled(false); + SplitScheduler scheduler = new SplitScheduler(pulsar, channel, null, null, context, strategy); + + scheduler.execute(); + + verify(strategy, times(0)).findBundlesToSplit(any(), any()); + + config.setLoadBalancerEnabled(true); + config.setLoadBalancerAutoBundleSplitEnabled(false); + scheduler.execute(); + + verify(strategy, times(0)).findBundlesToSplit(any(), any()); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java new file mode 100644 index 0000000000000..71606bb85a3fe --- /dev/null 
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.strategy; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.models.Split; +import 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.PulsarStats; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class DefaultNamespaceBundleSplitStrategyTest { + + PulsarService pulsar; + BrokerService brokerService; + PulsarStats pulsarStats; + Map bundleStats; + ServiceConfiguration config; + NamespaceBundleFactory namespaceBundleFactory; + NamespaceService namespaceService; + + LoadManagerContext loadManagerContext; + + BrokerRegistry brokerRegistry; + + String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF"; + String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF"; + + String broker = "broker-1"; + + @BeforeMethod + void setup() { + config = new ServiceConfiguration(); + config.setLoadBalancerDebugModeEnabled(true); + config.setLoadBalancerNamespaceMaximumBundles(100); + config.setLoadBalancerNamespaceBundleMaxTopics(100); + config.setLoadBalancerNamespaceBundleMaxSessions(100); + config.setLoadBalancerNamespaceBundleMaxMsgRate(100); + config.setLoadBalancerNamespaceBundleMaxBandwidthMbytes(100); + config.setLoadBalancerMaxNumberOfBundlesToSplitPerCycle(1); + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(3); + + pulsar = mock(PulsarService.class); + brokerService = mock(BrokerService.class); + pulsarStats = mock(PulsarStats.class); + namespaceService = mock(NamespaceService.class); + namespaceBundleFactory = mock(NamespaceBundleFactory.class); + loadManagerContext = mock(LoadManagerContext.class); + brokerRegistry = mock(BrokerRegistry.class); + + + + 
doReturn(brokerService).when(pulsar).getBrokerService(); + doReturn(config).when(pulsar).getConfiguration(); + doReturn(pulsarStats).when(brokerService).getPulsarStats(); + doReturn(namespaceService).when(pulsar).getNamespaceService(); + doReturn(namespaceBundleFactory).when(namespaceService).getNamespaceBundleFactory(); + doReturn(true).when(namespaceBundleFactory).canSplitBundle(any()); + doReturn(brokerRegistry).when(loadManagerContext).brokerRegistry(); + doReturn(broker).when(brokerRegistry).getBrokerId(); + + + bundleStats = new LinkedHashMap<>(); + NamespaceBundleStats stats1 = new NamespaceBundleStats(); + stats1.topics = 5; + bundleStats.put(bundle1, stats1); + NamespaceBundleStats stats2 = new NamespaceBundleStats(); + stats2.topics = 5; + bundleStats.put(bundle2, stats2); + doReturn(bundleStats).when(brokerService).getBundleStats(); + } + + public void testNamespaceBundleSplitConditionThreshold() { + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0); + bundleStats.values().forEach(v -> v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter()); + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + assertEquals(actual.size(), 1); + } + + + public void testNotEnoughTopics() { + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0); + bundleStats.values().forEach(v -> v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter()); + bundleStats.values().forEach(v -> v.topics = 1); + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + var expected = Set.of(); + assertEquals(actual, expected); + } + + public void testNamespaceMaximumBundles() throws Exception { + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0); + bundleStats.values().forEach(v -> v.msgRateIn = 
config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter()); + doReturn(config.getLoadBalancerNamespaceMaximumBundles()).when(namespaceService).getBundleCount(any()); + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + var expected = Set.of(); + assertEquals(actual, expected); + } + + public void testEmptyBundleStats() { + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0); + bundleStats.values().forEach(v -> v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter()); + bundleStats.clear(); + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + var expected = Set.of(); + assertEquals(actual, expected); + } + + public void testError() throws Exception { + var counter = spy(new SplitCounter()); + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0); + bundleStats.values().forEach(v -> v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter); + doThrow(new RuntimeException()).when(namespaceService).getBundleCount(any()); + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + var expected = Set.of(); + assertEquals(actual, expected); + verify(counter, times(2)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } + + public void testMaxMsgRate() { + var counter = spy(new SplitCounter()); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter); + int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold(); + bundleStats.values().forEach(v -> { + v.msgRateOut = config.getLoadBalancerNamespaceBundleMaxMsgRate() / 2 + 1; + v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() / 2 + 1; + }); + for (int i = 0; i < threshold + 2; i++) { + var actual = strategy.findBundlesToSplit(loadManagerContext, 
pulsar); + if (i == threshold) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle1, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.MsgRate); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else if (i == threshold + 1) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle2, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.MsgRate); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else { + assertEquals(actual, Set.of()); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } + } + } + + public void testMaxTopics() { + var counter = spy(new SplitCounter()); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter); + int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold(); + bundleStats.values().forEach(v -> v.topics = config.getLoadBalancerNamespaceBundleMaxTopics() + 1); + for (int i = 0; i < threshold + 2; i++) { + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + if (i == threshold) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle1, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Topics); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else if (i == threshold + 1) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle2, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Topics); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else { + assertEquals(actual, Set.of()); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), 
eq(Unknown)); + } + } + } + + public void testMaxSessions() { + var counter = spy(new SplitCounter()); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter); + int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold(); + bundleStats.values().forEach(v -> { + v.producerCount = config.getLoadBalancerNamespaceBundleMaxSessions() / 2 + 1; + v.consumerCount = config.getLoadBalancerNamespaceBundleMaxSessions() / 2 + 1; + }); + for (int i = 0; i < threshold + 2; i++) { + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + if (i == threshold) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle1, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Sessions); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else if (i == threshold + 1) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle2, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Sessions); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else { + assertEquals(actual, Set.of()); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } + } + } + + public void testMaxBandwidthMbytes() { + var counter = spy(new SplitCounter()); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter); + int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold(); + bundleStats.values().forEach(v -> { + v.msgThroughputOut = config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 1024 * 1024 / 2 + 1; + v.msgThroughputIn = config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 1024 * 1024 / 2 + 1; + }); + for (int i = 0; i < threshold + 2; i++) { + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + if (i == threshold) { + 
SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle1, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Bandwidth); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else if (i == threshold + 1) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle2, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Bandwidth); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else { + assertEquals(actual, Set.of()); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } + } + } + +} From c8563446d9d6313c0df5a82783e652417e5d0f82 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Tue, 7 Mar 2023 18:13:56 -0800 Subject: [PATCH 2/6] Added SplitManager --- .../extensions/ExtensibleLoadManagerImpl.java | 7 +- .../channel/ServiceUnitStateChannelImpl.java | 1 + .../extensions/manager/SplitManager.java | 121 +++++++++++ .../extensions/scheduler/SplitScheduler.java | 47 +++-- ...faultNamespaceBundleSplitStrategyImpl.java | 9 +- .../extensions/manager/SplitManagerTest.java | 193 ++++++++++++++++++ .../scheduler/SplitSchedulerTest.java | 20 +- 7 files changed, 368 insertions(+), 30 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 44a49cb4cd416..451e26e5519ea 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -44,6 +44,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter; import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter; import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter; +import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager; import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager; import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; @@ -115,6 +116,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private UnloadManager unloadManager; + private SplitManager splitManager; + private boolean started = false; private final AssignCounter assignCounter = new AssignCounter(); @@ -164,7 +167,9 @@ public void start() throws PulsarServerException { this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); this.brokerRegistry.start(); this.unloadManager = new UnloadManager(); + this.splitManager = new SplitManager(splitCounter); this.serviceUnitStateChannel.listen(unloadManager); + this.serviceUnitStateChannel.listen(splitManager); this.serviceUnitStateChannel.start(); try { @@ -217,7 +222,7 @@ public void start() throws PulsarServerException { pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel); this.unloadScheduler.start(); this.splitScheduler = new SplitScheduler( - pulsar, serviceUnitStateChannel, splitCounter, splitMetrics, context); + pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); this.splitScheduler.start(); this.started = true; } diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 50f821f9f2c27..5f24e41dda931 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -882,6 +882,7 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, return null; }); } + public void handleMetadataSessionEvent(SessionEvent e) { if (e == SessionReestablished || e == SessionLost) { lastMetadataSessionEvent = e; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java new file mode 100644 index 0000000000000..f7c6ae5e792cb --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; + +/** + * Split manager. + */ +@Slf4j +public class SplitManager implements StateChangeListener { + + record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) { + } + + private final Map inFlightSplitRequests; + + private final SplitCounter counter; + + public SplitManager(SplitCounter splitCounter) { + this.inFlightSplitRequests = new ConcurrentHashMap<>(); + this.counter = splitCounter; + } + + private void complete(String serviceUnit, Throwable ex) { + inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> { + var future = inFlightSplitRequest.future; + if (!future.isDone()) { + if (ex != null) { + counter.update(Failure, Unknown); + future.completeExceptionally(ex); + if (log.isDebugEnabled()) { + log.debug("Complete exceptionally split bundle: {}", serviceUnit, ex); + } + } else { + counter.update(inFlightSplitRequest.splitDecision); + future.complete(null); + if (log.isDebugEnabled()) { + log.debug("Complete split bundle: {}", serviceUnit); + } + } + } + return null; + }); + } + + public CompletableFuture 
waitAsync(CompletableFuture eventPubFuture, + String bundle, + SplitDecision decision, + long timeout, + TimeUnit timeoutUnit) { + + return eventPubFuture.thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> { + if (log.isDebugEnabled()) { + log.debug("Handle split bundle: {}, timeout: {} {}", bundle, timeout, timeoutUnit); + } + CompletableFuture future = new CompletableFuture<>(); + future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> { + if (ex != null) { + inFlightSplitRequests.remove(bundle); + log.warn("Failed to wait for split for serviceUnit: {}", bundle, ex); + } + }); + return new InFlightSplitRequest(decision, future); + }).future); + } + + @Override + public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { + ServiceUnitState state = ServiceUnitStateData.state(data); + if (t != null && inFlightSplitRequests.containsKey(serviceUnit)) { + this.complete(serviceUnit, t); + return; + } + switch (state) { + case Deleted, Owned, Init -> this.complete(serviceUnit, t); + default -> { + if (log.isDebugEnabled()) { + log.debug("Handling {} for service unit {}", data, serviceUnit); + } + } + } + } + + public void close() { + inFlightSplitRequests.forEach((bundle, inFlightSplitRequest) -> { + if (!inFlightSplitRequest.future.isDone()) { + String msg = String.format("Splitting bundle: %s, but the manager already closed.", bundle); + log.warn(msg); + inFlightSplitRequest.future.completeExceptionally(new IllegalStateException(msg)); + } + }); + inFlightSplitRequests.clear(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java index d948c5d693f16..bc192955d1f2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager; import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; import org.apache.pulsar.broker.loadbalance.extensions.strategy.DefaultNamespaceBundleSplitStrategyImpl; @@ -61,6 +62,8 @@ public class SplitScheduler implements LoadManagerScheduler { private final SplitCounter counter; + private final SplitManager splitManager; + private final AtomicReference> splitMetrics; private volatile ScheduledFuture task; @@ -69,12 +72,14 @@ public class SplitScheduler implements LoadManagerScheduler { public SplitScheduler(PulsarService pulsar, ServiceUnitStateChannel serviceUnitStateChannel, + SplitManager splitManager, SplitCounter counter, AtomicReference> splitMetrics, LoadManagerContext context, NamespaceBundleSplitStrategy bundleSplitStrategy) { this.pulsar = pulsar; this.loadManagerExecutor = pulsar.getLoadManagerExecutor(); + this.splitManager = splitManager; this.counter = counter; this.splitMetrics = splitMetrics; this.context = context; @@ -85,10 +90,11 @@ public SplitScheduler(PulsarService pulsar, public SplitScheduler(PulsarService pulsar, ServiceUnitStateChannel serviceUnitStateChannel, + SplitManager splitManager, SplitCounter counter, AtomicReference> splitMetrics, LoadManagerContext context) { - this(pulsar, serviceUnitStateChannel, counter, splitMetrics, context, + this(pulsar, serviceUnitStateChannel, splitManager, counter, splitMetrics, context, new DefaultNamespaceBundleSplitStrategyImpl(counter)); } @@ -110,27 +116,36 @@ public void execute() { 
synchronized (bundleSplitStrategy) { final Set decisions = bundleSplitStrategy.findBundlesToSplit(context, pulsar); if (!decisions.isEmpty()) { + + // currently following the unloading timeout + var asyncOpTimeoutMs = conf.getNamespaceBundleUnloadingTimeoutMs(); List> futures = new ArrayList<>(); for (SplitDecision decision : decisions) { if (decision.getLabel() == Success) { var split = decision.getSplit(); - futures.add(serviceUnitStateChannel.publishSplitEventAsync(split) - .whenComplete((__, e) -> { - if (e == null) { - counter.update(decision); - log.info("Published Split Event for {}", split); - } else { - counter.update(Failure, Unknown); - log.error("Failed to publish Split Event for {}", split); - } - })); + futures.add( + splitManager.waitAsync( + serviceUnitStateChannel.publishSplitEventAsync(split) + .whenComplete((__, e) -> { + if (e == null) { + log.info("Published Split Event for {}", split); + } else { + counter.update(Failure, Unknown); + log.error("Failed to publish Split Event for {}", split); + } + }), + split.serviceUnit(), + decision, + asyncOpTimeoutMs, TimeUnit.MILLISECONDS) + ); } } - FutureUtil.waitForAll(futures).exceptionally(ex -> { - log.error("Failed to wait for split events to persist.", ex); - counter.update(Failure, Unknown); - return null; - }); + try { + FutureUtil.waitForAll(futures) + .get(asyncOpTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Throwable e) { + log.error("Failed to wait for split events to persist.", e); + } } else { if (debugMode) { log.info("BundleSplitStrategy returned no bundles to split."); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java index 3de585d4b194c..e572fd4161bdb 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java @@ -79,14 +79,7 @@ public Set findBundlesToSplit(LoadManagerContext context, PulsarS pulsar.getNamespaceService().getNamespaceBundleFactory(); // clean bundleHighTrafficFrequency - var bundleHighTrafficIterator = - bundleHighTrafficFrequency.entrySet().iterator(); - while (bundleHighTrafficIterator.hasNext()) { - String bundle = bundleHighTrafficIterator.next().getKey(); - if (!bundleStatsMap.containsKey(bundle)) { - bundleHighTrafficIterator.remove(); - } - } + bundleHighTrafficFrequency.keySet().retainAll(bundleStatsMap.keySet()); for (var entry : bundleStatsMap.entrySet()) { final String bundle = entry.getKey(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java new file mode 100644 index 0000000000000..166fb03e968ee --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; +import org.apache.pulsar.common.util.FutureUtil; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class SplitManagerTest { + + String bundle = "bundle-1"; + + String dstBroker = "broker-1"; + + @Test + public void testEventPubFutureHasException() { + SplitManager manager = new SplitManager(new SplitCounter()); + var decision = new SplitDecision(); + CompletableFuture future = + manager.waitAsync(FutureUtil.failedFuture(new Exception("test")), + bundle, decision, 10, TimeUnit.SECONDS); + + 
assertTrue(future.isCompletedExceptionally()); + try { + future.get(); + fail(); + } catch (Exception ex) { + assertEquals(ex.getCause().getMessage(), "test"); + } + } + + @Test + public void testTimeout() throws IllegalAccessException { + SplitManager manager = new SplitManager(new SplitCounter()); + var decision = new SplitDecision(); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 3, TimeUnit.SECONDS); + var inFlightUnloadRequests = getinFlightUnloadRequests(manager); + + assertEquals(inFlightUnloadRequests.size(), 1); + + try { + future.get(); + fail(); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof TimeoutException); + } + + assertEquals(inFlightUnloadRequests.size(), 0); + } + + @Test + public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException { + SplitManager manager = new SplitManager(new SplitCounter()); + var decision = new SplitDecision(); + decision.succeed(Sessions); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 5, TimeUnit.SECONDS); + var inFlightUnloadRequests = getinFlightUnloadRequests(manager); + + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Assigning, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Splitting, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Releasing, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new 
ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 0); + + // Success with Init state. + future = manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 5, TimeUnit.SECONDS); + inFlightUnloadRequests = getinFlightUnloadRequests(manager); + assertEquals(inFlightUnloadRequests.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 0); + future.get(); + + // Success with Owned state. + future = manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 5, TimeUnit.SECONDS); + inFlightUnloadRequests = getinFlightUnloadRequests(manager); + assertEquals(inFlightUnloadRequests.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 0); + future.get(); + } + + @Test + public void testFailedStage() throws IllegalAccessException { + SplitManager manager = new SplitManager(new SplitCounter()); + var decision = new SplitDecision(); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 5, TimeUnit.SECONDS); + var inFlightUnloadRequests = getinFlightUnloadRequests(manager); + + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), + new IllegalStateException("Failed stage.")); + + try { + future.get(); + fail(); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof IllegalStateException); + assertEquals(ex.getCause().getMessage(), "Failed stage."); + } + + assertEquals(inFlightUnloadRequests.size(), 0); + } + + @Test + public void testClose() throws IllegalAccessException { + SplitManager manager = new SplitManager(new SplitCounter()); + var decision = new 
SplitDecision(); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 5, TimeUnit.SECONDS); + var inFlightUnloadRequests = getinFlightUnloadRequests(manager); + assertEquals(inFlightUnloadRequests.size(), 1); + manager.close(); + assertEquals(inFlightUnloadRequests.size(), 0); + + try { + future.get(); + fail(); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof IllegalStateException); + } + } + + private Map getinFlightUnloadRequests(SplitManager manager) + throws IllegalAccessException { + Map inFlightUnloadRequest = + (Map) FieldUtils.readField(manager, "inFlightSplitRequests", true); + + return inFlightUnloadRequest; + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java index 4f245fe3626ba..cb9ecab814331 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java @@ -19,7 +19,9 @@ package org.apache.pulsar.broker.loadbalance.extensions.scheduler; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -34,6 +36,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager; import 
org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; @@ -89,8 +92,13 @@ public void setUp() { public void testExecuteSuccess() { AtomicReference> reference = new AtomicReference(); SplitCounter counter = new SplitCounter(); - SplitScheduler scheduler = new SplitScheduler(pulsar, channel, counter, reference, context, strategy); - + SplitManager manager = mock(SplitManager.class); + SplitScheduler scheduler = new SplitScheduler(pulsar, channel, manager, counter, reference, context, strategy); + doAnswer((invocation)->{ + var decision = invocation.getArgument(2, SplitDecision.class); + counter.update(decision); + return CompletableFuture.completedFuture(null); + }).when(manager).waitAsync(any(), any(), any(), anyLong(), any()); scheduler.execute(); var counterExpected = new SplitCounter(); @@ -114,11 +122,13 @@ public void testExecuteSuccess() { public void testExecuteFailure() { AtomicReference> reference = new AtomicReference(); SplitCounter counter = new SplitCounter(); - SplitScheduler scheduler = new SplitScheduler(pulsar, channel, counter, reference, context, strategy); + SplitManager manager = new SplitManager(counter); + SplitScheduler scheduler = new SplitScheduler(pulsar, channel, manager, counter, reference, context, strategy); + doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(channel).publishSplitEventAsync(any()); scheduler.execute(); - doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(channel).publishSplitEventAsync(any()); + var counterExpected = new SplitCounter(); counterExpected.update(SplitDecision.Label.Failure, SplitDecision.Reason.Unknown); counterExpected.update(SplitDecision.Label.Failure, SplitDecision.Reason.Unknown); @@ -133,7 +143,7 @@ public void testExecuteFailure() { public void testDisableLoadBalancer() { 
config.setLoadBalancerEnabled(false); - SplitScheduler scheduler = new SplitScheduler(pulsar, channel, null, null, context, strategy); + SplitScheduler scheduler = new SplitScheduler(pulsar, channel, null, null, null, context, strategy); scheduler.execute(); From 02c927448609162ba40252c7e1d9999572a5788b Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Wed, 8 Mar 2023 09:52:27 -0800 Subject: [PATCH 3/6] Updated pub failure counter --- .../extensions/manager/SplitManager.java | 41 ++++++++++--------- .../extensions/scheduler/SplitScheduler.java | 12 +----- .../scheduler/SplitSchedulerTest.java | 6 +-- 3 files changed, 25 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java index f7c6ae5e792cb..5c9d584f784b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java @@ -55,15 +55,11 @@ private void complete(String serviceUnit, Throwable ex) { if (ex != null) { counter.update(Failure, Unknown); future.completeExceptionally(ex); - if (log.isDebugEnabled()) { - log.debug("Complete exceptionally split bundle: {}", serviceUnit, ex); - } + log.error("Failed the bundle split event: {}", serviceUnit, ex); } else { counter.update(inFlightSplitRequest.splitDecision); future.complete(null); - if (log.isDebugEnabled()) { - log.debug("Complete split bundle: {}", serviceUnit); - } + log.info("Completed the bundle split event: {}", serviceUnit); } } return null; @@ -75,20 +71,25 @@ public CompletableFuture waitAsync(CompletableFuture eventPubFuture, SplitDecision decision, long timeout, TimeUnit timeoutUnit) { - - return eventPubFuture.thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> { - if 
(log.isDebugEnabled()) { - log.debug("Handle split bundle: {}, timeout: {} {}", bundle, timeout, timeoutUnit); - } - CompletableFuture future = new CompletableFuture<>(); - future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> { - if (ex != null) { - inFlightSplitRequests.remove(bundle); - log.warn("Failed to wait for split for serviceUnit: {}", bundle, ex); - } - }); - return new InFlightSplitRequest(decision, future); - }).future); + return eventPubFuture + .thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> { + log.info("Published the bundle split event for bundle:{}. " + + "Waiting the split event to complete. Timeout: {} {}", + bundle, timeout, timeoutUnit); + CompletableFuture future = new CompletableFuture<>(); + future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> { + if (ex != null) { + inFlightSplitRequests.remove(bundle); + log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex); + } + }); + return new InFlightSplitRequest(decision, future); + }).future) + .exceptionally(e -> { + log.error("Failed to publish the bundle split event for bundle:{}. 
Skipping wait.", bundle); + counter.update(Failure, Unknown); + return null; + }); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java index bc192955d1f2e..589df80fc5c14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java @@ -18,9 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.scheduler; -import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success; -import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -125,15 +123,7 @@ public void execute() { var split = decision.getSplit(); futures.add( splitManager.waitAsync( - serviceUnitStateChannel.publishSplitEventAsync(split) - .whenComplete((__, e) -> { - if (e == null) { - log.info("Published Split Event for {}", split); - } else { - counter.update(Failure, Unknown); - log.error("Failed to publish Split Event for {}", split); - } - }), + serviceUnitStateChannel.publishSplitEventAsync(split), split.serviceUnit(), decision, asyncOpTimeoutMs, TimeUnit.MILLISECONDS) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java index cb9ecab814331..7988aa413366f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java @@ -107,7 +107,7 @@ public void testExecuteSuccess() { verify(channel, times(1)).publishSplitEventAsync(eq(decision1.getSplit())); verify(channel, times(1)).publishSplitEventAsync(eq(decision2.getSplit())); - assertEquals(reference.get(), counterExpected.toMetrics(pulsar.getAdvertisedAddress())); + assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString()); // Test empty splits. Set emptyUnload = Set.of(); @@ -115,7 +115,7 @@ public void testExecuteSuccess() { scheduler.execute(); verify(channel, times(2)).publishSplitEventAsync(any()); - assertEquals(reference.get(), counterExpected.toMetrics(pulsar.getAdvertisedAddress())); + assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString()); } @Test(timeOut = 30 * 1000) @@ -135,7 +135,7 @@ public void testExecuteFailure() { verify(channel, times(1)).publishSplitEventAsync(eq(decision1.getSplit())); verify(channel, times(1)).publishSplitEventAsync(eq(decision2.getSplit())); - assertEquals(reference.get(), counterExpected.toMetrics(pulsar.getAdvertisedAddress())); + assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString()); } From 0d88959cdc5ac9c9eb60d46c59c5b07d399604f6 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Wed, 8 Mar 2023 19:08:53 -0800 Subject: [PATCH 4/6] resolved comment --- .../loadbalance/extensions/manager/SplitManager.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java index 5c9d584f784b2..bdc93af27b109 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java @@ -85,10 +85,11 @@ public CompletableFuture waitAsync(CompletableFuture eventPubFuture, }); return new InFlightSplitRequest(decision, future); }).future) - .exceptionally(e -> { - log.error("Failed to publish the bundle split event for bundle:{}. Skipping wait.", bundle); - counter.update(Failure, Unknown); - return null; + .whenComplete((__, ex) -> { + if (ex != null) { + log.error("Failed to publish the bundle split event for bundle:{}. Skipping wait.", bundle); + counter.update(Failure, Unknown); + } }); } From 1ac2868f0a9f263905010fb32d0b8e06c08c6869 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Wed, 8 Mar 2023 21:08:27 -0800 Subject: [PATCH 5/6] fixed failure count --- .../extensions/manager/SplitManager.java | 9 ++--- .../extensions/manager/SplitManagerTest.java | 37 +++++++++++++++++-- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java index bdc93af27b109..e2a706a16b45e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java @@ -53,13 +53,9 @@ private void complete(String serviceUnit, Throwable ex) { var future = inFlightSplitRequest.future; if (!future.isDone()) { if (ex != null) { - counter.update(Failure, Unknown); future.completeExceptionally(ex); - log.error("Failed the bundle split event: {}", serviceUnit, ex); } else { - counter.update(inFlightSplitRequest.splitDecision); future.complete(null); - log.info("Completed the bundle split event: {}", serviceUnit); } 
} return null; @@ -87,8 +83,11 @@ public CompletableFuture waitAsync(CompletableFuture eventPubFuture, }).future) .whenComplete((__, ex) -> { if (ex != null) { - log.error("Failed to publish the bundle split event for bundle:{}. Skipping wait.", bundle); + log.error("Failed the bundle split event for bundle:{}", bundle, ex); counter.update(Failure, Unknown); + } else { + log.info("Completed the bundle split event for bundle:{}", bundle); + counter.update(decision); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java index 166fb03e968ee..af1a1aadeee32 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java @@ -20,6 +20,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -47,7 +48,8 @@ public class SplitManagerTest { @Test public void testEventPubFutureHasException() { - SplitManager manager = new SplitManager(new SplitCounter()); + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); var decision = new SplitDecision(); CompletableFuture future = manager.waitAsync(FutureUtil.failedFuture(new Exception("test")), @@ -60,11 +62,16 @@ public void testEventPubFutureHasException() { } catch (Exception ex) { assertEquals(ex.getCause().getMessage(), "test"); } + var counterExpected = new 
SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, Unknown); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); } @Test public void testTimeout() throws IllegalAccessException { - SplitManager manager = new SplitManager(new SplitCounter()); + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); var decision = new SplitDecision(); CompletableFuture future = manager.waitAsync(CompletableFuture.completedFuture(null), @@ -81,11 +88,17 @@ public void testTimeout() throws IllegalAccessException { } assertEquals(inFlightUnloadRequests.size(), 0); + var counterExpected = new SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, Unknown); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); } @Test public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException { - SplitManager manager = new SplitManager(new SplitCounter()); + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); + var counterExpected = new SplitCounter(); var decision = new SplitDecision(); decision.succeed(Sessions); CompletableFuture future = @@ -110,10 +123,15 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 1); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null); + counterExpected.update(SplitDecision.Label.Success, Sessions); assertEquals(inFlightUnloadRequests.size(), 0); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); // Success with Init state. 
future = manager.waitAsync(CompletableFuture.completedFuture(null), @@ -123,6 +141,9 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 0); + counterExpected.update(SplitDecision.Label.Success, Sessions); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); future.get(); // Success with Owned state. @@ -133,12 +154,16 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 0); + counterExpected.update(SplitDecision.Label.Success, Sessions); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); future.get(); } @Test public void testFailedStage() throws IllegalAccessException { - SplitManager manager = new SplitManager(new SplitCounter()); + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); var decision = new SplitDecision(); CompletableFuture future = manager.waitAsync(CompletableFuture.completedFuture(null), @@ -160,6 +185,10 @@ public void testFailedStage() throws IllegalAccessException { } assertEquals(inFlightUnloadRequests.size(), 0); + var counterExpected = new SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, Unknown); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); } @Test From 0d245a866834c516dab216aab6ad2ef87948c4f1 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Wed, 8 Mar 2023 21:31:48 -0800 Subject: [PATCH 6/6] removed InflightRequest record --- .../extensions/manager/SplitManager.java | 17 +++++++---------- .../extensions/manager/SplitManagerTest.java | 6 +++--- 2 files changed, 10
insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java index e2a706a16b45e..71ebbc92a87db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java @@ -36,10 +36,8 @@ @Slf4j public class SplitManager implements StateChangeListener { - record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) { - } - private final Map inFlightSplitRequests; + private final Map> inFlightSplitRequests; private final SplitCounter counter; @@ -49,8 +47,7 @@ public SplitManager(SplitCounter splitCounter) { } private void complete(String serviceUnit, Throwable ex) { - inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> { - var future = inFlightSplitRequest.future; + inFlightSplitRequests.computeIfPresent(serviceUnit, (__, future) -> { if (!future.isDone()) { if (ex != null) { future.completeExceptionally(ex); @@ -79,8 +76,8 @@ public CompletableFuture waitAsync(CompletableFuture eventPubFuture, log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex); } }); - return new InFlightSplitRequest(decision, future); - }).future) + return future; + })) .whenComplete((__, ex) -> { if (ex != null) { log.error("Failed the bundle split event for bundle:{}", bundle, ex); @@ -110,11 +107,11 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable } public void close() { - inFlightSplitRequests.forEach((bundle, inFlightSplitRequest) -> { - if (!inFlightSplitRequest.future.isDone()) { + inFlightSplitRequests.forEach((bundle, future) -> { + if (!future.isDone()) { String msg = String.format("Splitting bundle: %s, but the manager already closed.", 
bundle); log.warn(msg); - inFlightSplitRequest.future.completeExceptionally(new IllegalStateException(msg)); + future.completeExceptionally(new IllegalStateException(msg)); } }); inFlightSplitRequests.clear(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java index af1a1aadeee32..3287306ab48ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java @@ -211,10 +211,10 @@ public void testClose() throws IllegalAccessException { } } - private Map getinFlightUnloadRequests(SplitManager manager) + private Map> getinFlightUnloadRequests(SplitManager manager) throws IllegalAccessException { - Map inFlightUnloadRequest = - (Map) FieldUtils.readField(manager, "inFlightSplitRequests", true); + var inFlightUnloadRequest = + (Map>) FieldUtils.readField(manager, "inFlightSplitRequests", true); return inFlightUnloadRequest; }