From e8fddf062516a17d83b31b54c2a98840ae7e137a Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 3 Oct 2024 16:41:34 -0700 Subject: [PATCH] [fix][broker] Fix Broker was failing to create producer with broken schema ledger --- .../pulsar/broker/service/AbstractTopic.java | 13 +++++-- .../service/schema/ClientGetSchemaTest.java | 35 ++++++++++++++++++- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index dce50a54db1f6..76dd277159cf4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -64,6 +65,7 @@ import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.NamespaceName; @@ -666,9 +668,14 @@ protected String getSchemaId() { } @Override public CompletableFuture hasSchema() { - return brokerService.pulsar() - .getSchemaRegistryService() - .getSchema(getSchemaId()).thenApply(Objects::nonNull); + return brokerService.pulsar().getSchemaRegistryService().getSchema(getSchemaId()).thenApply(Objects::nonNull) + .exceptionally(e -> { + Throwable ex = e.getCause(); + if (ex instanceof SchemaException || !((SchemaException) ex).isRecoverable()) { + return false; + } + throw ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex); + }); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java index 970e6b2712981..ec81f39fef92c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java @@ -20,8 +20,9 @@ import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; -import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import java.util.ArrayList; import java.util.List; @@ -177,4 +178,36 @@ public void testSchemaFailure() throws Exception { producer.close(); consumer.close(); } + + @Test + public void testAddProducerOnDeletedSchemaLedgerTopic() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicOne = "test-deleted-schema-ledger"; + final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString(); + + //pulsar.getConfig().setManagedLedgerForceRecovery(true); + admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet("test")); + + // (1) create topic with schema + Producer producer = pulsarClient + .newProducer(Schema.AVRO(SchemaDefinition. builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .topic(fqtnOne).create(); + + producer.close(); + + String key = TopicName.get(fqtnOne).getSchemaName(); + BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage) pulsar.getSchemaStorage(); + long schemaLedgerId = schemaStrogate.getSchemaLedgerList(key).get(0); + + // (2) break schema locator by deleting schema-ledger + schemaStrogate.getBookKeeper().deleteLedger(schemaLedgerId); + + admin.topics().unload(fqtnOne); + + Producer producerWihtoutSchema = pulsarClient.newProducer().topic(fqtnOne).create(); + + assertNotNull(producerWihtoutSchema); + } }