Skip to content

Commit

Permalink
[fix][admin] Fix half deletion when attempt to topic with a incorrect…
Browse files Browse the repository at this point in the history
… API (apache#23002)
  • Loading branch information
poorbarcode authored Jul 10, 2024
1 parent b473a7b commit 1f34497
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,17 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
.thenCompose(partitionedMeta -> {
final int numPartitions = partitionedMeta.partitions;
if (numPartitions < 1) {
return CompletableFuture.completedFuture(null);
return pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName)
.thenApply(exists -> {
if (exists) {
throw new RestException(Response.Status.CONFLICT,
String.format("%s is a non-partitioned topic. Instead of calling"
+ " delete-partitioned-topic please call delete.", topicName));
} else {
throw new RestException(Status.NOT_FOUND,
String.format("Topic %s not found.", topicName));
}
});
}
return internalRemovePartitionsAuthenticationPoliciesAsync()
.thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,17 @@ public void deleteTopic(
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
internalDeleteTopicAsync(authoritative, force)

getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.partitionedTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
RestException restException = new RestException(Response.Status.CONFLICT,
String.format("%s is a partitioned topic, instead of calling delete topic, please call"
+ " delete-partitioned-topic.", topicName));
resumeAsyncResponseExceptionally(asyncResponse, restException);
return;
}
internalDeleteTopicAsync(authoritative, force)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable t = FutureUtil.unwrapCompletionException(ex);
Expand All @@ -1186,6 +1196,8 @@ public void deleteTopic(
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
});

}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -32,6 +34,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -40,12 +43,14 @@
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -71,6 +76,62 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testDeleteNonExistTopic() throws Exception {
// Case 1: call delete for a partitioned topic.
final String topic1 = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createPartitionedTopic(topic1, 2);
admin.schemas().createSchemaAsync(topic1, Schema.STRING.getSchemaInfo());
Awaitility.await().untilAsserted(() -> {
assertEquals(admin.schemas().getAllSchemas(topic1).size(), 1);
});
try {
admin.topics().delete(topic1);
fail("expected a 409 error");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("please call delete-partitioned-topic"));
}
Awaitility.await().pollDelay(Duration.ofSeconds(2)).untilAsserted(() -> {
assertEquals(admin.schemas().getAllSchemas(topic1).size(), 1);
});
// cleanup.
admin.topics().deletePartitionedTopic(topic1, false);

// Case 2: call delete-partitioned-topi for a non-partitioned topic.
final String topic2 = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(topic2);
admin.schemas().createSchemaAsync(topic2, Schema.STRING.getSchemaInfo());
Awaitility.await().untilAsserted(() -> {
assertEquals(admin.schemas().getAllSchemas(topic2).size(), 1);
});
try {
admin.topics().deletePartitionedTopic(topic2);
fail("expected a 409 error");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("Instead of calling delete-partitioned-topic please call delete"));
}
Awaitility.await().pollDelay(Duration.ofSeconds(2)).untilAsserted(() -> {
assertEquals(admin.schemas().getAllSchemas(topic2).size(), 1);
});
// cleanup.
admin.topics().delete(topic2, false);

// Case 3: delete topic does not exist.
final String topic3 = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
try {
admin.topics().delete(topic3);
fail("expected a 404 error");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("not found"));
}
try {
admin.topics().deletePartitionedTopic(topic3);
fail("expected a 404 error");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("not found"));
}
}

@Test
public void testPeekMessages() throws Exception {
@Cleanup
Expand Down

0 comments on commit 1f34497

Please sign in to comment.