Skip to content

Commit

Permalink
guarantees the behavior of non-persistent topic is the same as before
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed May 22, 2024
1 parent 6211a2d commit f7a4805
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -556,16 +556,7 @@ protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMeta
} else {
// If it does not exist, response a Not Found error.
// Otherwise, response a non-partitioned metadata.
if (topicName.isPersistent()) {
return internalCheckTopicExists(topicName).thenApply(__ -> metadata);
} else {
// Regarding non-persistent topic, we do not know whether it exists or not.
// Just return a non-partitioned metadata if partitioned metadata does not
// exist.
// Broker will respond a not found error when doing subscribing or producing if
// broker not allow to auto create topics.
return CompletableFuture.completedFuture(metadata);
}
return internalCheckTopicExists(topicName).thenApply(__ -> metadata);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Semaphore;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -38,6 +39,7 @@
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -325,17 +327,20 @@ public Object[][] autoCreationParamsNotAllow(){
{true, false, false},
{false, false, true},
{false, false, false},
// These test cases are for the following PR.
// Which was described in the Motivation of https://github.com/apache/pulsar/pull/22206.
//{false, true, true},
//{false, true, false},
{false, true, true},
{false, true, false},
};
}

@Test(dataProvider = "autoCreationParamsNotAllow")
public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation,
boolean paramMetadataAutoCreationEnabled,
boolean isUsingHttpLookup) throws Exception {
if (!configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
// These test cases are for the following PR.
// Which was described in the Motivation of https://github.com/apache/pulsar/pull/22206.
return;
}
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
conf.setDefaultNumPartitions(3);
conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
Expand Down Expand Up @@ -379,7 +384,34 @@ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreati
client.close();
}

@Test(dataProvider = "autoCreationParamsNotAllow")
@DataProvider(name = "autoCreationParamsForNonPersistentTopic")
public Object[][] autoCreationParamsForNonPersistentTopic(){
return new Object[][]{
// configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
{true, true, true},
{true, true, false},
{false, true, true},
{false, true, false},
{false, false, true}
};
}

/**
* Regarding the API "get partitioned metadata" about non-persistent topic.
* The original behavior is:
* param-auto-create = true, broker-config-auto-create = true
* HTTP API: default configuration {@link ServiceConfiguration#getDefaultNumPartitions()}
* binary API: default configuration {@link ServiceConfiguration#getDefaultNumPartitions()}
* param-auto-create = true, broker-config-auto-create = false
* HTTP API: {partitions: 0}
* binary API: {partitions: 0}
* param-auto-create = false
* HTTP API: not found error
* binary API: not support
* This test only guarantees that the behavior is the same as before. The following separated PR will fix the
* incorrect behavior.
*/
@Test(dataProvider = "autoCreationParamsForNonPersistentTopic")
public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation,
boolean paramMetadataAutoCreationEnabled,
boolean isUsingHttpLookup) throws Exception {
Expand All @@ -399,17 +431,34 @@ public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean configAllowAu
// Regarding non-persistent topic, we do not know whether it exists or not.
// Broker will return a non-partitioned metadata if partitioned metadata does not exist.
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();

if (!configAllowAutoTopicCreation && !paramMetadataAutoCreationEnabled && isUsingHttpLookup) {
try {
lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled)
.join();
Assert.fail("Expected a not found ex");
} catch (Exception ex) {
// Cleanup.
client.close();
return;
}
}

PartitionedTopicMetadata metadata = lookup
.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled).join();
assertEquals(metadata.partitions, 0);
if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
assertEquals(metadata.partitions, 3);
} else {
assertEquals(metadata.partitions, 0);
}

List<String> partitionedTopics = admin.topics().getPartitionedTopicList("public/default");
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName);
assertFalse(partitionedTopics.contains(topicNameStr));
List<String> topicList = admin.topics().getList("public/default");
assertFalse(topicList.contains(topicNameStr));
for (int i = 0; i < 3; i++) {
assertFalse(topicList.contains(topicName.getPartition(i)));
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.partitionedTopicExists(topicName);
if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
assertTrue(partitionedTopics.contains(topicNameStr));
} else {
assertFalse(partitionedTopics.contains(topicNameStr));
}

// Verify: lookup semaphore has been releases.
Expand Down

0 comments on commit f7a4805

Please sign in to comment.