Skip to content

Commit

Permalink
[bugfix] Prevent Automatic Topic Creation during namespace deletion (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Sep 15, 2022
1 parent c7b7146 commit af98304
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2766,6 +2766,9 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
if (pulsar.getNamespaceService() == null) {
return FutureUtil.failedFuture(new NamingException("namespace service is not ready"));
}
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
return pulsar.getNamespaceService().checkTopicExists(topicName)
.thenCompose(topicExists -> {
return fetchPartitionedTopicMetadataAsync(topicName)
Expand All @@ -2780,10 +2783,12 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
if (metadata.partitions == 0
&& !topicExists
&& !topicName.isPartitioned()
&& pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
&& pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
&& pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies)
&& pulsar.getBrokerService()
.isDefaultTopicTypePartitioned(topicName, policies)) {

pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
pulsar.getBrokerService()
.createDefaultPartitionedTopicAsync(topicName, policies)
.thenAccept(md -> future.complete(md))
.exceptionally(ex -> {
if (ex.getCause()
Expand Down Expand Up @@ -2813,8 +2818,9 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
}

@SuppressWarnings("deprecation")
private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName,
Optional<Policies> policies) {
final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies);
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
checkArgument(defaultNumPartitions > 0,
"Default number of partitions should be more than 0");
Expand Down Expand Up @@ -3000,40 +3006,50 @@ public boolean isAllowAutoTopicCreation(final String topic) {
}

public boolean isAllowAutoTopicCreation(final TopicName topicName) {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
return isAllowAutoTopicCreation(topicName, policies);
}

public boolean isAllowAutoTopicCreation(final TopicName topicName, final Optional<Policies> policies) {
if (policies.isPresent() && policies.get().deleted) {
log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}",
topicName.getNamespaceObject());
return false;
}
//System topic can always be created automatically
if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) {
return true;
}
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
if (autoTopicCreationOverride != null) {
return autoTopicCreationOverride.isAllowAutoTopicCreation();
} else {
return pulsar.getConfiguration().isAllowAutoTopicCreation();
}
}

public boolean isDefaultTopicTypePartitioned(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional<Policies> policies) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
if (autoTopicCreationOverride != null) {
return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.getTopicType());
} else {
return pulsar.getConfiguration().isDefaultTopicTypePartitioned();
}
}

public int getDefaultNumPartitions(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
public int getDefaultNumPartitions(final TopicName topicName, final Optional<Policies> policies) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
if (autoTopicCreationOverride != null) {
return autoTopicCreationOverride.getDefaultNumPartitions();
} else {
return pulsar.getConfiguration().getDefaultNumPartitions();
}
}

private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName,
Optional<Policies> policies) {
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,40 @@ public void testForceDeleteNamespace() throws Exception {
}
}

@Test
public void testForceDeleteNamespaceWithAutomaticTopicCreation() throws Exception {
conf.setForceDeleteNamespaceAllowed(true);
final String namespaceName = "prop-xyz2/ns1";
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("prop-xyz2", tenantInfo);
admin.namespaces().createNamespace(namespaceName, 1);
admin.namespaces().setAutoTopicCreation(namespaceName,
AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(20)
.build());
final String topic = "persistent://" + namespaceName + "/test" + UUID.randomUUID();

// start a consumer, that creates the topic
try (Consumer<Double> consumer = pulsarClient.newConsumer(Schema.DOUBLE).topic(topic)
.subscriptionName("test").autoUpdatePartitions(true).subscribe()) {

// wait for the consumer to settle
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertNotNull(admin.topics().getSubscriptions(topic).contains("test")));

// verify that the partitioned topic is created
assertEquals(20, admin.topics().getPartitionedTopicMetadata(topic).partitions);

// the consumer will race with the deletion
// the consumer will try to re-create the partitions
admin.namespaces().deleteNamespace(namespaceName, true);

assertFalse(admin.namespaces().getNamespaces("prop-xyz2").contains("ns1"));
}
}

@Test
public void testUpdateClusterWithProxyUrl() throws Exception {
ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,34 @@

package org.apache.pulsar.broker.admin;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker-admin")
@Slf4j
public class TopicAutoCreationTest extends ProducerConsumerBase {

@Override
Expand All @@ -43,6 +59,11 @@ protected void setup() throws Exception {
super.producerBaseSetup();
}

@Override
protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.operationTimeout(2, TimeUnit.SECONDS);
}

@Override
@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
Expand Down Expand Up @@ -85,4 +106,82 @@ public void testPartitionedTopicAutoCreation() throws PulsarAdminException, Puls

producer.close();
}


@Test
public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
throws Exception {
final String namespaceName = "my-property/my-ns";
final String topic = "persistent://" + namespaceName + "/test-partitioned-topi-auto-creation-"
+ UUID.randomUUID().toString();

pulsar.getPulsarResources().getNamespaceResources()
.setPolicies(NamespaceName.get(namespaceName), old -> {
old.deleted = true;
return old;
});


LookupService original = Whitebox.getInternalState(pulsarClient, "lookup");
try {

// we want to skip the "lookup" phase, because it is blocked by the HTTP API
LookupService mockLookup = mock(LookupService.class);
Whitebox.setInternalState(pulsarClient, "lookup", mockLookup);
when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(i -> {
return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
});
when(mockLookup.getBroker(any())).thenAnswer(i -> {
InetSocketAddress brokerAddress =
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
});

// Creating a producer and creating a Consumer may trigger automatic topic
// creation, let's try to create a Producer and a Consumer
try (Producer<byte[]> producer = pulsarClient.newProducer()
.sendTimeout(1, TimeUnit.SECONDS)
.topic(topic)
.create();) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic)));
}

try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic)));
}


// verify that the topic does not exist
pulsar.getPulsarResources().getNamespaceResources()
.setPolicies(NamespaceName.get(namespaceName), old -> {
old.deleted = false;
return old;
});

admin.topics().getList(namespaceName).isEmpty();

// create now the topic using auto creation
Whitebox.setInternalState(pulsarClient, "lookup", original);

try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();) {
}

admin.topics().getList(namespaceName).contains(topic);
} finally {
Whitebox.setInternalState(pulsarClient, "lookup", original);
}

}
}

0 comments on commit af98304

Please sign in to comment.