From a1405ea006f175b1bd0b9d28b9444d592fb4a010 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Fri, 1 Sep 2023 18:07:46 +0800 Subject: [PATCH] [fix][broker] Fix deleting topic not delete the related topic policy and schema. (#21093) Fixes #21075 ### Motivation When the topic is loaded, it will delete the topic-level policy if it is enabled. But if the topic is not loaded, it will directly delete through managed ledger factory. But then we will leave the topic policy there. When the topic is created next time, it will use the old topic policy ### Modifications When deleting the topic, delete the schema and topic policies even if the topic is not loaded. --- .../pulsar/broker/service/AbstractTopic.java | 17 +----- .../pulsar/broker/service/BrokerService.java | 55 ++++++++++++++----- .../service/BrokerBkEnsemblesTests.java | 32 +++++++++-- .../systopic/PartitionedSystemTopicTest.java | 25 +++++++++ 4 files changed, 94 insertions(+), 35 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index b15f8cbf0b848..cef2dd2080cf0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -58,7 +58,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; import org.apache.pulsar.broker.service.plugin.EntryFilter; -import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; @@ -674,21 +673,7 @@ private boolean allowAutoUpdateSchema() { @Override public CompletableFuture deleteSchema() { - String id = getSchemaId(); - SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); - return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id)) - .thenCompose(schema -> { - if (schema != null) { - // It's different from `SchemasResource.deleteSchema` - // because when we delete a topic, the schema - // history is meaningless. But when we delete a schema of a topic, a new schema could be - // registered in the future. - log.info("Delete schema storage of id: {}", id); - return schemaRegistryService.deleteSchemaStorage(id); - } else { - return CompletableFuture.completedFuture(null); - } - }); + return brokerService.deleteSchema(TopicName.get(getName())); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 199901b090251..391affef1da9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -119,6 +119,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; +import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; +import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; @@ -159,6 +161,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FieldParser; import org.apache.pulsar.common.util.FutureUtil; @@ -1156,26 +1159,33 @@ private CompletableFuture deleteTopicInternal(String topic, boolean forceD CompletableFuture future = new CompletableFuture<>(); CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); - - deleteTopicAuthenticationFuture.whenComplete((v, ex) -> { + deleteTopicAuthenticationFuture + .thenCompose(__ -> deleteSchema(tn)) + .thenCompose(__ -> { + if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic) + && getPulsar().getConfiguration().isSystemTopicEnabled()) { + return deleteTopicPolicies(tn); + } + return CompletableFuture.completedFuture(null); + }).whenComplete((v, ex) -> { if (ex != null) { future.completeExceptionally(ex); return; } CompletableFuture mlConfigFuture = getManagedLedgerConfig(topicName); managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), - mlConfigFuture, new DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - future.complete(null); - } + mlConfigFuture, new DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + future.complete(null); + } - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null); - }); + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + }); return future; } @@ -3451,6 +3461,25 @@ public CompletableFuture deleteTopicPolicies(TopicName topicName) { .deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName())); } + CompletableFuture deleteSchema(TopicName topicName) { + String base = topicName.getPartitionedTopicName(); + String id = TopicName.get(base).getSchemaName(); + SchemaRegistryService schemaRegistryService = getPulsar().getSchemaRegistryService(); + return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id)) + .thenCompose(schema -> { + if (schema != null) { + // It's different from `SchemasResource.deleteSchema` + // because when we delete a topic, the schema + // history is meaningless. But when we delete a schema of a topic, a new schema could be + // registered in the future. + log.info("Delete schema storage of id: {}", id); + return getPulsar().getSchemaRegistryService().deleteSchemaStorage(id); + } else { + return CompletableFuture.completedFuture(null); + } + }); + } + private CompletableFuture checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) { return pulsar.getPulsarResources().getNamespaceResources() .getPoliciesAsync(topicName.getNamespaceObject()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 9f19bda3647f3..40649a4164047 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -21,8 +21,8 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.fail; - import java.lang.reflect.Field; import java.util.Map.Entry; import java.util.NavigableMap; @@ -31,10 +31,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - import com.google.common.collect.Sets; import lombok.Cleanup; - import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -47,6 +45,7 @@ import org.apache.bookkeeper.util.StringUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -497,10 +496,31 @@ public void testDeleteTopicWithMissingData() throws Exception { // Expected } - // Deletion must succeed - admin.topics().delete(topic); + assertThrows(PulsarAdminException.ServerSideErrorException.class, () -> admin.topics().delete(topic)); + } + + @Test + public void testDeleteTopicWithoutTopicLoaded() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("prop/usc"); + admin.namespaces().createNamespace(namespace); + + String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic"); + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); - // Topic will not be there after + @Cleanup + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + producer.close(); + admin.topics().unload(topic); + + admin.topics().delete(topic); assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(), Optional.empty()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 008c2143a3566..34e7dfe92e5d7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.systopic; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import com.google.common.collect.Sets; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -37,6 +39,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.schema.SchemaRegistry; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -55,6 +58,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; @@ -299,4 +303,25 @@ public void testSystemTopicNotCheckExceed() throws Exception { writer1.get().close(); writer2.get().close(); } + + @Test + public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws Exception { + final String ns = "prop/ns-test"; + admin.namespaces().createNamespace(ns, 2); + final String topicName = "persistent://prop/ns-test/testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded"; + admin.topics().createNonPartitionedTopic(topicName); + pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close(); + admin.topicPolicies().setMaxConsumers(topicName, 2); + Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 2)); + CompletableFuture> topic = pulsar.getBrokerService().getTopic(topicName, false); + PersistentTopic persistentTopic = (PersistentTopic) topic.join().get(); + persistentTopic.close(); + admin.topics().delete(topicName); + TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName)); + assertNull(topicPolicies); + String base = TopicName.get(topicName).getPartitionedTopicName(); + String id = TopicName.get(base).getSchemaName(); + CompletableFuture schema = pulsar.getSchemaRegistryService().getSchema(id); + assertNull(schema.join()); + } }