From 35b12f7f0dc4d328897f11ef8b6e7f9662ff6c4c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 26 Mar 2024 15:04:01 +0800 Subject: [PATCH] [fix][broker] Fast fix infinite HTTP call getSubscriptions caused by wrong topicName --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +- ...TopicNameForInfiniteHttpCallGetSubscriptionsTest.java | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index dbcfd734a375c..6e537900f398f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2222,7 +2222,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su getPartitionedTopicMetadataAsync(topicName, authoritative, allowAutoTopicCreation)) .thenAccept(partitionMetadata -> { final int numPartitions = partitionMetadata.partitions; - if (numPartitions > 0) { + if (partitionMetadata.partitions > 0 && !isUnexpectedTopicName(partitionMetadata)) { final CompletableFuture future = new CompletableFuture<>(); final AtomicInteger count = new AtomicInteger(numPartitions); final AtomicInteger failureCount = new AtomicInteger(0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java index 6d3806f312ec6..25eee62609f2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java @@ -52,7 +52,7 @@ protected void cleanup() throws Exception { } @Test - public void testInfiniteHttpCallGetSubscriptions() throws Exception { + public void testInfiniteHttpCallGetOrCreateSubscriptions() throws Exception { final String randomStr = UUID.randomUUID().toString().replaceAll("-", ""); final String partitionedTopicName = "persistent://my-property/my-ns/tp1_" + randomStr; final String topic_p0 = partitionedTopicName + TopicName.PARTITIONED_TOPIC_SUFFIX + "0"; @@ -64,6 +64,7 @@ public void testInfiniteHttpCallGetSubscriptions() throws Exception { // Do test. ProducerAndConsumerEntry pcEntry = triggerDLQCreated(topic_p0, topicDLQ, subscriptionName); admin.topics().getSubscriptions(topicDLQ); + admin.topics().createSubscription(topicDLQ, "s1", MessageId.earliest); // cleanup. pcEntry.consumer.close(); @@ -72,7 +73,7 @@ public void testInfiniteHttpCallGetSubscriptions() throws Exception { } @Test - public void testInfiniteHttpCallGetSubscriptions2() throws Exception { + public void testInfiniteHttpCallGetOrCreateSubscriptions2() throws Exception { final String randomStr = UUID.randomUUID().toString().replaceAll("-", ""); final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0-abc"; Producer producer = pulsarClient.newProducer(Schema.STRING) @@ -81,13 +82,14 @@ public void testInfiniteHttpCallGetSubscriptions2() throws Exception { // Do test. admin.topics().getSubscriptions(topicName); + admin.topics().createSubscription(topicName, "s1", MessageId.earliest); // cleanup. producer.close(); } @Test - public void testInfiniteHttpCallGetSubscriptions3() throws Exception { + public void testInfiniteHttpCallGetOrCreateSubscriptions3() throws Exception { final String randomStr = UUID.randomUUID().toString().replaceAll("-", ""); final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0"; Producer producer = pulsarClient.newProducer(Schema.STRING) @@ -96,6 +98,7 @@ public void testInfiniteHttpCallGetSubscriptions3() throws Exception { // Do test. admin.topics().getSubscriptions(topicName); + admin.topics().createSubscription(topicName, "s1", MessageId.earliest); // cleanup. producer.close();