Skip to content

Commit

Permalink
[fix][client] Fix pattern consumer create crash if a part of partitio…
Browse files Browse the repository at this point in the history
…ns of a topic have been deleted (#22854)

(cherry picked from commit 9626e7e)
  • Loading branch information
poorbarcode committed Jul 11, 2024
1 parent 9658163 commit 79f0316
Show file tree
Hide file tree
Showing 17 changed files with 1,195 additions and 197 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@
package org.apache.pulsar.client.api;

import com.google.common.collect.Maps;
import static org.testng.Assert.fail;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Test(groups = "broker")
public class PatternMultiTopicsConsumerTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -95,4 +98,38 @@ private void testWithConsumer(Consumer<byte[]> consumer) throws Exception {
consumer.close();
}

@Test(timeOut = 30000)
public void testFailedSubscribe() throws Exception {
final String topicName1 = BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
final String topicName2 = BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
final String topicName3 = BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
final String subName = "s1";
admin.topics().createPartitionedTopic(topicName1, 2);
admin.topics().createPartitionedTopic(topicName2, 3);
admin.topics().createNonPartitionedTopic(topicName3);

// Register a exclusive consumer to makes the pattern consumer failed to subscribe.
Consumer c1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName3).subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(subName).subscribe();

try {
PatternMultiTopicsConsumerImpl<String> consumer =
(PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topicsPattern("persistent://public/default/tp_test.*")
.subscriptionType(SubscriptionType.Failover)
.subscriptionName(subName)
.subscribe();
fail("Expected a consumer busy error.");
} catch (Exception ex) {
log.info("consumer busy", ex);
}

c1.close();
// Verify all internal consumer will be closed.
// If delete topic without "-f" work, it means the internal consumers were closed.
admin.topics().delete(topicName3);
admin.topics().deletePartitionedTopic(topicName2);
admin.topics().deletePartitionedTopic(topicName1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
Expand Down Expand Up @@ -958,15 +959,17 @@ public void testAutoUnsubscribePatternConsumer() throws Exception {

// 6. remove producer 1,3; verify only consumer 2 left
// seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic.
List<String> topicNames = Lists.newArrayList(topicName2);
String tp2p0 = TopicName.get(topicName2).getPartition(0).toString();
String tp2p1 = TopicName.get(topicName2).getPartition(1).toString();
List<String> topicNames = Lists.newArrayList(tp2p0, tp2p1);
NamespaceService nss = pulsar.getNamespaceService();
doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
.getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));

// 7. call recheckTopics to unsubscribe topic 1,3, verify topics number: 2=6-1-3
log.debug("recheck topics change");
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
PatternConsumerUpdateQueue taskQueue = WhiteboxImpl.getInternalState(consumer, "updateTaskQueue");
taskQueue.appendRecheckOp();
Thread.sleep(100);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitions().size(), 2);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.util.Timeout;
import java.time.Duration;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
Expand Down Expand Up @@ -1305,7 +1306,6 @@ public void testPartitionsUpdatesForMultipleTopics() throws Exception {
Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2);

admin.topics().updatePartitionedTopic(topicName0, 5);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());

Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 5);
Expand All @@ -1324,9 +1324,8 @@ public void testPartitionsUpdatesForMultipleTopics() throws Exception {
});

admin.topics().updatePartitionedTopic(topicName1, 5);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());

Awaitility.await().untilAsserted(() -> {
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 10);
Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 10);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import io.netty.util.Timeout;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -1209,5 +1210,10 @@ public boolean hasBatchReceiveTimeout() {
return batchReceiveTimeout != null;
}

@VisibleForTesting
CompletableFuture<Consumer<T>> getSubscribeFuture() {
return subscribeFuture;
}

private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,14 @@ public interface LookupService extends AutoCloseable {
InetSocketAddress resolveHost();

/**
* Returns all the topics name for a given namespace.
* Returns all the topics that matches {@param topicPattern} for a given namespace.
*
* Note: {@param topicPattern} it relate to the topic name(without the partition suffix). For example:
* - There is a partitioned topic "tp-a" with two partitions.
* - tp-a-partition-0
* - tp-a-partition-1
* - If {@param topicPattern} is "tp-a", the consumer will subscribe to the two partitions.
* - if {@param topicPattern} is "tp-a-partition-0", the consumer will subscribe nothing.
*
* @param namespace : namespace-name
* @return
Expand Down
Loading

0 comments on commit 79f0316

Please sign in to comment.