diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c93185e78f75b5..104a84d041d8a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -556,16 +556,7 @@ protected CompletableFuture internalGetPartitionedMeta } else { // If it does not exist, response a Not Found error. // Otherwise, response a non-partitioned metadata. - if (topicName.isPersistent()) { - return internalCheckTopicExists(topicName).thenApply(__ -> metadata); - } else { - // Regarding non-persistent topic, we do not know whether it exists or not. - // Just return a non-partitioned metadata if partitioned metadata does not - // exist. - // Broker will respond a not found error when doing subscribing or producing if - // broker not allow to auto create topics. - return CompletableFuture.completedFuture(metadata); - } + return internalCheckTopicExists(topicName).thenApply(__ -> metadata); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index 147960ff03dde3..51f643d2b78230 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.Semaphore; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -38,6 +39,7 @@ import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -325,10 +327,8 @@ public Object[][] autoCreationParamsNotAllow(){ {true, false, false}, {false, false, true}, {false, false, false}, - // These test cases are for the following PR. - // Which was described in the Motivation of https://github.com/apache/pulsar/pull/22206. - //{false, true, true}, - //{false, true, false}, + {false, true, true}, + {false, true, false}, }; } @@ -336,6 +336,11 @@ public Object[][] autoCreationParamsNotAllow(){ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation, boolean paramMetadataAutoCreationEnabled, boolean isUsingHttpLookup) throws Exception { + if (!configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { + // These test cases are for the following PR. + // Which was described in the Motivation of https://github.com/apache/pulsar/pull/22206. + return; + } conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); conf.setDefaultNumPartitions(3); conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); @@ -379,7 +384,34 @@ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreati client.close(); } - @Test(dataProvider = "autoCreationParamsNotAllow") + @DataProvider(name = "autoCreationParamsForNonPersistentTopic") + public Object[][] autoCreationParamsForNonPersistentTopic(){ + return new Object[][]{ + // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup. + {true, true, true}, + {true, true, false}, + {false, true, true}, + {false, true, false}, + {false, false, true} + }; + } + + /** + * Regarding the API "get partitioned metadata" about non-persistent topic. + * The original behavior is: + * param-auto-create = true, broker-config-auto-create = true + * HTTP API: default configuration {@link ServiceConfiguration#getDefaultNumPartitions()} + * binary API: default configuration {@link ServiceConfiguration#getDefaultNumPartitions()} + * param-auto-create = true, broker-config-auto-create = false + * HTTP API: {partitions: 0} + * binary API: {partitions: 0} + * param-auto-create = false + * HTTP API: not found error + * binary API: not support + * This test only guarantees that the behavior is the same as before. The following separated PR will fix the + * incorrect behavior. + */ + @Test(dataProvider = "autoCreationParamsForNonPersistentTopic") public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation, boolean paramMetadataAutoCreationEnabled, boolean isUsingHttpLookup) throws Exception { @@ -399,17 +431,34 @@ public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean configAllowAu // Regarding non-persistent topic, we do not know whether it exists or not. // Broker will return a non-partitioned metadata if partitioned metadata does not exist. PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + + if (!configAllowAutoTopicCreation && !paramMetadataAutoCreationEnabled && isUsingHttpLookup) { + try { + lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled) + .join(); + Assert.fail("Expected a not found ex"); + } catch (Exception ex) { + // Cleanup. + client.close(); + return; + } + } + PartitionedTopicMetadata metadata = lookup .getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled).join(); - assertEquals(metadata.partitions, 0); + if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { + assertEquals(metadata.partitions, 3); + } else { + assertEquals(metadata.partitions, 0); + } List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); - pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName); - assertFalse(partitionedTopics.contains(topicNameStr)); - List topicList = admin.topics().getList("public/default"); - assertFalse(topicList.contains(topicNameStr)); - for (int i = 0; i < 3; i++) { - assertFalse(topicList.contains(topicName.getPartition(i))); + pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(topicName); + if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { + assertTrue(partitionedTopics.contains(topicNameStr)); + } else { + assertFalse(partitionedTopics.contains(topicNameStr)); } // Verify: lookup semaphore has been releases.