Skip to content

Commit

Permalink
[fix][client] Fix enableRetry for consumers using legacy topic naming…
Browse files Browse the repository at this point in the history
… where cluster name is included (#23753)

(cherry picked from commit 217ebfb)
  • Loading branch information
crossoverJie authored and lhotari committed Dec 21, 2024
1 parent 6afb550 commit 80dc6ab
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ public void testDifferentTopicsNameSubscribe() throws Exception {
}
}

@Test(timeOut = testTimeout)
public void testRetryClusterTopic() throws Exception {
String key = "testRetryClusterTopic";
final String topicName = "persistent://prop/use/ns-abc1/topic-1-" + key;
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
final String namespace = "prop/ns-abc1";
admin.tenants().createTenant("prop", tenantInfo);
admin.namespaces().createNamespace(namespace, Set.of("test"));
Consumer consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.subscribe();
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
}

@Test(timeOut = testTimeout)
public void testGetConsumersAndGetTopics() throws Exception {
String key = "TopicsConsumerGet";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ public CompletableFuture<Consumer<T>> subscribeAsync() {
if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) {
TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
//Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName()
+ RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName()
+ RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
String oldRetryLetterTopic = TopicName.get(topicFirst.getDomain().value(), topicFirst.getNamespaceObject(),
conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX).toString();
String oldDeadLetterTopic = TopicName.get(topicFirst.getDomain().value(), topicFirst.getNamespaceObject(),
conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX).toString();
DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
|| StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
Expand Down

0 comments on commit 80dc6ab

Please sign in to comment.