From a8882209475537fe730eefe26cffe0e4152d8433 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Jul 2024 23:33:08 +0800 Subject: [PATCH] [fix][broker]Fix lookupService.getTopicsUnderNamespace can not work with a quote pattern (#23014) (cherry picked from commit 7c0e82739215fbae9e21270d4c70c9a52dd3e403) --- .../pulsar/broker/service/TopicGCTest.java | 6 +- .../impl/PatternTopicsConsumerImplTest.java | 56 ++++++++++++++++++ .../client/impl/TopicsConsumerImplTest.java | 6 +- .../pulsar/common/topics/TopicList.java | 20 +++++-- .../pulsar/common/topics/TopicListTest.java | 58 ++++++++++++++++++- 5 files changed, 135 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java index aabef91a7db64..5f9833e1d8165 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java @@ -38,8 +38,8 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -197,8 +197,8 @@ public void testAppendCreateConsumerAfterOnePartGc(SubscribeTopicType subscribeT Message msg = consumer1.receive(2, TimeUnit.SECONDS); assertNotNull(msg, "Expected at least received 2 messages."); log.info("received msg[{}]: {}", i, msg.getValue()); - MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId(); - if (messageId.getPartitionIndex() == 1) { + TopicMessageIdImpl messageId = (TopicMessageIdImpl) msg.getMessageId(); + if (messageId.getTopicPartitionName().equals(partition1)) { consumer1.acknowledgeAsync(msg); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 51f0ae817c210..4147673c6e3d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -27,6 +27,8 @@ import com.google.common.collect.Lists; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -1044,4 +1047,57 @@ public void testTopicDeletion() throws Exception { assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-1").join(), Optional.empty()); assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-2").join().isPresent()); } + + @Test(dataProvider = "partitioned") + public void testPatternQuote(boolean partitioned) throws Exception { + final NamespaceName namespace = NamespaceName.get("public/default"); + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final PulsarClientImpl client = (PulsarClientImpl) pulsarClient; + final LookupService lookup = client.getLookup(); + List expectedRes = new ArrayList<>(); + if (partitioned) { + admin.topics().createPartitionedTopic(topicName, 2); + expectedRes.add(TopicName.get(topicName).getPartition(0).toString()); + expectedRes.add(TopicName.get(topicName).getPartition(1).toString()); + Collections.sort(expectedRes); + } else { + admin.topics().createNonPartitionedTopic(topicName); + expectedRes.add(topicName); + } + + // Verify 1: "java.util.regex.Pattern.quote". + String pattern1 = java.util.regex.Pattern.quote(topicName); + List res1 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT, + pattern1, null).join().getNonPartitionedOrPartitionTopics(); + Collections.sort(res1); + assertEquals(res1, expectedRes); + + // Verify 2: "com.google.re2j.Pattern.quote" + String pattern2 = com.google.re2j.Pattern.quote(topicName); + List res2 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT, + pattern2, null).join().getNonPartitionedOrPartitionTopics(); + Collections.sort(res2); + assertEquals(res2, expectedRes); + + // Verify 3: "java.util.regex.Pattern.quote" & "^$" + String pattern3 = "^" + java.util.regex.Pattern.quote(topicName) + "$"; + List res3 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT, + pattern3, null).join().getNonPartitionedOrPartitionTopics(); + Collections.sort(res3); + assertEquals(res3, expectedRes); + + // Verify 4: "com.google.re2j.Pattern.quote" & "^$" + String pattern4 = "^" + com.google.re2j.Pattern.quote(topicName) + "$"; + List res4 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT, + pattern4, null).join().getNonPartitionedOrPartitionTopics(); + Collections.sort(res4); + assertEquals(res4, expectedRes); + + // cleanup. + if (partitioned) { + admin.topics().deletePartitionedTopic(topicName, false); + } else { + admin.topics().delete(topicName, false); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index c88e429733f47..bf02232e0acf8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -1269,19 +1270,20 @@ public void testAutoDiscoverMultiTopicsPartitions() throws Exception { .topicsPattern(topicName) .subscriptionName("sub-issue-9585") .subscribe(); + PatternConsumerUpdateQueue taskQueue = WhiteboxImpl.getInternalState(consumer, "updateTaskQueue"); Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3); Assert.assertEquals(consumer.getConsumers().size(), 3); admin.topics().deletePartitionedTopic(topicName, true); - consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); + taskQueue.appendRecheckOp(); Awaitility.await().untilAsserted(() -> { Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0); Assert.assertEquals(consumer.getConsumers().size(), 0); }); admin.topics().createPartitionedTopic(topicName, 7); - consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); + taskQueue.appendRecheckOp(); Awaitility.await().untilAsserted(() -> { Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7); Assert.assertEquals(consumer.getConsumers().size(), 7); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java index b231f21e598a1..380582edbde38 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.topics; +import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.Hashing; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -28,6 +29,7 @@ import java.util.stream.Collectors; import lombok.experimental.UtilityClass; import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @UtilityClass @@ -82,15 +84,23 @@ public static Set minus(Collection list1, Collection lis return s1; } - private static String removeTopicDomainScheme(String originalRegexp) { + @VisibleForTesting + static String removeTopicDomainScheme(String originalRegexp) { if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) { return originalRegexp; } - String removedTopicDomain = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1]; - if (originalRegexp.contains("^")) { - return String.format("^%s", removedTopicDomain); + String[] parts = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString()); + String prefix = parts[0]; + String removedTopicDomain = parts[1]; + if (prefix.equals(TopicDomain.persistent.value()) || prefix.equals(TopicDomain.non_persistent.value())) { + prefix = ""; + } else if (prefix.endsWith(TopicDomain.non_persistent.value())) { + prefix = prefix.substring(0, prefix.length() - TopicDomain.non_persistent.value().length()); + } else if (prefix.endsWith(TopicDomain.persistent.value())){ + prefix = prefix.substring(0, prefix.length() - TopicDomain.persistent.value().length()); } else { - return removedTopicDomain; + throw new IllegalArgumentException("Does not support topic domain: " + prefix); } + return String.format("%s%s", prefix, removedTopicDomain); } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java index 9c3b54a0f0d80..bb9e6a91279e3 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java @@ -30,6 +30,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class TopicListTest { @@ -107,5 +108,60 @@ public void testCalculateHash() { } - + @Test + public void testRemoveTopicDomainScheme() { + // persistent. + final String tpName1 = "persistent://public/default/tp"; + String res1 = TopicList.removeTopicDomainScheme(tpName1); + assertEquals(res1, "public/default/tp"); + + // non-persistent + final String tpName2 = "non-persistent://public/default/tp"; + String res2 = TopicList.removeTopicDomainScheme(tpName2); + assertEquals(res2, "public/default/tp"); + + // without topic domain. + final String tpName3 = "public/default/tp"; + String res3 = TopicList.removeTopicDomainScheme(tpName3); + assertEquals(res3, "public/default/tp"); + + // persistent & "java.util.regex.Pattern.quote". + final String tpName4 = java.util.regex.Pattern.quote(tpName1); + String res4 = TopicList.removeTopicDomainScheme(tpName4); + assertEquals(res4, java.util.regex.Pattern.quote("public/default/tp")); + + // persistent & "java.util.regex.Pattern.quote" & "^$". + final String tpName5 = "^" + java.util.regex.Pattern.quote(tpName1) + "$"; + String res5 = TopicList.removeTopicDomainScheme(tpName5); + assertEquals(res5, "^" + java.util.regex.Pattern.quote("public/default/tp") + "$"); + + // persistent & "com.google.re2j.Pattern.quote". + final String tpName6 = Pattern.quote(tpName1); + String res6 = TopicList.removeTopicDomainScheme(tpName6); + assertEquals(res6, Pattern.quote("public/default/tp")); + + // non-persistent & "java.util.regex.Pattern.quote". + final String tpName7 = java.util.regex.Pattern.quote(tpName2); + String res7 = TopicList.removeTopicDomainScheme(tpName7); + assertEquals(res7, java.util.regex.Pattern.quote("public/default/tp")); + + // non-persistent & "com.google.re2j.Pattern.quote". + final String tpName8 = Pattern.quote(tpName2); + String res8 = TopicList.removeTopicDomainScheme(tpName8); + assertEquals(res8, Pattern.quote("public/default/tp")); + + // non-persistent & "com.google.re2j.Pattern.quote" & "^$". + final String tpName9 = "^" + Pattern.quote(tpName2) + "$"; + String res9 = TopicList.removeTopicDomainScheme(tpName9); + assertEquals(res9, "^" + Pattern.quote("public/default/tp") + "$"); + + // wrong topic domain. + final String tpName10 = "xx://public/default/tp"; + try { + TopicList.removeTopicDomainScheme(tpName10); + fail("Does not support the topic domain xx"); + } catch (Exception ex) { + // expected error. + } + } }