From 51be8fbb849fc01ba978f0d00464af967c869180 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 17 Jan 2023 16:53:48 +0800 Subject: [PATCH] Revert: Return empty broker when has broker filter exception --- .../extensions/ExtensibleLoadManagerImpl.java | 9 +++++++-- .../extensions/ExtensibleLoadManagerImplTest.java | 11 ++--------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 1aa46f9dd76f2..6d5f2be041694 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -19,7 +19,9 @@ package org.apache.pulsar.broker.loadbalance.extensions; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; @@ -168,18 +170,21 @@ public CompletableFuture> assign(Optional> selectAsync(ServiceUnitId bundle) { BrokerRegistry brokerRegistry = getBrokerRegistry(); return brokerRegistry.getAvailableBrokerLookupDataAsync() - .thenCompose(availableBrokerCandidates -> { + .thenCompose(availableBrokers -> { // TODO: Support isolation policies LoadManagerContext context = this.getContext(); + Map availableBrokerCandidates = new HashMap<>(availableBrokers); + // Filter out brokers that do not meet the rules. List filterPipeline = getBrokerFilterPipeline(); for (final BrokerFilter filter : filterPipeline) { try { filter.filter(availableBrokerCandidates, context); } catch (BrokerFilterException e) { + // TODO: We may need to revisit this error case. log.error("Failed to filter out brokers.", e); - return CompletableFuture.completedFuture(Optional.empty()); + availableBrokerCandidates = availableBrokers; } } if (availableBrokerCandidates.isEmpty()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 9314075c18405..d53d2a708196e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -26,7 +26,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; @@ -60,7 +59,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -267,13 +265,8 @@ public Map filter(Map broker } })).when(primaryLoadManager).getBrokerFilterPipeline(); - try { - primaryLoadManager.assign(Optional.empty(), bundle).get(); - fail(); - } catch (Exception ex) { - log.info("Assign bundle: {} failed.", bundle, ex); - assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException); - } + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + assertTrue(brokerLookupData.isPresent()); } private static void cleanTableView(ServiceUnitStateChannel channel)