From f72b25faddcae6a790c31d8292b27c5ce52d3966 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Tue, 16 May 2023 17:31:58 -0700 Subject: [PATCH] Revert "replaced optional" This reverts commit 2e8eb09db783948e94a276e36caf874094edb967. --- .../extensions/ExtensibleLoadManagerImpl.java | 9 ++++----- .../extensions/channel/ServiceUnitStateChannelImpl.java | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index cbaed8ee8f94f8..3c7fdf060c386c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -381,19 +380,19 @@ public CompletableFuture> assign(Optional> selectAsync(ServiceUnitId bundle) { - return selectAsync(bundle, Collections.emptySet()); + return selectAsync(bundle, Optional.empty()); } public CompletableFuture> selectAsync(ServiceUnitId bundle, - Set excludeBrokerSet) { + Optional> excludeBrokerSet) { BrokerRegistry brokerRegistry = getBrokerRegistry(); return brokerRegistry.getAvailableBrokerLookupDataAsync() .thenCompose(availableBrokers -> { LoadManagerContext context = this.getContext(); Map availableBrokerCandidates = new HashMap<>(availableBrokers); - if (!excludeBrokerSet.isEmpty()) { - for (String exclude : excludeBrokerSet) { + if (excludeBrokerSet.isPresent()) { + for (String exclude : excludeBrokerSet.get()) { availableBrokerCandidates.remove(exclude); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index f3a308add9ef1d..3d385a47fd34bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1265,7 +1265,7 @@ private synchronized void doCleanup(String broker) { private Optional selectBroker(String serviceUnit, String inactiveBroker) { try { - return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Set.of(inactiveBroker)) + return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Optional.of(Set.of(inactiveBroker))) .get(inFlightStateWaitingTimeInMillis, MILLISECONDS); } catch (Throwable e) { log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);