Skip to content

Commit

Permalink
[fix][broker] Fix deleting topic not delete the related topic policy …
Browse files Browse the repository at this point in the history
…and schema. (apache#21093)

Fixes apache#21075 

### Motivation

When the topic is loaded, it will delete the topic-level policy if it is enabled. But if the topic is not loaded, it will directly delete through managed ledger factory. But then we will leave the topic policy there. When the topic is created next time, it will use the old topic policy

### Modifications

When deleting the topic, delete the schema and topic policies even if the topic is not loaded.
  • Loading branch information
Technoboy- authored Sep 1, 2023
1 parent 927d1b2 commit a1405ea
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -674,21 +673,7 @@ private boolean allowAutoUpdateSchema() {

@Override
public CompletableFuture<SchemaVersion> deleteSchema() {
String id = getSchemaId();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
.thenCompose(schema -> {
if (schema != null) {
// It's different from `SchemasResource.deleteSchema`
// because when we delete a topic, the schema
// history is meaningless. But when we delete a schema of a topic, a new schema could be
// registered in the future.
log.info("Delete schema storage of id: {}", id);
return schemaRegistryService.deleteSchemaStorage(id);
} else {
return CompletableFuture.completedFuture(null);
}
});
return brokerService.deleteSchema(TopicName.get(getName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -159,6 +161,7 @@
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -1156,26 +1159,33 @@ private CompletableFuture<Void> deleteTopicInternal(String topic, boolean forceD
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);

deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
deleteTopicAuthenticationFuture
.thenCompose(__ -> deleteSchema(tn))
.thenCompose(__ -> {
if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
&& getPulsar().getConfiguration().isSystemTopicEnabled()) {
return deleteTopicPolicies(tn);
}
return CompletableFuture.completedFuture(null);
}).whenComplete((v, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
return;
}
CompletableFuture<ManagedLedgerConfig> mlConfigFuture = getManagedLedgerConfig(topicName);
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
mlConfigFuture, new DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
future.complete(null);
}
mlConfigFuture, new DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
future.complete(null);
}

@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});
@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});

return future;
}
Expand Down Expand Up @@ -3451,6 +3461,25 @@ public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
}

CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
String base = topicName.getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = getPulsar().getSchemaRegistryService();
return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
.thenCompose(schema -> {
if (schema != null) {
// It's different from `SchemasResource.deleteSchema`
// because when we delete a topic, the schema
// history is meaningless. But when we delete a schema of a topic, a new schema could be
// registered in the future.
log.info("Delete schema storage of id: {}", id);
return getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
} else {
return CompletableFuture.completedFuture(null);
}
});
}

private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
return pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.util.Map.Entry;
import java.util.NavigableMap;
Expand All @@ -31,10 +31,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
import lombok.Cleanup;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand All @@ -47,6 +45,7 @@
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
Expand Down Expand Up @@ -497,10 +496,31 @@ public void testDeleteTopicWithMissingData() throws Exception {
// Expected
}

// Deletion must succeed
admin.topics().delete(topic);
assertThrows(PulsarAdminException.ServerSideErrorException.class, () -> admin.topics().delete(topic));
}

@Test
public void testDeleteTopicWithoutTopicLoaded() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("prop/usc");
admin.namespaces().createNamespace(namespace);

String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.build();

// Topic will not be there after
@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.create();

producer.close();
admin.topics().unload(topic);

admin.topics().delete(topic);
assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(), Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.systopic;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -37,6 +39,7 @@
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -55,6 +58,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -299,4 +303,25 @@ public void testSystemTopicNotCheckExceed() throws Exception {
writer1.get().close();
writer2.get().close();
}

@Test
public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws Exception {
final String ns = "prop/ns-test";
admin.namespaces().createNamespace(ns, 2);
final String topicName = "persistent://prop/ns-test/testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded";
admin.topics().createNonPartitionedTopic(topicName);
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
admin.topicPolicies().setMaxConsumers(topicName, 2);
Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 2));
CompletableFuture<Optional<Topic>> topic = pulsar.getBrokerService().getTopic(topicName, false);
PersistentTopic persistentTopic = (PersistentTopic) topic.join().get();
persistentTopic.close();
admin.topics().delete(topicName);
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName));
assertNull(topicPolicies);
String base = TopicName.get(topicName).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
CompletableFuture<SchemaRegistry.SchemaAndMetadata> schema = pulsar.getSchemaRegistryService().getSchema(id);
assertNull(schema.join());
}
}

0 comments on commit a1405ea

Please sign in to comment.