Skip to content

Commit

Permalink
Add system tests for PullOption.returnImmediately(false)
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Oct 3, 2016
1 parent 142caaf commit 30fd965
Showing 1 changed file with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.cloud.AsyncPage;
import com.google.cloud.Page;
import com.google.cloud.pubsub.PubSub.MessageConsumer;
import com.google.cloud.pubsub.PubSub.MessageProcessor;
import com.google.cloud.pubsub.PubSub.PullOption;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import io.grpc.Status;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -545,6 +549,42 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
assertTrue(pubsub().deleteTopic(topic));
}

@Test
public void testPullMessagesAsyncNotImmediately()
throws ExecutionException, InterruptedException {
String topic = formatForTest("test-pull-messages-not-immediately-topic");
pubsub().create(TopicInfo.of(topic));
String subscription = formatForTest("test-pull-messages-not-immediately-subscription");
pubsub().create(SubscriptionInfo.of(topic, subscription));
Future<Iterator<ReceivedMessage>> future =
pubsub().pullAsync(subscription, 2, PullOption.returnImmediately(false));
Message message1 = Message.of("payload1");
Message message2 = Message.of("payload2");
List<String> messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
assertEquals(2, messageIds.size());
Iterator<ReceivedMessage> iterator = future.get();
assertEquals(message1.payloadAsString(), iterator.next().payloadAsString());
assertEquals(message2.payloadAsString(), iterator.next().payloadAsString());
assertTrue(pubsub().deleteSubscription(subscription));
assertTrue(pubsub().deleteTopic(topic));
}

@Test
public void testPullMessagesAsyncNotImmediately_NoMessages()
throws ExecutionException, InterruptedException {
String topic = formatForTest("test-pull-messages-not-immediately-topic");
pubsub().create(TopicInfo.of(topic));
String subscription = formatForTest("test-pull-messages-not-immediately-subscription");
pubsub().create(SubscriptionInfo.of(topic, subscription));
try {
pubsub().pullAsync(subscription, 2, PullOption.returnImmediately(false)).get();
fail("Expected timeout exception");
} catch (ExecutionException ex) {
PubSubException cause = (PubSubException) ex.getCause();
assertEquals(Status.Code.DEADLINE_EXCEEDED.value(), cause.code());
}
}

@Test
public void testPullAsyncNonExistingSubscription()
throws ExecutionException, InterruptedException {
Expand Down

0 comments on commit 30fd965

Please sign in to comment.