diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index ec5b0d4042bcd..3c00e905ac723 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2557,6 +2557,30 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se + "(only used in load balancer extension logics)" ) private double loadBalancerBundleLoadReportPercentage = 10; + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "Service units'(bundles) split interval. Broker periodically checks whether " + + "some service units(e.g. bundles) should split if they become hot-spots. " + + "(only used in load balancer extension logics)" + ) + private int loadBalancerSplitIntervalMinutes = 1; + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + dynamic = true, + doc = "Max number of bundles to split to per cycle. " + + "(only used in load balancer extension logics)" + ) + private int loadBalancerMaxNumberOfBundlesToSplitPerCycle = 10; + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + dynamic = true, + doc = "Threshold to the consecutive count of fulfilled split conditions. " + + "If the split scheduler consecutively finds bundles that meet split conditions " + + "many times bigger than this threshold, the scheduler will trigger splits on the bundles " + + "(if the number of bundles is less than loadBalancerNamespaceMaximumBundles). 
" + + "(only used in load balancer extension logics)" + ) + private int loadBalancerNamespaceBundleSplitConditionThreshold = 5; @FieldContext( category = CATEGORY_LOAD_BALANCER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 2bebe203d8750..451e26e5519ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -44,16 +44,17 @@ import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter; import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter; import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter; +import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager; import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager; import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; -import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler; +import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler; import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException; @@ -102,7 +103,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { @Getter private final List brokerFilterPipeline; - /** * The load data reporter. */ @@ -112,9 +112,12 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private ScheduledFuture brokerLoadDataReportTask; private ScheduledFuture topBundlesLoadDataReportTask; + private SplitScheduler splitScheduler; private UnloadManager unloadManager; + private SplitManager splitManager; + private boolean started = false; private final AssignCounter assignCounter = new AssignCounter(); @@ -164,7 +167,9 @@ public void start() throws PulsarServerException { this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); this.brokerRegistry.start(); this.unloadManager = new UnloadManager(); + this.splitManager = new SplitManager(splitCounter); this.serviceUnitStateChannel.listen(unloadManager); + this.serviceUnitStateChannel.listen(splitManager); this.serviceUnitStateChannel.start(); try { @@ -182,7 +187,6 @@ public void start() throws PulsarServerException { .brokerLoadDataStore(brokerLoadDataStore) .topBundleLoadDataStore(topBundlesLoadDataStore).build(); - this.brokerLoadDataReporter = new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore); @@ -214,10 +218,12 @@ public void start() throws PulsarServerException { interval, interval, TimeUnit.MILLISECONDS); - // TODO: Start bundle split scheduler. 
this.unloadScheduler = new UnloadScheduler( pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel); this.unloadScheduler.start(); + this.splitScheduler = new SplitScheduler( + pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); + this.splitScheduler.start(); this.started = true; } @@ -376,6 +382,7 @@ public void close() throws PulsarServerException { this.brokerLoadDataStore.close(); this.topBundlesLoadDataStore.close(); this.unloadScheduler.close(); + this.splitScheduler.close(); } catch (IOException ex) { throw new PulsarServerException(ex); } finally { @@ -407,13 +414,6 @@ private void updateUnloadMetrics(UnloadDecision decision) { this.unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress())); } - private void updateSplitMetrics(List decisions) { - for (var decision : decisions) { - splitCounter.update(decision); - } - this.splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress())); - } - public List getMetrics() { List metricsCollection = new ArrayList<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
new file mode 100644
index 0000000000000..71ebbc92a87db
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+
+/**
+ * Split manager.
+ *
+ * <p>Keeps one pending future per bundle whose split event was published through
+ * {@link #waitAsync}, and completes it when {@link #handleEvent} observes an error or a
+ * Deleted, Owned or Init state for that service unit.
+ */
+@Slf4j
+public class SplitManager implements StateChangeListener {
+
+    /** Pending split requests, keyed by service unit (bundle) name. */
+    private final Map<String, CompletableFuture<Void>> inFlightSplitRequests;
+
+    private final SplitCounter counter;
+
+    public SplitManager(SplitCounter splitCounter) {
+        this.inFlightSplitRequests = new ConcurrentHashMap<>();
+        this.counter = splitCounter;
+    }
+
+    /**
+     * Removes the pending request of the service unit, if any, and completes it.
+     *
+     * @param serviceUnit the service unit (bundle) name
+     * @param ex the failure to propagate, or {@code null} to complete normally
+     */
+    private void complete(String serviceUnit, Throwable ex) {
+        // Detach the future before completing it: completion synchronously runs the callback
+        // registered in waitAsync, which removes the same key from this map, and a
+        // ConcurrentHashMap must not be modified from inside its own compute function.
+        CompletableFuture<Void> future = inFlightSplitRequests.remove(serviceUnit);
+        if (future == null || future.isDone()) {
+            return;
+        }
+        if (ex != null) {
+            future.completeExceptionally(ex);
+        } else {
+            future.complete(null);
+        }
+    }
+
+    /**
+     * Waits for a published split event to take effect.
+     *
+     * @param eventPubFuture future of publishing the split event
+     * @param bundle the bundle being split
+     * @param decision the decision recorded in the counter when the split completes
+     * @param timeout how long to wait for the split to complete
+     * @param timeoutUnit unit of {@code timeout}
+     * @return a future completed once the split completes, fails or times out
+     */
+    public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
+                                             String bundle,
+                                             SplitDecision decision,
+                                             long timeout,
+                                             TimeUnit timeoutUnit) {
+        return eventPubFuture
+                .thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> {
+                    log.info("Published the bundle split event for bundle:{}. "
+                                    + "Waiting the split event to complete. Timeout: {} {}",
+                            bundle, timeout, timeoutUnit);
+                    CompletableFuture<Void> future = new CompletableFuture<>();
+                    future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> {
+                        if (ex != null) {
+                            inFlightSplitRequests.remove(bundle);
+                            log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex);
+                        }
+                    });
+                    return future;
+                }))
+                .whenComplete((__, ex) -> {
+                    if (ex != null) {
+                        log.error("Failed the bundle split event for bundle:{}", bundle, ex);
+                        counter.update(Failure, Unknown);
+                    } else {
+                        log.info("Completed the bundle split event for bundle:{}", bundle);
+                        counter.update(decision);
+                    }
+                });
+    }
+
+    /**
+     * Completes the pending split request of the service unit when an error is reported for it
+     * or when the service unit reaches the Deleted, Owned or Init state.
+     */
+    @Override
+    public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
+        ServiceUnitState state = ServiceUnitStateData.state(data);
+        if (t != null && inFlightSplitRequests.containsKey(serviceUnit)) {
+            this.complete(serviceUnit, t);
+            return;
+        }
+        switch (state) {
+            case Deleted, Owned, Init -> this.complete(serviceUnit, t);
+            default -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handling {} for service unit {}", data, serviceUnit);
+                }
+            }
+        }
+    }
+
+    /** Fails every pending split request that is not done yet, then clears the map. */
+    public void close() {
+        inFlightSplitRequests.forEach((bundle, future) -> {
+            if (!future.isDone()) {
+                String msg = String.format("Splitting bundle: %s, but the manager already closed.", bundle);
+                log.warn(msg);
+                future.completeExceptionally(new IllegalStateException(msg));
+            }
+        });
+        inFlightSplitRequests.clear();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java index 99406412cee2b..ed72b5f586331 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java @@ -19,10 +19,8 @@ package org.apache.pulsar.broker.loadbalance.extensions.models; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; -import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin; -import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions; @@ -32,7 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.mutable.MutableLong; +import java.util.concurrent.atomic.AtomicLong; import org.apache.pulsar.common.stats.Metrics;
/** @@ -40,23 +38,20 @@ */ public class SplitCounter { - long splitCount = 0; - - final Map> breakdownCounters; + private long splitCount = 0; + private final Map> breakdownCounters; + private volatile long updatedAt = 0; public SplitCounter() { breakdownCounters = Map.of( Success, Map.of( - Topics, new MutableLong(), - Sessions, new MutableLong(), - MsgRate, new MutableLong(), - Bandwidth, new MutableLong(), - Admin, new MutableLong()), - Skip, Map.of( - Balanced, new MutableLong() - ), + Topics, new AtomicLong(), + Sessions, new AtomicLong(), + MsgRate, new AtomicLong(), + Bandwidth, new AtomicLong(), + Admin, new AtomicLong()), Failure, Map.of( - Unknown, new MutableLong()) + Unknown, new AtomicLong()) ); } @@ -64,7 +59,16 @@ public void update(SplitDecision decision) { if (decision.label == Success) { splitCount++; } - breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment(); + breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet(); + updatedAt = System.currentTimeMillis(); + } + + public void update(SplitDecision.Label label, SplitDecision.Reason reason) { + if (label == Success) { + splitCount++; + } + breakdownCounters.get(label).get(reason).incrementAndGet(); + updatedAt = System.currentTimeMillis(); } public List toMetrics(String advertisedBrokerAddress) { @@ -77,17 +81,18 @@ public List toMetrics(String advertisedBrokerAddress) { m.put("brk_lb_bundles_split_total", splitCount); metrics.add(m); - for (Map.Entry> etr + + for (Map.Entry> etr : breakdownCounters.entrySet()) { var result = etr.getKey(); - for (Map.Entry counter : etr.getValue().entrySet()) { + for (Map.Entry counter : etr.getValue().entrySet()) { var reason = counter.getKey(); var count = counter.getValue(); Map breakdownDims = new HashMap<>(dimensions); breakdownDims.put("result", result.toString()); breakdownDims.put("reason", reason.toString()); Metrics breakdownMetric = Metrics.create(breakdownDims); - 
breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count); + breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count.get()); metrics.add(breakdownMetric); } } @@ -95,4 +100,7 @@ public List toMetrics(String advertisedBrokerAddress) { return metrics; } + public long updatedAt() { + return updatedAt; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java index a3dede50c1cd8..433d21a5a613e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java @@ -19,9 +19,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.models; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; -import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success; -import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; import lombok.Data; @@ -36,7 +34,6 @@ public class SplitDecision { public enum Label { Success, - Skip, Failure } @@ -46,7 +43,6 @@ public enum Reason { MsgRate, Bandwidth, Admin, - Balanced, Unknown } @@ -62,11 +58,6 @@ public void clear() { reason = null; } - public void skip() { - label = Skip; - reason = Balanced; - } - public void succeed(Reason reason) { label = Success; this.reason = reason; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java
new file mode 100644
index 0000000000000..589df80fc5c14
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.DefaultNamespaceBundleSplitStrategyImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.NamespaceBundleSplitStrategy;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * Service Unit(e.g. bundles) Split scheduler.
+ */
+@Slf4j
+public class SplitScheduler implements LoadManagerScheduler {
+
+    private final PulsarService pulsar;
+
+    private final ScheduledExecutorService loadManagerExecutor;
+
+    private final LoadManagerContext context;
+
+    private final ServiceConfiguration conf;
+
+    private final ServiceUnitStateChannel serviceUnitStateChannel;
+
+    private final NamespaceBundleSplitStrategy bundleSplitStrategy;
+
+    private final SplitCounter counter;
+
+    private final SplitManager splitManager;
+
+    private final AtomicReference<List<Metrics>> splitMetrics;
+
+    private volatile ScheduledFuture<?> task;
+
+    // Value of counter.updatedAt() observed when splitMetrics was last refreshed.
+    private long counterLastUpdatedAt = 0;
+
+    public SplitScheduler(PulsarService pulsar,
+                          ServiceUnitStateChannel serviceUnitStateChannel,
+                          SplitManager splitManager,
+                          SplitCounter counter,
+                          AtomicReference<List<Metrics>> splitMetrics,
+                          LoadManagerContext context,
+                          NamespaceBundleSplitStrategy bundleSplitStrategy) {
+        this.pulsar = pulsar;
+        this.loadManagerExecutor = pulsar.getLoadManagerExecutor();
+        this.splitManager = splitManager;
+        this.counter = counter;
+        this.splitMetrics = splitMetrics;
+        this.context = context;
+        this.conf = pulsar.getConfiguration();
+        this.bundleSplitStrategy = bundleSplitStrategy;
+        this.serviceUnitStateChannel = serviceUnitStateChannel;
+    }
+
+    public SplitScheduler(PulsarService pulsar,
+                          ServiceUnitStateChannel serviceUnitStateChannel,
+                          SplitManager splitManager,
+                          SplitCounter counter,
+                          AtomicReference<List<Metrics>> splitMetrics,
+                          LoadManagerContext context) {
+        this(pulsar, serviceUnitStateChannel, splitManager, counter, splitMetrics, context,
+                new DefaultNamespaceBundleSplitStrategyImpl(counter));
+    }
+
+    /**
+     * Runs one split cycle: asks the strategy for bundles to split, publishes a split event for each
+     * successful decision, waits for the splits to finish, and refreshes the split metrics when the
+     * counter has changed since the last refresh.
+     */
+    @Override
+    public void execute() {
+        boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+        if (debugMode) {
+            log.info("Load balancer enabled: {}, Split enabled: {}.",
+                    conf.isLoadBalancerEnabled(), conf.isLoadBalancerAutoBundleSplitEnabled());
+        }
+
+        if (!isLoadBalancerAutoBundleSplitEnabled()) {
+            if (debugMode) {
+                log.info("The load balancer or load balancer split already disabled. Skipping.");
+            }
+            return;
+        }
+
+        synchronized (bundleSplitStrategy) {
+            final Set<SplitDecision> decisions = bundleSplitStrategy.findBundlesToSplit(context, pulsar);
+            if (!decisions.isEmpty()) {
+
+                // currently following the unloading timeout
+                var asyncOpTimeoutMs = conf.getNamespaceBundleUnloadingTimeoutMs();
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                for (SplitDecision decision : decisions) {
+                    if (decision.getLabel() == Success) {
+                        var split = decision.getSplit();
+                        futures.add(
+                                splitManager.waitAsync(
+                                        serviceUnitStateChannel.publishSplitEventAsync(split),
+                                        split.serviceUnit(),
+                                        decision,
+                                        asyncOpTimeoutMs, TimeUnit.MILLISECONDS)
+                        );
+                    }
+                }
+                try {
+                    FutureUtil.waitForAll(futures)
+                            .get(asyncOpTimeoutMs, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible to the executor instead of swallowing it.
+                    Thread.currentThread().interrupt();
+                    log.error("Failed to wait for split events to persist.", e);
+                } catch (Throwable e) {
+                    log.error("Failed to wait for split events to persist.", e);
+                }
+            } else {
+                if (debugMode) {
+                    log.info("BundleSplitStrategy returned no bundles to split.");
+                }
+            }
+        }
+
+        // Read the timestamp once, before taking the snapshot: an update that lands while the
+        // snapshot is built then still has a newer timestamp and is picked up by the next cycle.
+        long counterUpdatedAt = counter.updatedAt();
+        if (counterUpdatedAt > counterLastUpdatedAt) {
+            splitMetrics.set(counter.toMetrics(pulsar.getAdvertisedAddress()));
+            counterLastUpdatedAt = counterUpdatedAt;
+        }
+    }
+
+    @Override
+    public void start() {
+        long interval = TimeUnit.MINUTES
+                .toMillis(conf.getLoadBalancerSplitIntervalMinutes());
+        task = loadManagerExecutor.scheduleAtFixedRate(() -> {
+            try {
+                execute();
+            } catch (Throwable e) {
+                log.error("Failed to run the split job.", e);
+            }
+        }, interval, interval, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        if (task != null) {
+            task.cancel(false);
+            task = null;
+        }
+    }
+
+    private boolean isLoadBalancerAutoBundleSplitEnabled() {
+        return conf.isLoadBalancerEnabled() && conf.isLoadBalancerAutoBundleSplitEnabled();
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
new file mode 100644
index 0000000000000..e572fd4161bdb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Topics;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+
+/**
+ * Determines which bundles should be split based on various thresholds.
+ *
+ * Migrate from {@link org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTask}
+ */
+@Slf4j
+public class DefaultNamespaceBundleSplitStrategyImpl implements NamespaceBundleSplitStrategy {
+    // Decisions of the current cycle; cleared and refilled by every findBundlesToSplit call.
+    private final Set<SplitDecision> decisionCache;
+    // Number of splits already decided in the current cycle, per namespace.
+    private final Map<String, Integer> namespaceBundleCount;
+    // Number of consecutive cycles in which each bundle exceeded a split threshold.
+    private final Map<String, Integer> bundleHighTrafficFrequency;
+    private final SplitCounter counter;
+
+    public DefaultNamespaceBundleSplitStrategyImpl(SplitCounter counter) {
+        decisionCache = new HashSet<>();
+        namespaceBundleCount = new HashMap<>();
+        bundleHighTrafficFrequency = new HashMap<>();
+        this.counter = counter;
+
+    }
+
+    /**
+     * Finds the bundles that exceeded a topic, session, message-rate or bandwidth threshold in more
+     * consecutive calls than the configured split condition threshold.
+     *
+     * @param context the load manager context, used to look up this broker's id
+     * @param pulsar the Pulsar service providing the configuration, bundle stats and namespace service
+     * @return the split decisions of this cycle; the same set instance is cleared and reused by the
+     *         next call
+     */
+    @Override
+    public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarService pulsar) {
+        decisionCache.clear();
+        namespaceBundleCount.clear();
+        final ServiceConfiguration conf = pulsar.getConfiguration();
+        int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
+        long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
+        long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
+        long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
+        long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI;
+        long maxSplitCount = conf.getLoadBalancerMaxNumberOfBundlesToSplitPerCycle();
+        long splitConditionThreshold = conf.getLoadBalancerNamespaceBundleSplitConditionThreshold();
+        boolean debug = log.isDebugEnabled() || conf.isLoadBalancerDebugModeEnabled();
+
+        Map<String, NamespaceBundleStats> bundleStatsMap = pulsar.getBrokerService().getBundleStats();
+        NamespaceBundleFactory namespaceBundleFactory =
+                pulsar.getNamespaceService().getNamespaceBundleFactory();
+
+        // clean bundleHighTrafficFrequency
+        bundleHighTrafficFrequency.keySet().retainAll(bundleStatsMap.keySet());
+
+        for (var entry : bundleStatsMap.entrySet()) {
+            final String bundle = entry.getKey();
+            final NamespaceBundleStats stats = entry.getValue();
+            if (stats.topics < 2) {
+                if (debug) {
+                    log.info("The count of topics on the bundle {} is less than 2, skip split!", bundle);
+                }
+                continue;
+            }
+
+            final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+            final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
+            if (!namespaceBundleFactory
+                    .canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) {
+                if (debug) {
+                    log.info("Can't split the bundle:{}. invalid bundle range:{}. ", bundle, bundleRange);
+                }
+                counter.update(Failure, Unknown);
+                continue;
+            }
+
+            double totalMessageRate = stats.msgRateIn + stats.msgRateOut;
+            double totalMessageThroughput = stats.msgThroughputIn + stats.msgThroughputOut;
+            int totalSessionCount = stats.consumerCount + stats.producerCount;
+            SplitDecision.Reason reason = Unknown;
+            if (stats.topics > maxBundleTopics) {
+                reason = Topics;
+            } else if (maxBundleSessions > 0 && (totalSessionCount > maxBundleSessions)) {
+                reason = Sessions;
+            } else if (totalMessageRate > maxBundleMsgRate) {
+                reason = MsgRate;
+            } else if (totalMessageThroughput > maxBundleBandwidth) {
+                reason = Bandwidth;
+            }
+
+            if (reason != Unknown) {
+                bundleHighTrafficFrequency.put(bundle, bundleHighTrafficFrequency.getOrDefault(bundle, 0) + 1);
+            } else {
+                bundleHighTrafficFrequency.remove(bundle);
+            }
+
+            if (bundleHighTrafficFrequency.getOrDefault(bundle, 0) > splitConditionThreshold) {
+                final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+                try {
+                    final int bundleCount = pulsar.getNamespaceService()
+                            .getBundleCount(NamespaceName.get(namespace));
+                    if ((bundleCount + namespaceBundleCount.getOrDefault(namespace, 0))
+                            < maxBundleCount) {
+                        if (debug) {
+                            log.info("The bundle {} is considered to split. Topics: {}/{}, Sessions: ({}+{})/{}, "
+                                            + "Message Rate: {}/{} (msgs/s), Message Throughput: {}/{} (MB/s)",
+                                    bundle, stats.topics, maxBundleTopics, stats.producerCount, stats.consumerCount,
+                                    maxBundleSessions, totalMessageRate, maxBundleMsgRate,
+                                    totalMessageThroughput / LoadManagerShared.MIBI,
+                                    maxBundleBandwidth / LoadManagerShared.MIBI);
+                        }
+                        var decision = new SplitDecision();
+                        decision.setSplit(new Split(bundle, context.brokerRegistry().getBrokerId(), new HashMap<>()));
+                        decision.succeed(reason);
+                        decisionCache.add(decision);
+                        int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);
+                        namespaceBundleCount.put(namespace, bundleNum + 1);
+                        bundleHighTrafficFrequency.remove(bundle);
+                        // Clear namespace bundle-cache
+                        namespaceBundleFactory.invalidateBundleCache(NamespaceName.get(namespaceName));
+                        if (decisionCache.size() == maxSplitCount) {
+                            if (debug) {
+                                log.info("Too many bundles to split in this split cycle {} / {}. Stop.",
+                                        decisionCache.size(), maxSplitCount);
+                            }
+                            break;
+                        }
+                    } else {
+                        if (debug) {
+                            log.info(
+                                    "Could not split namespace bundle {} because namespace {} has too many bundles:"
+                                            + "{}", bundle, namespace, bundleCount);
+                        }
+                    }
+                } catch (Exception e) {
+                    counter.update(Failure, Unknown);
+                    log.warn("Error while computing bundle splits for namespace {}", namespace, e);
+                }
+            }
+        }
+        return decisionCache;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java similarity index 86% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java index 88bd7f0b08780..14023f1b5b01d 100644 ---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java @@ -16,11 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.extensions.scheduler; +package org.apache.pulsar.broker.loadbalance.extensions.strategy; import java.util.Set; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; -import org.apache.pulsar.broker.loadbalance.extensions.models.Split; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; /** * Determines which bundles should be split based on various thresholds. @@ -35,5 +36,5 @@ public interface NamespaceBundleSplitStrategy { * @param context The context used for decisions. * @return A set of the bundles that should be split. 
*/ - Set findBundlesToSplit(LoadManagerContext context); + Set findBundlesToSplit(LoadManagerContext context, PulsarService pulsar); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index ec82f5c383e2e..5273e28208df8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -431,24 +431,16 @@ Unknown, new MutableLong(10)) FieldUtils.readDeclaredField(primaryLoadManager, "splitMetrics", true); SplitCounter splitCounter = new SplitCounter(); FieldUtils.writeDeclaredField(splitCounter, "splitCount", 35l, true); - FieldUtils.writeDeclaredField(splitCounter, "breakdownCounters", new LinkedHashMap<>() { - { - put(SplitDecision.Label.Success, new LinkedHashMap<>() { - { - put(Topics, new MutableLong(1)); - put(Sessions, new MutableLong(2)); - put(MsgRate, new MutableLong(3)); - put(Bandwidth, new MutableLong(4)); - put(Admin, new MutableLong(5)); - } - }); - put(SplitDecision.Label.Skip, Map.of( - SplitDecision.Reason.Balanced, new MutableLong(6) - )); - put(SplitDecision.Label.Failure, Map.of( - SplitDecision.Reason.Unknown, new MutableLong(7))); - } - }, true); + FieldUtils.writeDeclaredField(splitCounter, "breakdownCounters", Map.of( + SplitDecision.Label.Success, Map.of( + Topics, new AtomicLong(1), + Sessions, new AtomicLong(2), + MsgRate, new AtomicLong(3), + Bandwidth, new AtomicLong(4), + Admin, new AtomicLong(5)), + SplitDecision.Label.Failure, Map.of( + SplitDecision.Reason.Unknown, new AtomicLong(6)) + ), true); splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress())); } @@ -523,8 +515,7 @@ SplitDecision.Reason.Balanced, new MutableLong(6) dimensions=[{broker=localhost, 
metric=bundlesSplit, reason=MsgRate, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=3}] dimensions=[{broker=localhost, metric=bundlesSplit, reason=Bandwidth, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=4}] dimensions=[{broker=localhost, metric=bundlesSplit, reason=Admin, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=5}] - dimensions=[{broker=localhost, metric=bundlesSplit, reason=Balanced, result=Skip}], metrics=[{brk_lb_bundles_split_breakdown_total=6}] - dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=7}] + dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=6}] dimensions=[{broker=localhost, metric=assign, result=Empty}], metrics=[{brk_lb_assign_broker_breakdown_total=2}] dimensions=[{broker=localhost, metric=assign, result=Skip}], metrics=[{brk_lb_assign_broker_breakdown_total=3}] dimensions=[{broker=localhost, metric=assign, result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}] diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java new file mode 100644 index 0000000000000..3287306ab48ba --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; +import org.apache.pulsar.common.util.FutureUtil; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class SplitManagerTest { + + String bundle = "bundle-1"; + + String dstBroker = "broker-1"; + + @Test + public void testEventPubFutureHasException() { + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); + var decision = new SplitDecision(); + CompletableFuture future = 
+ manager.waitAsync(FutureUtil.failedFuture(new Exception("test")), + bundle, decision, 10, TimeUnit.SECONDS); + + assertTrue(future.isCompletedExceptionally()); + try { + future.get(); + fail(); + } catch (Exception ex) { + assertEquals(ex.getCause().getMessage(), "test"); + } + var counterExpected = new SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, Unknown); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); + } + + @Test + public void testTimeout() throws IllegalAccessException { + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); + var decision = new SplitDecision(); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 3, TimeUnit.SECONDS); + var inFlightUnloadRequests = getinFlightUnloadRequests(manager); + + assertEquals(inFlightUnloadRequests.size(), 1); + + try { + future.get(); + fail(); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof TimeoutException); + } + + assertEquals(inFlightUnloadRequests.size(), 0); + var counterExpected = new SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, Unknown); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); + } + + @Test + public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException { + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); + var counterExpected = new SplitCounter(); + var decision = new SplitDecision(); + decision.succeed(Sessions); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 5, TimeUnit.SECONDS); + var inFlightUnloadRequests = getinFlightUnloadRequests(manager); + + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Assigning, dstBroker, 
VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Splitting, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Releasing, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 1); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null); + counterExpected.update(SplitDecision.Label.Success, Sessions); + assertEquals(inFlightUnloadRequests.size(), 0); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); + + // Success with Init state. + future = manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 5, TimeUnit.SECONDS); + inFlightUnloadRequests = getinFlightUnloadRequests(manager); + assertEquals(inFlightUnloadRequests.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 0); + counterExpected.update(SplitDecision.Label.Success, Sessions); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); + future.get(); + + // Success with Owned state. 
+ future = manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 5, TimeUnit.SECONDS); + inFlightUnloadRequests = getinFlightUnloadRequests(manager); + assertEquals(inFlightUnloadRequests.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequests.size(), 0); + counterExpected.update(SplitDecision.Label.Success, Sessions); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); + future.get(); + } + + @Test + public void testFailedStage() throws IllegalAccessException { + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); + var decision = new SplitDecision(); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 5, TimeUnit.SECONDS); + var inFlightUnloadRequests = getinFlightUnloadRequests(manager); + + assertEquals(inFlightUnloadRequests.size(), 1); + + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), + new IllegalStateException("Failed stage.")); + + try { + future.get(); + fail(); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof IllegalStateException); + assertEquals(ex.getCause().getMessage(), "Failed stage."); + } + + assertEquals(inFlightUnloadRequests.size(), 0); + var counterExpected = new SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, Unknown); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); + } + + @Test + public void testClose() throws IllegalAccessException { + SplitManager manager = new SplitManager(new SplitCounter()); + var decision = new SplitDecision(); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, decision, 5, TimeUnit.SECONDS); + var inFlightUnloadRequests = 
getinFlightUnloadRequests(manager); + assertEquals(inFlightUnloadRequests.size(), 1); + manager.close(); + assertEquals(inFlightUnloadRequests.size(), 0); + + try { + future.get(); + fail(); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof IllegalStateException); + } + } + + private Map> getinFlightUnloadRequests(SplitManager manager) + throws IllegalAccessException { + var inFlightUnloadRequest = + (Map>) FieldUtils.readField(manager, "inFlightSplitRequests", true); + + return inFlightUnloadRequest; + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java new file mode 100644 index 0000000000000..7988aa413366f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.loadbalance.extensions.scheduler; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager; +import org.apache.pulsar.broker.loadbalance.extensions.models.Split; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.NamespaceBundleSplitStrategy; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.stats.Metrics; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class SplitSchedulerTest { + + PulsarService pulsar; + ServiceConfiguration config; + NamespaceBundleFactory namespaceBundleFactory; + LoadManagerContext context; + ServiceUnitStateChannel channel; + NamespaceBundleSplitStrategy strategy; + String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF"; + String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF"; + String broker = "broker-1"; + SplitDecision decision1; + SplitDecision 
decision2; + + @BeforeMethod + public void setUp() { + + config = new ServiceConfiguration(); + config.setLoadBalancerDebugModeEnabled(true); + + pulsar = mock(PulsarService.class); + namespaceBundleFactory = mock(NamespaceBundleFactory.class); + context = mock(LoadManagerContext.class); + channel = mock(ServiceUnitStateChannel.class); + strategy = mock(NamespaceBundleSplitStrategy.class); + + doReturn(config).when(pulsar).getConfiguration(); + doReturn(true).when(namespaceBundleFactory).canSplitBundle(any()); + doReturn(CompletableFuture.completedFuture(null)).when(channel).publishSplitEventAsync(any()); + + decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle1, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.MsgRate); + + decision2 = new SplitDecision(); + decision2.setSplit(new Split(bundle2, broker, new HashMap<>())); + decision2.succeed(SplitDecision.Reason.Sessions); + Set decisions = Set.of(decision1, decision2); + doReturn(decisions).when(strategy).findBundlesToSplit(any(), any()); + } + + @Test(timeOut = 30 * 1000) + public void testExecuteSuccess() { + AtomicReference> reference = new AtomicReference(); + SplitCounter counter = new SplitCounter(); + SplitManager manager = mock(SplitManager.class); + SplitScheduler scheduler = new SplitScheduler(pulsar, channel, manager, counter, reference, context, strategy); + doAnswer((invocation)->{ + var decision = invocation.getArgument(2, SplitDecision.class); + counter.update(decision); + return CompletableFuture.completedFuture(null); + }).when(manager).waitAsync(any(), any(), any(), anyLong(), any()); + scheduler.execute(); + + var counterExpected = new SplitCounter(); + counterExpected.update(decision1); + counterExpected.update(decision2); + verify(channel, times(1)).publishSplitEventAsync(eq(decision1.getSplit())); + verify(channel, times(1)).publishSplitEventAsync(eq(decision2.getSplit())); + + assertEquals(reference.get().toString(), 
counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString()); + + // Test empty splits. + Set emptyUnload = Set.of(); + doReturn(emptyUnload).when(strategy).findBundlesToSplit(any(), any()); + + scheduler.execute(); + verify(channel, times(2)).publishSplitEventAsync(any()); + assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString()); + } + + @Test(timeOut = 30 * 1000) + public void testExecuteFailure() { + AtomicReference> reference = new AtomicReference(); + SplitCounter counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); + SplitScheduler scheduler = new SplitScheduler(pulsar, channel, manager, counter, reference, context, strategy); + doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(channel).publishSplitEventAsync(any()); + + scheduler.execute(); + + + var counterExpected = new SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, SplitDecision.Reason.Unknown); + counterExpected.update(SplitDecision.Label.Failure, SplitDecision.Reason.Unknown); + verify(channel, times(1)).publishSplitEventAsync(eq(decision1.getSplit())); + verify(channel, times(1)).publishSplitEventAsync(eq(decision2.getSplit())); + + assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString()); + } + + + @Test(timeOut = 30 * 1000) + public void testDisableLoadBalancer() { + + config.setLoadBalancerEnabled(false); + SplitScheduler scheduler = new SplitScheduler(pulsar, channel, null, null, null, context, strategy); + + scheduler.execute(); + + verify(strategy, times(0)).findBundlesToSplit(any(), any()); + + config.setLoadBalancerEnabled(true); + config.setLoadBalancerAutoBundleSplitEnabled(false); + scheduler.execute(); + + verify(strategy, times(0)).findBundlesToSplit(any(), any()); + } +} diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java new file mode 100644 index 0000000000000..71606bb85a3fe --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.loadbalance.extensions.strategy; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.models.Split; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.PulsarStats; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class DefaultNamespaceBundleSplitStrategyTest { + + PulsarService pulsar; + BrokerService brokerService; + PulsarStats pulsarStats; + Map bundleStats; + ServiceConfiguration config; + NamespaceBundleFactory namespaceBundleFactory; + NamespaceService namespaceService; + + LoadManagerContext loadManagerContext; + + BrokerRegistry brokerRegistry; + + String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF"; + 
String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF"; + + String broker = "broker-1"; + + @BeforeMethod + void setup() { + config = new ServiceConfiguration(); + config.setLoadBalancerDebugModeEnabled(true); + config.setLoadBalancerNamespaceMaximumBundles(100); + config.setLoadBalancerNamespaceBundleMaxTopics(100); + config.setLoadBalancerNamespaceBundleMaxSessions(100); + config.setLoadBalancerNamespaceBundleMaxMsgRate(100); + config.setLoadBalancerNamespaceBundleMaxBandwidthMbytes(100); + config.setLoadBalancerMaxNumberOfBundlesToSplitPerCycle(1); + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(3); + + pulsar = mock(PulsarService.class); + brokerService = mock(BrokerService.class); + pulsarStats = mock(PulsarStats.class); + namespaceService = mock(NamespaceService.class); + namespaceBundleFactory = mock(NamespaceBundleFactory.class); + loadManagerContext = mock(LoadManagerContext.class); + brokerRegistry = mock(BrokerRegistry.class); + + + + doReturn(brokerService).when(pulsar).getBrokerService(); + doReturn(config).when(pulsar).getConfiguration(); + doReturn(pulsarStats).when(brokerService).getPulsarStats(); + doReturn(namespaceService).when(pulsar).getNamespaceService(); + doReturn(namespaceBundleFactory).when(namespaceService).getNamespaceBundleFactory(); + doReturn(true).when(namespaceBundleFactory).canSplitBundle(any()); + doReturn(brokerRegistry).when(loadManagerContext).brokerRegistry(); + doReturn(broker).when(brokerRegistry).getBrokerId(); + + + bundleStats = new LinkedHashMap<>(); + NamespaceBundleStats stats1 = new NamespaceBundleStats(); + stats1.topics = 5; + bundleStats.put(bundle1, stats1); + NamespaceBundleStats stats2 = new NamespaceBundleStats(); + stats2.topics = 5; + bundleStats.put(bundle2, stats2); + doReturn(bundleStats).when(brokerService).getBundleStats(); + } + + public void testNamespaceBundleSplitConditionThreshold() { + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0); + 
bundleStats.values().forEach(v -> v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter()); + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + assertEquals(actual.size(), 1); + } + + + public void testNotEnoughTopics() { + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0); + bundleStats.values().forEach(v -> v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter()); + bundleStats.values().forEach(v -> v.topics = 1); + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + var expected = Set.of(); + assertEquals(actual, expected); + } + + public void testNamespaceMaximumBundles() throws Exception { + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0); + bundleStats.values().forEach(v -> v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter()); + doReturn(config.getLoadBalancerNamespaceMaximumBundles()).when(namespaceService).getBundleCount(any()); + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + var expected = Set.of(); + assertEquals(actual, expected); + } + + public void testEmptyBundleStats() { + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0); + bundleStats.values().forEach(v -> v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter()); + bundleStats.clear(); + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + var expected = Set.of(); + assertEquals(actual, expected); + } + + public void testError() throws Exception { + var counter = spy(new SplitCounter()); + config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0); + bundleStats.values().forEach(v -> 
v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter); + doThrow(new RuntimeException()).when(namespaceService).getBundleCount(any()); + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + var expected = Set.of(); + assertEquals(actual, expected); + verify(counter, times(2)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } + + public void testMaxMsgRate() { + var counter = spy(new SplitCounter()); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter); + int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold(); + bundleStats.values().forEach(v -> { + v.msgRateOut = config.getLoadBalancerNamespaceBundleMaxMsgRate() / 2 + 1; + v.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() / 2 + 1; + }); + for (int i = 0; i < threshold + 2; i++) { + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + if (i == threshold) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle1, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.MsgRate); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else if (i == threshold + 1) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle2, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.MsgRate); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else { + assertEquals(actual, Set.of()); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } + } + } + + public void testMaxTopics() { + var counter = spy(new SplitCounter()); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter); + int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold(); + 
bundleStats.values().forEach(v -> v.topics = config.getLoadBalancerNamespaceBundleMaxTopics() + 1); + for (int i = 0; i < threshold + 2; i++) { + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + if (i == threshold) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle1, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Topics); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else if (i == threshold + 1) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle2, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Topics); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else { + assertEquals(actual, Set.of()); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } + } + } + + public void testMaxSessions() { + var counter = spy(new SplitCounter()); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter); + int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold(); + bundleStats.values().forEach(v -> { + v.producerCount = config.getLoadBalancerNamespaceBundleMaxSessions() / 2 + 1; + v.consumerCount = config.getLoadBalancerNamespaceBundleMaxSessions() / 2 + 1; + }); + for (int i = 0; i < threshold + 2; i++) { + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + if (i == threshold) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle1, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Sessions); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else if (i == threshold + 1) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle2, 
broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Sessions); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else { + assertEquals(actual, Set.of()); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } + } + } + + public void testMaxBandwidthMbytes() { + var counter = spy(new SplitCounter()); + var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter); + int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold(); + bundleStats.values().forEach(v -> { + v.msgThroughputOut = config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 1024 * 1024 / 2 + 1; + v.msgThroughputIn = config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 1024 * 1024 / 2 + 1; + }); + for (int i = 0; i < threshold + 2; i++) { + var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar); + if (i == threshold) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle1, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Bandwidth); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else if (i == threshold + 1) { + SplitDecision decision1 = new SplitDecision(); + decision1.setSplit(new Split(bundle2, broker, new HashMap<>())); + decision1.succeed(SplitDecision.Reason.Bandwidth); + + assertEquals(actual, Set.of(decision1)); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } else { + assertEquals(actual, Set.of()); + verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown)); + } + } + } + +}