From 217ebfbeaab9a33e648912bfae8ed47e9199d41a Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 20 Dec 2024 20:19:16 +0800 Subject: [PATCH] [fix][client] Fix enableRetry for consumers using legacy topic naming where cluster name is included (#23753) --- .../client/impl/TopicsConsumerImplTest.java | 18 ++++++++++++++++++ .../client/impl/ConsumerBuilderImpl.java | 8 ++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 1d5ac75962524..3c7cd16f14408 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -138,6 +138,24 @@ public void testDifferentTopicsNameSubscribe() throws Exception { } } + @Test(timeOut = testTimeout) + public void testRetryClusterTopic() throws Exception { + String key = "testRetryClusterTopic"; + final String topicName = "persistent://prop/use/ns-abc1/topic-1-" + key; + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + final String namespace = "prop/ns-abc1"; + admin.tenants().createTenant("prop", tenantInfo); + admin.namespaces().createNamespace(namespace, Set.of("test")); + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("my-sub") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); + } + @Test(timeOut = testTimeout) public void testGetConsumersAndGetTopics() throws Exception { String key = "TopicsConsumerGet"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 351025d426a39..35f772028f17a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -154,10 +154,10 @@ public CompletableFuture> subscribeAsync() { if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) { TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next()); //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed - String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() - + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; - String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() - + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; + String oldRetryLetterTopic = TopicName.get(topicFirst.getDomain().value(), topicFirst.getNamespaceObject(), + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX).toString(); + String oldDeadLetterTopic = TopicName.get(topicFirst.getDomain().value(), topicFirst.getNamespaceObject(), + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX).toString(); DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy(); if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic()) || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {