Skip to content

Commit

Permalink
add test case and fix checkstyle
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Sep 13, 2022
1 parent 46707d7 commit d5bc53f
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2784,9 +2784,11 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
&& !topicExists
&& !topicName.isPartitioned()
&& pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies)
&& pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName, policies)) {
&& pulsar.getBrokerService()
.isDefaultTopicTypePartitioned(topicName, policies)) {

pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName, policies)
pulsar.getBrokerService()
.createDefaultPartitionedTopicAsync(topicName, policies)
.thenAccept(md -> future.complete(md))
.exceptionally(ex -> {
if (ex.getCause()
Expand Down Expand Up @@ -3046,7 +3048,8 @@ public int getDefaultNumPartitions(final TopicName topicName, final Optional<Pol
}
}

private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName, Optional<Policies> policies) {
private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName,
Optional<Policies> policies) {
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,33 @@

package org.apache.pulsar.broker.admin;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker-admin")
@Slf4j
public class TopicAutoCreationTest extends ProducerConsumerBase {

@Override
Expand All @@ -43,6 +58,11 @@ protected void setup() throws Exception {
super.producerBaseSetup();
}

@Override
protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.operationTimeout(2, TimeUnit.SECONDS);
}

@Override
@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
Expand Down Expand Up @@ -85,4 +105,76 @@ public void testPartitionedTopicAutoCreation() throws PulsarAdminException, Puls

producer.close();
}


@Test
public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
throws Exception {
final String namespaceName = "my-property/my-ns";
final String topic = "persistent://" + namespaceName + "/test-partitioned-topi-auto-creation-"
+ UUID.randomUUID().toString();

pulsar.getPulsarResources().getNamespaceResources()
.setPolicies(NamespaceName.get(namespaceName), old -> {
old.deleted = true;
return old;
});


LookupService original = Whitebox.getInternalState(pulsarClient, "lookup");
try {

// we want to skip the "lookup" phase, because it is blocked by the HTTP API
LookupService mockLookup = mock(LookupService.class);
Whitebox.setInternalState(pulsarClient, "lookup", mockLookup);
when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(i -> {
return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
});
when(mockLookup.getBroker(any())).thenAnswer(i -> {
InetSocketAddress brokerAddress =
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
});

// Creating a producer and creating a Consumer may trigger automatic topic
// creation, let's try to create a Producer and a Consumer
try (Producer<byte[]> producer = pulsarClient.newProducer()
.sendTimeout(1, TimeUnit.SECONDS)
.topic(topic)
.create();) {
} catch (PulsarClientException.LookupException expected) {
}

try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();) {
} catch (PulsarClientException.LookupException expected) {
}


// verify that the topic does not exist
pulsar.getPulsarResources().getNamespaceResources()
.setPolicies(NamespaceName.get(namespaceName), old -> {
old.deleted = false;
return old;
});

admin.topics().getList(namespaceName).isEmpty();

// create now the topic using auto creation
Whitebox.setInternalState(pulsarClient, "lookup", original);

try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();) {
}

admin.topics().getList(namespaceName).contains(topic);
} finally {
Whitebox.setInternalState(pulsarClient, "lookup", original);
}

}
}

0 comments on commit d5bc53f

Please sign in to comment.