Skip to content

Commit

Permalink
[fix][broker] Fix redirect loop when using ExtensibleLoadManager and …
Browse files Browse the repository at this point in the history
…list in bundle admin API (#20528)

PIP: #16691

When using `ExtensibleLoadManager` and list in bundle admin API,
it will redirect forever because `isServiceUnitOwned` method is checking the `ownershipCache` as the ownership storage, however, when using `ExtensibleLoadManager`, it stored the ownership to table view.

* Call `isServiceUnitOwnedAsync ` when using `isServiceUnitOwned `.
* Add unit test to cover this case.
  • Loading branch information
Demogorgon314 authored and Technoboy- committed Jun 14, 2023
1 parent 692c182 commit 52e57a6
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1102,19 +1102,7 @@ public Set<NamespaceBundle> getOwnedServiceUnits() {
}

public boolean isServiceUnitOwned(ServiceUnitId suName) throws Exception {
if (suName instanceof TopicName) {
return isTopicOwnedAsync((TopicName) suName).get();
}

if (suName instanceof NamespaceName) {
return isNamespaceOwned((NamespaceName) suName);
}

if (suName instanceof NamespaceBundle) {
return ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName);
}

throw new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName());
return isServiceUnitOwnedAsync(suName).get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
}

public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId suName) {
Expand Down Expand Up @@ -1168,10 +1156,6 @@ public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName)
});
}

private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null;
}

private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
// TODO: Add unit tests cover it.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -63,7 +64,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -927,6 +927,43 @@ public void testDisableBroker() throws Exception {
}
}

@Test(timeOut = 30 * 1000)
public void testListTopic() throws Exception {
final String namespace = "public/testListTopic";
admin.namespaces().createNamespace(namespace, 3);

final String persistentTopicName = TopicName.get(
"persistent", NamespaceName.get(namespace),
"get_topics_mode_" + UUID.randomUUID()).toString();

final String nonPersistentTopicName = TopicName.get(
"non-persistent", NamespaceName.get(namespace),
"get_topics_mode_" + UUID.randomUUID()).toString();
admin.topics().createPartitionedTopic(persistentTopicName, 3);
admin.topics().createPartitionedTopic(nonPersistentTopicName, 3);
pulsarClient.newProducer().topic(persistentTopicName).create().close();
pulsarClient.newProducer().topic(nonPersistentTopicName).create().close();

BundlesData bundlesData = admin.namespaces().getBundles(namespace);
List<String> boundaries = bundlesData.getBoundaries();
int topicNum = 0;
for (int i = 0; i < boundaries.size() - 1; i++) {
String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
List<String> topic = admin.topics().getListInBundle(namespace, bundle);
if (topic == null) {
continue;
}
topicNum += topic.size();
for (String s : topic) {
assertFalse(TopicName.get(s).isPersistent());
}
}
assertEquals(topicNum, 3);

List<String> list = admin.topics().getList(namespace);
assertEquals(list.size(), 6);
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down

0 comments on commit 52e57a6

Please sign in to comment.