diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 30f01ece7de11..5be675f7b636c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -58,6 +58,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.loadbalance.LeaderBroker; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.Subscription; @@ -983,8 +984,17 @@ public CompletableFuture setNamespaceBundleAffinityAsync(String bundleRang } return CompletableFuture.completedFuture(null); }) - .thenCompose(__ -> validateLeaderBrokerAsync()) + .thenCompose(__ -> { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) { + return CompletableFuture.completedFuture(null); + } + return validateLeaderBrokerAsync(); + }) .thenAccept(__ -> { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) { + return; + } + // For ExtensibleLoadManager, this operation will be ignored. pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker); }); } @@ -1036,10 +1046,11 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR namespaceName, bundleRange); return CompletableFuture.completedFuture(null); } + Optional destinationBrokerOpt = Optional.ofNullable(destinationBroker); return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, authoritative, true) - .thenCompose(nsBundle -> - pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)); + .thenCompose(nsBundle -> pulsar().getNamespaceService() + .unloadNamespaceBundle(nsBundle, destinationBrokerOpt)); })); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 650c12af57391..2bebe203d8750 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; +import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; @@ -43,9 +44,11 @@ import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter; import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter; import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter; +import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager; import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; +import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter; @@ -110,6 +113,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private ScheduledFuture brokerLoadDataReportTask; private ScheduledFuture topBundlesLoadDataReportTask; + private UnloadManager unloadManager; + private boolean started = false; private final AssignCounter assignCounter = new AssignCounter(); @@ -143,6 +148,13 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) { return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName()); } + public static ExtensibleLoadManagerImpl get(LoadManager loadManager) { + if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) { + throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'."); + } + return loadManagerWrapper.get(); + } + @Override public void start() throws PulsarServerException { if (this.started) { @@ -151,6 +163,8 @@ public void start() throws PulsarServerException { this.brokerRegistry = new BrokerRegistryImpl(pulsar); this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); this.brokerRegistry.start(); + this.unloadManager = new UnloadManager(); + this.serviceUnitStateChannel.listen(unloadManager); this.serviceUnitStateChannel.start(); try { @@ -201,7 +215,8 @@ public void start() throws PulsarServerException { interval, TimeUnit.MILLISECONDS); // TODO: Start bundle split scheduler. - this.unloadScheduler = new UnloadScheduler(pulsar.getLoadManagerExecutor(), context, serviceUnitStateChannel); + this.unloadScheduler = new UnloadScheduler( + pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel); this.unloadScheduler.start(); this.started = true; } @@ -300,6 +315,12 @@ private CompletableFuture> selectAsync(ServiceUnitId bundle) { @Override public CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId bundleUnit) { + return getOwnershipAsync(topic, bundleUnit) + .thenApply(broker -> brokerRegistry.getBrokerId().equals(broker.orElse(null))); + } + + private CompletableFuture> getOwnershipAsync(Optional topic, + ServiceUnitId bundleUnit) { final String bundle = bundleUnit.toString(); CompletableFuture> owner; if (topic.isPresent() && isInternalTopic(topic.get().toString())) { @@ -307,8 +328,35 @@ public CompletableFuture checkOwnershipAsync(Optional to } else { owner = serviceUnitStateChannel.getOwnerAsync(bundle); } + return owner; + } + + public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, + Optional destinationBroker) { + return getOwnershipAsync(Optional.empty(), bundle) + .thenCompose(brokerOpt -> { + if (brokerOpt.isEmpty()) { + String msg = String.format("Namespace bundle: %s is not owned by any broker.", bundle); + log.warn(msg); + throw new IllegalStateException(msg); + } + String sourceBroker = brokerOpt.get(); + if (destinationBroker.isPresent() && sourceBroker.endsWith(destinationBroker.get())) { + String msg = String.format("Namespace bundle: %s own by %s, cannot be transfer to same broker.", + bundle, sourceBroker); + log.warn(msg); + throw new IllegalArgumentException(msg); + } + return unloadAsync(new Unload(sourceBroker, bundle.toString(), destinationBroker), + conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); + }); + } - return owner.thenApply(broker -> brokerRegistry.getBrokerId().equals(broker.orElse(null))); + private CompletableFuture unloadAsync(Unload unload, + long timeout, + TimeUnit timeoutUnit) { + CompletableFuture future = serviceUnitStateChannel.publishUnloadEventAsync(unload); + return unloadManager.waitAsync(future, unload.serviceUnit(), timeout, timeoutUnit); } @Override @@ -337,6 +385,7 @@ public void close() throws PulsarServerException { try { this.serviceUnitStateChannel.close(); } finally { + this.unloadManager.close(); this.started = false; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java index 44950a21ffd20..dc4d582ddb081 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener; import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.common.stats.Metrics; @@ -156,4 +157,11 @@ public interface ServiceUnitStateChannel extends Closeable { */ List getMetrics(); + /** + * Add a state change listener. + * + * @param listener State change listener. + */ + void listen(StateChangeListener listener); + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index cddaf92d22771..5f24e41dda931 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -72,6 +72,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener; import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; @@ -117,6 +118,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private final ConcurrentOpenHashMap> getOwnerRequests; private final String lookupServiceAddress; private final ConcurrentOpenHashMap> cleanupJobs; + private final StateChangeListeners stateChangeListeners; private final LeaderElectionService leaderElectionService; private BrokerSelectionStrategy brokerSelector; private BrokerRegistry brokerRegistry; @@ -191,6 +193,7 @@ public ServiceUnitStateChannelImpl(PulsarService pulsar) { this.getOwnerRequests = ConcurrentOpenHashMap.>newBuilder().build(); this.cleanupJobs = ConcurrentOpenHashMap.>newBuilder().build(); + this.stateChangeListeners = new StateChangeListeners(); this.semiTerminalStateWaitingTimeInMillis = config.getLoadBalancerServiceUnitStateCleanUpDelayTimeInSeconds() * 1000; this.inFlightStateWaitingTimeInMillis = MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS; @@ -357,6 +360,10 @@ public synchronized void close() throws PulsarServerException { log.info("Successfully cancelled the cleanup tasks"); } + if (stateChangeListeners != null) { + stateChangeListeners.close(); + } + log.info("Successfully closed the channel."); } catch (Exception e) { @@ -619,6 +626,7 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { if (getOwnerRequest != null) { getOwnerRequest.complete(data.dstBroker()); } + stateChangeListeners.notify(serviceUnit, data, null); if (isTargetBroker(data.dstBroker())) { log(null, serviceUnit, data, null); } @@ -628,7 +636,7 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.dstBroker())) { ServiceUnitStateData next = new ServiceUnitStateData( Owned, data.dstBroker(), data.sourceBroker(), getNextVersionId(data)); - pubAsync(serviceUnit, next) + stateChangeListeners.notifyOnCompletion(pubAsync(serviceUnit, next), serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, next)); } } @@ -644,15 +652,15 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { next = new ServiceUnitStateData( Free, null, data.sourceBroker(), getNextVersionId(data)); } - closeServiceUnit(serviceUnit) - .thenCompose(__ -> pubAsync(serviceUnit, next)) + stateChangeListeners.notifyOnCompletion(closeServiceUnit(serviceUnit) + .thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, next)); } } private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.sourceBroker())) { - splitServiceUnit(serviceUnit, data) + stateChangeListeners.notifyOnCompletion(splitServiceUnit(serviceUnit, data), serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); } } @@ -662,6 +670,7 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { if (getOwnerRequest != null) { getOwnerRequest.complete(null); } + stateChangeListeners.notify(serviceUnit, data, null); if (isTargetBroker(data.sourceBroker())) { log(null, serviceUnit, data, null); } @@ -672,6 +681,7 @@ private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) { if (getOwnerRequest != null) { getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted.")); } + stateChangeListeners.notify(serviceUnit, data, null); if (isTargetBroker(data.sourceBroker())) { log(null, serviceUnit, data, null); } @@ -682,6 +692,7 @@ private void handleInitEvent(String serviceUnit) { if (getOwnerRequest != null) { getOwnerRequest.complete(null); } + stateChangeListeners.notify(serviceUnit, null, null); log(null, serviceUnit, null, null); } @@ -1302,4 +1313,9 @@ public List getMetrics() { return metrics; } + + @Override + public void listen(StateChangeListener listener) { + this.stateChangeListeners.addListener(listener); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java new file mode 100644 index 0000000000000..1d396f500b648 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.channel; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener; + +@Slf4j +public class StateChangeListeners { + + private final List stateChangeListeners; + + public StateChangeListeners() { + stateChangeListeners = new CopyOnWriteArrayList<>(); + } + + public void addListener(StateChangeListener listener) { + Objects.requireNonNull(listener); + stateChangeListeners.add(listener); + } + + public void close() { + this.stateChangeListeners.clear(); + } + + /** + * Notify all currently added listeners on completion of the future. + * + * @return future of a new completion stage + */ + public CompletableFuture notifyOnCompletion(CompletableFuture future, + String serviceUnit, + ServiceUnitStateData data) { + return future.whenComplete((r, ex) -> notify(serviceUnit, data, ex)); + } + + public void notify(String serviceUnit, ServiceUnitStateData data, Throwable t) { + stateChangeListeners.forEach(listener -> { + try { + listener.handleEvent(serviceUnit, data, t); + } catch (Throwable ex) { + log.error("StateChangeListener: {} exception while handling {} for service unit {}", + listener, data, serviceUnit, ex); + } + }); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java new file mode 100644 index 0000000000000..7ba8be8771b91 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; + +public interface StateChangeListener { + + /** + * Handle the service unit state change. + * + * @param serviceUnit - Service Unit(Namespace bundle). + * @param data - Service unit state data. + * @param t - Exception, if present. + */ + void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java new file mode 100644 index 0000000000000..ead6384daba8d --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; + +/** + * Unload manager. + */ +@Slf4j +public class UnloadManager implements StateChangeListener { + + private final Map> inFlightUnloadRequest; + + public UnloadManager() { + this.inFlightUnloadRequest = new ConcurrentHashMap<>(); + } + + private void complete(String serviceUnit, Throwable ex) { + inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> { + if (!future.isDone()) { + if (ex != null) { + future.completeExceptionally(ex); + if (log.isDebugEnabled()) { + log.debug("Complete exceptionally unload bundle: {}", serviceUnit, ex); + } + } else { + future.complete(null); + if (log.isDebugEnabled()) { + log.debug("Complete unload bundle: {}", serviceUnit); + } + } + } + return null; + }); + } + + public CompletableFuture waitAsync(CompletableFuture eventPubFuture, + String bundle, + long timeout, + TimeUnit timeoutUnit) { + + return eventPubFuture.thenCompose(__ -> inFlightUnloadRequest.computeIfAbsent(bundle, ignore -> { + if (log.isDebugEnabled()) { + log.debug("Handle unload bundle: {}, timeout: {} {}", bundle, timeout, timeoutUnit); + } + CompletableFuture future = new CompletableFuture<>(); + future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> { + if (ex != null) { + inFlightUnloadRequest.remove(bundle); + log.warn("Failed to wait unload for serviceUnit: {}", bundle, ex); + } + }); + return future; + })); + } + + @Override + public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { + ServiceUnitState state = ServiceUnitStateData.state(data); + switch (state) { + case Free, Owned -> this.complete(serviceUnit, t); + default -> { + if (log.isDebugEnabled()) { + log.debug("Handling {} for service unit {}", data, serviceUnit); + } + } + } + } + + public void close() { + inFlightUnloadRequest.forEach((bundle, future) -> { + if (!future.isDone()) { + String msg = String.format("Unloading bundle: %s, but the unload manager already closed.", bundle); + log.warn(msg); + future.completeExceptionally(new IllegalStateException(msg)); + } + }); + inFlightUnloadRequest.clear(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/package-info.java new file mode 100644 index 0000000000000..ac553c0690068 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java index 5cdbd3027104d..e646026978754 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java @@ -31,6 +31,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.Reflections; @@ -42,6 +43,8 @@ public class UnloadScheduler implements LoadManagerScheduler { private final ScheduledExecutorService loadManagerExecutor; + private final UnloadManager unloadManager; + private final LoadManagerContext context; private final ServiceUnitStateChannel channel; @@ -57,13 +60,16 @@ public class UnloadScheduler implements LoadManagerScheduler { private volatile CompletableFuture currentRunningFuture = null; public UnloadScheduler(ScheduledExecutorService loadManagerExecutor, + UnloadManager unloadManager, LoadManagerContext context, ServiceUnitStateChannel channel) { - this(loadManagerExecutor, context, channel, createNamespaceUnloadStrategy(context.brokerConfiguration())); + this(loadManagerExecutor, unloadManager, context, + channel, createNamespaceUnloadStrategy(context.brokerConfiguration())); } @VisibleForTesting protected UnloadScheduler(ScheduledExecutorService loadManagerExecutor, + UnloadManager unloadManager, LoadManagerContext context, ServiceUnitStateChannel channel, NamespaceUnloadStrategy strategy) { @@ -71,6 +77,7 @@ protected UnloadScheduler(ScheduledExecutorService loadManagerExecutor, this.recentlyUnloadedBundles = new HashMap<>(); this.recentlyUnloadedBrokers = new HashMap<>(); this.loadManagerExecutor = loadManagerExecutor; + this.unloadManager = unloadManager; this.context = context; this.conf = context.brokerConfiguration(); this.channel = channel; @@ -131,9 +138,11 @@ public synchronized void execute() { List> futures = new ArrayList<>(); unloadDecision.getUnloads().forEach((broker, unload) -> { log.info("[{}] Unloading bundle: {}", namespaceUnloadStrategy.getClass().getSimpleName(), unload); - futures.add(channel.publishUnloadEventAsync(unload).thenAccept(__ -> { - recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis()); - recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis()); + futures.add(unloadManager.waitAsync(channel.publishUnloadEventAsync(unload), unload.serviceUnit(), + conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS) + .thenAccept(__ -> { + recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis()); + recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis()); })); }); return FutureUtil.waitForAll(futures).exceptionally(ex -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 33b15926c3c33..899539e1db6a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -717,6 +717,14 @@ private Optional> getLeastLoadedFromLoadManager(ServiceUnit } public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle) { + return unloadNamespaceBundle(bundle, Optional.empty()); + } + + public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, Optional destinationBroker) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + return ExtensibleLoadManagerImpl.get(loadManager.get()) + .unloadNamespaceBundleAsync(bundle, destinationBroker); + } // unload namespace bundle return unloadNamespaceBundle(bundle, config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 5484a70e1aad0..321a127ad97bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -55,6 +55,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.BookieResources; @@ -626,14 +627,18 @@ protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName fqnn, B } } - protected CompletableFuture validateNamespaceBundleOwnershipAsync(NamespaceName fqnn, - BundlesData bundles, String bundleRange, boolean authoritative, boolean readOnly) { + protected CompletableFuture validateNamespaceBundleOwnershipAsync( + NamespaceName fqnn, BundlesData bundles, String bundleRange, + boolean authoritative, boolean readOnly) { NamespaceBundle nsBundle; try { nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange); } catch (WebApplicationException wae) { return CompletableFuture.failedFuture(wae); } + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) { + return CompletableFuture.completedFuture(nsBundle); + } return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly) .thenApply(__ -> nsBundle); } @@ -992,6 +997,10 @@ protected boolean isLeaderBroker() { } protected static boolean isLeaderBroker(PulsarService pulsar) { + // For extensible load manager, it doesn't have leader election service on pulsar broker. + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) { + return true; + } return pulsar.getLeaderElectionService().isLeader(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index d1bf29725d004..ec82f5c383e2e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -43,10 +43,12 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.util.LinkedHashMap; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -77,18 +79,22 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; +import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; @@ -124,10 +130,15 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { public void setup() throws Exception { conf.setAllowAutoTopicCreation(true); conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + conf.setLoadBalancerSheddingEnabled(false); super.internalSetup(conf); pulsar1 = pulsar; ServiceConfiguration defaultConf = getDefaultConf(); + defaultConf.setAllowAutoTopicCreation(true); defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + defaultConf.setLoadBalancerSheddingEnabled(false); additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf); pulsar2 = additionalPulsarTestContext.getPulsarService(); @@ -148,6 +159,14 @@ public void setup() throws Exception { channel2 = (ServiceUnitStateChannelImpl) FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true); + admin.clusters().createCluster(this.conf.getClusterName(), + ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("public", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), + Sets.newHashSet(this.conf.getClusterName()))); + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", + Sets.newHashSet(this.conf.getClusterName())); } protected void beforePulsarStart(PulsarService pulsar) throws Exception { @@ -307,6 +326,59 @@ public Map filter(Map broker assertTrue(brokerLookupData.isPresent()); } + @Test(timeOut = 30 * 1000) + public void testUnloadAdminAPI() throws Exception { + TopicName topicName = TopicName.get("test-unload"); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + + String broker = admin.lookups().lookupTopic(topicName.toString()); + log.info("Assign the bundle {} to {}", bundle, broker); + + checkOwnershipState(broker, bundle); + admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange()); + assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + + broker = admin.lookups().lookupTopic(topicName.toString()); + log.info("Assign the bundle {} to {}", bundle, broker); + + String dstBrokerUrl = pulsar1.getLookupServiceAddress(); + String dstBrokerServiceUrl; + if (broker.equals(pulsar1.getBrokerServiceUrl())) { + dstBrokerUrl = pulsar2.getLookupServiceAddress(); + dstBrokerServiceUrl = pulsar2.getBrokerServiceUrl(); + } else { + dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl(); + } + checkOwnershipState(broker, bundle); + + admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), dstBrokerUrl); + + assertEquals(admin.lookups().lookupTopic(topicName.toString()), dstBrokerServiceUrl); + + // Test transfer to current broker. + try { + admin.namespaces() + .unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), dstBrokerUrl); + fail(); + } catch (PulsarAdminException ex) { + assertTrue(ex.getMessage().contains("cannot be transfer to same broker")); + } + } + + private void checkOwnershipState(String broker, NamespaceBundle bundle) + throws ExecutionException, InterruptedException { + var targetLoadManager = secondaryLoadManager; + var otherLoadManager = primaryLoadManager; + if (broker.equals(pulsar1.getBrokerServiceUrl())) { + targetLoadManager = primaryLoadManager; + otherLoadManager = secondaryLoadManager; + } + assertTrue(targetLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertFalse(otherLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + } + + @Test public void testGetMetrics() throws Exception { { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java new file mode 100644 index 0000000000000..75ef913b8a851 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.common.util.FutureUtil; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class UnloadManagerTest { + + @Test + public void testEventPubFutureHasException() { + UnloadManager manager = new UnloadManager(); + CompletableFuture future = + manager.waitAsync(FutureUtil.failedFuture(new Exception("test")), + "bundle-1", 10, TimeUnit.SECONDS); + + assertTrue(future.isCompletedExceptionally()); + try { + future.get(); + fail(); + } catch (Exception ex) { + assertEquals(ex.getCause().getMessage(), "test"); + } + } + + @Test + public void testTimeout() throws IllegalAccessException { + UnloadManager manager = new UnloadManager(); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + "bundle-1", 3, TimeUnit.SECONDS); + Map> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); + + assertEquals(inFlightUnloadRequestMap.size(), 1); + + try { + future.get(); + fail(); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof TimeoutException); + } + + assertEquals(inFlightUnloadRequestMap.size(), 0); + } + + @Test + public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException { + UnloadManager manager = new UnloadManager(); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + "bundle-1", 5, TimeUnit.SECONDS); + Map> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); + + assertEquals(inFlightUnloadRequestMap.size(), 1); + + manager.handleEvent("bundle-1", + new ServiceUnitStateData(ServiceUnitState.Assigning, "broker-1", VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + + manager.handleEvent("bundle-1", + new ServiceUnitStateData(ServiceUnitState.Deleted, "broker-1", VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + + manager.handleEvent("bundle-1", + new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-1", VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + + manager.handleEvent("bundle-1", + new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-1", VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + + manager.handleEvent("bundle-1", + new ServiceUnitStateData(ServiceUnitState.Init, "broker-1", VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + + manager.handleEvent("bundle-1", + new ServiceUnitStateData(ServiceUnitState.Free, "broker-1", VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 0); + future.get(); + + // Success with Owned state. + future = manager.waitAsync(CompletableFuture.completedFuture(null), + "bundle-1", 5, TimeUnit.SECONDS); + inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); + + assertEquals(inFlightUnloadRequestMap.size(), 1); + + manager.handleEvent("bundle-1", + new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 0); + future.get(); + } + + @Test + public void testFailedStage() throws IllegalAccessException { + UnloadManager manager = new UnloadManager(); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + "bundle-1", 5, TimeUnit.SECONDS); + Map> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); + + assertEquals(inFlightUnloadRequestMap.size(), 1); + + manager.handleEvent("bundle-1", + new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT), + new IllegalStateException("Failed stage.")); + + try { + future.get(); + fail(); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof IllegalStateException); + assertEquals(ex.getCause().getMessage(), "Failed stage."); + } + + assertEquals(inFlightUnloadRequestMap.size(), 0); + } + + @Test + public void testClose() throws IllegalAccessException { + UnloadManager manager = new UnloadManager(); + CompletableFuture future = + manager.waitAsync(CompletableFuture.completedFuture(null), + "bundle-1", 5, TimeUnit.SECONDS); + Map> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.close(); + assertEquals(inFlightUnloadRequestMap.size(), 0); + + try { + future.get(); + fail(); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof IllegalStateException); + } + } + + private Map> getInFlightUnloadRequestMap(UnloadManager manager) + throws IllegalAccessException { + Map> inFlightUnloadRequest = + (Map>) FieldUtils.readField(manager, "inFlightUnloadRequest", true); + + return inFlightUnloadRequest; + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java index cda5f81d81b93..73d4eb1f18bfb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.scheduler; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -31,6 +32,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision; import org.apache.pulsar.client.util.ExecutorProvider; @@ -71,17 +73,20 @@ public void testExecuteSuccess() { LoadManagerContext context = setupContext(); BrokerRegistry registry = context.brokerRegistry(); ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class); + UnloadManager unloadManager = mock(UnloadManager.class); NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class); doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync(); doReturn(CompletableFuture.completedFuture(Lists.newArrayList("broker-1", "broker-2"))) .when(registry).getAvailableBrokersAsync(); doReturn(CompletableFuture.completedFuture(null)).when(channel).publishUnloadEventAsync(any()); + doReturn(CompletableFuture.completedFuture(null)).when(unloadManager) + .waitAsync(any(), any(), anyLong(), any()); UnloadDecision decision = new UnloadDecision(); Unload unload = new Unload("broker-1", "bundle-1"); decision.getUnloads().put("broker-1", unload); doReturn(decision).when(unloadStrategy).findBundlesForUnloading(any(), any(), any()); - UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy); + UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, unloadManager, context, channel, unloadStrategy); scheduler.execute(); @@ -101,6 +106,7 @@ public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedExceptio LoadManagerContext context = setupContext(); BrokerRegistry registry = context.brokerRegistry(); ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class); + UnloadManager unloadManager = mock(UnloadManager.class); NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class); doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync(); doAnswer(__ -> CompletableFuture.supplyAsync(() -> { @@ -112,7 +118,7 @@ public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedExceptio } return Lists.newArrayList("broker-1", "broker-2"); }, Executors.newFixedThreadPool(1))).when(registry).getAvailableBrokersAsync(); - UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy); + UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, unloadManager, context, channel, unloadStrategy); ExecutorService executorService = Executors.newFixedThreadPool(10); CountDownLatch latch = new CountDownLatch(10); @@ -133,7 +139,8 @@ public void testDisableLoadBalancer() { context.brokerConfiguration().setLoadBalancerEnabled(false); ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class); NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class); - UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy); + UnloadManager unloadManager = mock(UnloadManager.class); + UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, unloadManager, context, channel, unloadStrategy); scheduler.execute(); @@ -152,7 +159,8 @@ public void testNotChannelOwner() { context.brokerConfiguration().setLoadBalancerEnabled(false); ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class); NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class); - UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy); + UnloadManager unloadManager = mock(UnloadManager.class); + UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, unloadManager, context, channel, unloadStrategy); doReturn(CompletableFuture.completedFuture(false)).when(channel).isChannelOwnerAsync(); scheduler.execute();