Skip to content

Commit

Permalink
add subscribe and unsubscribe for one topic, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jiazhai committed Jan 31, 2018
1 parent a34ef4d commit 43242c7
Show file tree
Hide file tree
Showing 4 changed files with 489 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
Expand All @@ -48,6 +49,8 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;

public class TopicsConsumerImplTest extends ProducerConsumerBase {
private static final long testTimeout = 90000; // 1.5 min
private static final Logger log = LoggerFactory.getLogger(TopicsConsumerImplTest.class);
Expand All @@ -65,6 +68,35 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

// Verify subscribe topics from different namespace should return error.
@Test(timeOut = testTimeout)
public void testDifferentTopicsNameSubscribe() throws Exception {
String key = "TopicsFromDifferentNamespace";
final String subscriptionName = "my-ex-subscription-" + key;

final String topicName1 = "persistent://prop/use/ns-abc1/topic-1-" + key;
final String topicName2 = "persistent://prop/use/ns-abc2/topic-2-" + key;
final String topicName3 = "persistent://prop/use/ns-abc3/topic-3-" + key;
List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

admin.properties().createProperty("prop", new PropertyAdmin());
admin.persistentTopics().createPartitionedTopic(topicName2, 2);
admin.persistentTopics().createPartitionedTopic(topicName3, 3);

// 2. Create consumer
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setReceiverQueueSize(4);
conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
conf.setSubscriptionType(SubscriptionType.Shared);
try {
Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
fail("subscribe for topics from different namespace should fail.");
} catch (IllegalArgumentException e) {
// expected for have different namespace
}
}


@Test(timeOut = testTimeout)
public void testGetConsumersAndGetTopics() throws Exception {
String key = "TopicsConsumerGet";
Expand Down Expand Up @@ -97,6 +129,9 @@ public void testGetConsumersAndGetTopics() throws Exception {
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));

assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 3);

consumer.unsubscribe();
consumer.close();
}

@Test(timeOut = testTimeout)
Expand Down Expand Up @@ -138,16 +173,18 @@ public void testSyncProducerAndConsumer() throws Exception {
producer3.send((messagePredicate + "producer3-" + i).getBytes());
}

HashSet<String> messageSet = new HashSet<>();
int messageSet = 0;
Message message = consumer.receive();
do {
assertTrue(message instanceof TopicMessageImpl);
messageSet.add(new String(message.getData()));
messageSet ++;
consumer.acknowledge(message);
log.info("Consumer acknowledged : " + new String(message.getData()));
log.debug("Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(500, TimeUnit.MILLISECONDS);
} while (message != null);
assertEquals(messageSet.size(), totalMessages);
assertEquals(messageSet, totalMessages);

consumer.unsubscribe();
consumer.close();
producer1.close();
producer2.close();
Expand Down Expand Up @@ -219,9 +256,10 @@ public void testAsyncConsumer() throws Exception {
return null;
})));

log.info("start wait");
latch.await();
log.info("success latch wait");

consumer.unsubscribe();
consumer.close();
producer1.close();
producer2.close();
Expand Down Expand Up @@ -271,28 +309,26 @@ public void testConsumerUnackedRedelivery() throws Exception {
Message message = consumer.receive();
while (message != null) {
assertTrue(message instanceof TopicMessageImpl);
log.info("Consumer received : " + new String(message.getData()));
log.debug("Consumer received : " + new String(message.getData()));
message = consumer.receive(500, TimeUnit.MILLISECONDS);
}
long size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
log.info(key + " Unacked Message Tracker size is " + size);
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, totalMessages);

// 5. Blocking call, redeliver should kick in, after receive and ack, Unacked Message Tracker size should be 0.
message = consumer.receive();
log.info("Consumer received : " + new String(message.getData()));

HashSet<String> hSet = new HashSet<>();
do {
assertTrue(message instanceof TopicMessageImpl);
hSet.add(new String(message.getData()));
consumer.acknowledge(message);
log.info("Consumer acknowledged : " + new String(message.getData()));
log.debug("Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(500, TimeUnit.MILLISECONDS);
} while (message != null);

size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
log.info(key + " Unacked Message Tracker size is " + size);
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 0);
assertEquals(hSet.size(), totalMessages);

Expand All @@ -310,12 +346,12 @@ public void testConsumerUnackedRedelivery() throws Exception {
assertTrue(message instanceof TopicMessageImpl);
received++;
String data = new String(message.getData());
log.info("Consumer received : " + data);
log.debug("Consumer received : " + data);
consumer.acknowledge(message);
message = consumer.receive(100, TimeUnit.MILLISECONDS);
}
size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
log.info(key + " Unacked Message Tracker size is " + size);
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 0);
assertEquals(received, totalMessages);

Expand All @@ -334,11 +370,11 @@ public void testConsumerUnackedRedelivery() throws Exception {
message = consumer.receive();
while (message != null) {
String data = new String(message.getData());
log.info("Consumer received : " + data);
log.debug("Consumer received : " + data);
message = consumer.receive(100, TimeUnit.MILLISECONDS);
}
size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
log.info(key + " Unacked Message Tracker size is " + size);
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 30);

Thread.sleep(ackTimeOutMillis);
Expand All @@ -350,7 +386,7 @@ public void testConsumerUnackedRedelivery() throws Exception {
assertTrue(message instanceof TopicMessageImpl);
redelivered++;
String data = new String(message.getData());
log.info("Consumer received : " + data);
log.debug("Consumer received : " + data);
consumer.acknowledge(message);
message = consumer.receive(100, TimeUnit.MILLISECONDS);
}
Expand All @@ -359,6 +395,126 @@ public void testConsumerUnackedRedelivery() throws Exception {
log.info(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 0);

consumer.unsubscribe();
consumer.close();
producer1.close();
producer2.close();
producer3.close();
}

@Test
public void testSubscribeUnsubscribeSingleTopic() throws Exception {
String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
final String subscriptionName = "my-ex-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 30;

final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

admin.properties().createProperty("prop", new PropertyAdmin());
admin.persistentTopics().createPartitionedTopic(topicName2, 2);
admin.persistentTopics().createPartitionedTopic(topicName3, 3);

ProducerConfiguration producerConfiguration = new ProducerConfiguration();
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);

// 1. producer connect
Producer producer1 = pulsarClient.createProducer(topicName1);
Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);

// 2. Create consumer
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setReceiverQueueSize(4);
conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
conf.setSubscriptionType(SubscriptionType.Shared);
Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
assertTrue(consumer instanceof TopicsConsumerImpl);

// 3. producer publish messages
for (int i = 0; i < totalMessages / 3; i++) {
producer1.send((messagePredicate + "producer1-" + i).getBytes());
producer2.send((messagePredicate + "producer2-" + i).getBytes());
producer3.send((messagePredicate + "producer3-" + i).getBytes());
}

int messageSet = 0;
Message message = consumer.receive();
do {
assertTrue(message instanceof TopicMessageImpl);
messageSet ++;
consumer.acknowledge(message);
log.debug("Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(500, TimeUnit.MILLISECONDS);
} while (message != null);
assertEquals(messageSet, totalMessages);

// 4, unsubscribe topic3
CompletableFuture<Void> unsubFuture = ((TopicsConsumerImpl)consumer).unsubscribeAsync(topicName3);
unsubFuture.get();

// 5. producer publish messages
for (int i = 0; i < totalMessages / 3; i++) {
producer1.send((messagePredicate + "producer1-round2" + i).getBytes());
producer2.send((messagePredicate + "producer2-round2" + i).getBytes());
producer3.send((messagePredicate + "producer3-round2" + i).getBytes());
}

// 6. should not receive messages from topic3, verify get 2/3 of all messages
messageSet = 0;
message = consumer.receive();
do {
assertTrue(message instanceof TopicMessageImpl);
messageSet ++;
consumer.acknowledge(message);
log.debug("Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(500, TimeUnit.MILLISECONDS);
} while (message != null);
assertEquals(messageSet, totalMessages * 2 / 3);

// 7. use getter to verify internal topics number after un-subscribe topic3
List<String> topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
List<ConsumerImpl> consumers = ((TopicsConsumerImpl) consumer).getConsumers();

assertEquals(topics.size(), 3);
assertEquals(consumers.size(), 3);
assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 2);

// 8. re-subscribe topic3
CompletableFuture<Void> subFuture = ((TopicsConsumerImpl)consumer).subscribeAsync(topicName3);
subFuture.get();

// 9. producer publish messages
for (int i = 0; i < totalMessages / 3; i++) {
producer1.send((messagePredicate + "producer1-round3" + i).getBytes());
producer2.send((messagePredicate + "producer2-round3" + i).getBytes());
producer3.send((messagePredicate + "producer3-round3" + i).getBytes());
}

// 10. should receive messages from all 3 topics
messageSet = 0;
message = consumer.receive();
do {
assertTrue(message instanceof TopicMessageImpl);
messageSet ++;
consumer.acknowledge(message);
log.debug("Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(500, TimeUnit.MILLISECONDS);
} while (message != null);
assertEquals(messageSet, totalMessages);

// 11. use getter to verify internal topics number after subscribe topic3
topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
consumers = ((TopicsConsumerImpl) consumer).getConsumers();

assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 3);

consumer.unsubscribe();
consumer.close();
producer1.close();
producer2.close();
Expand Down
Loading

0 comments on commit 43242c7

Please sign in to comment.