diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java index 98fda1550ee3..59482ad2c3d6 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -21,16 +21,20 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.cloud.AsyncPage; import com.google.cloud.Page; import com.google.cloud.pubsub.PubSub.MessageConsumer; import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.PubSub.PullOption; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import io.grpc.Status; + import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -545,6 +549,42 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept assertTrue(pubsub().deleteTopic(topic)); } +@Test +public void testPullMessagesAsyncNotImmediately() + throws ExecutionException, InterruptedException { + String topic = formatForTest("test-pull-messages-not-immediately-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-pull-messages-not-immediately-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Future> future = + pubsub().pullAsync(subscription, 2, PullOption.returnImmediately(false)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + Iterator iterator = future.get(); + assertEquals(message1.payloadAsString(), iterator.next().payloadAsString()); + assertEquals(message2.payloadAsString(), iterator.next().payloadAsString()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); +} + + @Test + public void testPullMessagesAsyncNotImmediately_NoMessages() + throws ExecutionException, InterruptedException { + String topic = formatForTest("test-pull-messages-not-immediately-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-pull-messages-not-immediately-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + try { + pubsub().pullAsync(subscription, 2, PullOption.returnImmediately(false)).get(); + fail("Expected timeout exception"); + } catch (ExecutionException ex) { + PubSubException cause = (PubSubException) ex.getCause(); + assertEquals(Status.Code.DEADLINE_EXCEEDED.value(), cause.code()); + } + } + @Test public void testPullAsyncNonExistingSubscription() throws ExecutionException, InterruptedException {