diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7c50d51d129736..77cc8f11ff5535 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2784,9 +2784,11 @@ public CompletableFuture fetchPartitionedTopicMetadata && !topicExists && !topicName.isPartitioned() && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies) - && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName, policies)) { + && pulsar.getBrokerService() + .isDefaultTopicTypePartitioned(topicName, policies)) { - pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName, policies) + pulsar.getBrokerService() + .createDefaultPartitionedTopicAsync(topicName, policies) .thenAccept(md -> future.complete(md)) .exceptionally(ex -> { if (ex.getCause() @@ -3046,7 +3048,8 @@ public int getDefaultNumPartitions(final TopicName topicName, final Optional policies) { + private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName, + Optional policies) { // If namespace policies have the field set, it will override the broker-level setting if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { return policies.get().autoTopicCreationOverride; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index 09335d43302f2a..ab2de8820439f2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -19,18 +19,33 @@ package org.apache.pulsar.broker.admin; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import java.net.InetSocketAddress; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.powermock.reflect.Whitebox; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-admin") +@Slf4j public class TopicAutoCreationTest extends ProducerConsumerBase { @Override @@ -43,6 +58,11 @@ protected void setup() throws Exception { super.producerBaseSetup(); } + @Override + protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { + clientBuilder.operationTimeout(2, TimeUnit.SECONDS); + } + @Override @AfterMethod(alwaysRun = true) protected void cleanup() throws Exception { @@ -85,4 +105,76 @@ public void testPartitionedTopicAutoCreation() throws PulsarAdminException, Puls producer.close(); } + + + @Test + public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() + throws Exception { + final String namespaceName = "my-property/my-ns"; + final String topic = "persistent://" + namespaceName + "/test-partitioned-topi-auto-creation-" + + UUID.randomUUID().toString(); + + pulsar.getPulsarResources().getNamespaceResources() + .setPolicies(NamespaceName.get(namespaceName), old -> { + old.deleted = true; + return old; + }); + + + LookupService original = Whitebox.getInternalState(pulsarClient, "lookup"); + try { + + // we want to skip the "lookup" phase, because it is blocked by the HTTP API + LookupService mockLookup = mock(LookupService.class); + Whitebox.setInternalState(pulsarClient, "lookup", mockLookup); + when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(i -> { + return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)); + }); + when(mockLookup.getBroker(any())).thenAnswer(i -> { + InetSocketAddress brokerAddress = + new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get()); + return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress)); + }); + + // Creating a producer and creating a Consumer may trigger automatic topic + // creation, let's try to create a Producer and a Consumer + try (Producer producer = pulsarClient.newProducer() + .sendTimeout(1, TimeUnit.SECONDS) + .topic(topic) + .create();) { + } catch (PulsarClientException.LookupException expected) { + } + + try (Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("test") + .subscribe();) { + } catch (PulsarClientException.LookupException expected) { + } + + + // verify that the topic does not exist + pulsar.getPulsarResources().getNamespaceResources() + .setPolicies(NamespaceName.get(namespaceName), old -> { + old.deleted = false; + return old; + }); + + admin.topics().getList(namespaceName).isEmpty(); + + // create now the topic using auto creation + Whitebox.setInternalState(pulsarClient, "lookup", original); + + try (Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("test") + .subscribe();) { + } + + admin.topics().getList(namespaceName).contains(topic); + } finally { + Whitebox.setInternalState(pulsarClient, "lookup", original); + } + + } }