From cd26340cd5ede132a82ec1c0973e5cd6449967d4 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 9 Oct 2024 13:11:28 -0700 Subject: [PATCH 1/2] [fix][broker] Allow broker to handle non-recoverable schema error only if SchemaLedgerForceRecovery flag is enabled --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 3 ++- .../pulsar/broker/service/schema/ClientGetSchemaTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 76dd277159cf4..2dd4f71837aaf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -671,7 +671,8 @@ public CompletableFuture hasSchema() { return brokerService.pulsar().getSchemaRegistryService().getSchema(getSchemaId()).thenApply(Objects::nonNull) .exceptionally(e -> { Throwable ex = e.getCause(); - if (ex instanceof SchemaException || !((SchemaException) ex).isRecoverable()) { + if (brokerService.pulsar().getConfig().isSchemaLedgerForceRecovery() + && !((SchemaException) ex).isRecoverable()) { return false; } throw ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java index ec81f39fef92c..f9c1042b0e97e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java @@ -186,7 +186,7 @@ public void testAddProducerOnDeletedSchemaLedgerTopic() throws Exception { final String topicOne = "test-deleted-schema-ledger"; final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString(); - //pulsar.getConfig().setManagedLedgerForceRecovery(true); + pulsar.getConfig().setSchemaLedgerForceRecovery(true); admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet("test")); // (1) create topic with schema From d0b64de5cce68fc39b511f66b6983650ed89a91c Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 9 Oct 2024 22:34:18 -0700 Subject: [PATCH 2/2] fix type cast --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 2dd4f71837aaf..11f00fb28e34b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -672,7 +672,7 @@ public CompletableFuture hasSchema() { .exceptionally(e -> { Throwable ex = e.getCause(); if (brokerService.pulsar().getConfig().isSchemaLedgerForceRecovery() - && !((SchemaException) ex).isRecoverable()) { + && (ex instanceof SchemaException && !((SchemaException) ex).isRecoverable())) { return false; } throw ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex);