Skip to content

Commit

Permalink
Revert: Return empty broker when has broker filter exception
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Jan 17, 2023
1 parent 5562d2f commit 51be8fb
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.pulsar.broker.loadbalance.extensions;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -168,18 +170,21 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenCompose(availableBrokerCandidates -> {
.thenCompose(availableBrokers -> {
// TODO: Support isolation policies
LoadManagerContext context = this.getContext();

Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);

// Filter out brokers that do not meet the rules.
List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
for (final BrokerFilter filter : filterPipeline) {
try {
filter.filter(availableBrokerCandidates, context);
} catch (BrokerFilterException e) {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers.", e);
return CompletableFuture.completedFuture(Optional.empty());
availableBrokerCandidates = availableBrokers;
}
}
if (availableBrokerCandidates.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -60,7 +59,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -267,13 +265,8 @@ public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> broker
}
})).when(primaryLoadManager).getBrokerFilterPipeline();

try {
primaryLoadManager.assign(Optional.empty(), bundle).get();
fail();
} catch (Exception ex) {
log.info("Assign bundle: {} failed.", bundle, ex);
assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException);
}
Optional<BrokerLookupData> brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get();
assertTrue(brokerLookupData.isPresent());
}

private static void cleanTableView(ServiceUnitStateChannel channel)
Expand Down

0 comments on commit 51be8fb

Please sign in to comment.