From 30fd965aa0f5be8af656eaaab2b2ca60e992d315 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 3 Oct 2016 16:20:41 +0200 Subject: [PATCH] Add system tests for PullOption.returnImmediately(false) --- .../google/cloud/pubsub/BaseSystemTest.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java index 98fda1550ee3..59482ad2c3d6 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -21,16 +21,20 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.cloud.AsyncPage; import com.google.cloud.Page; import com.google.cloud.pubsub.PubSub.MessageConsumer; import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.PubSub.PullOption; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import io.grpc.Status; + import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -545,6 +549,42 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept assertTrue(pubsub().deleteTopic(topic)); } +@Test +public void testPullMessagesAsyncNotImmediately() + throws ExecutionException, InterruptedException { + String topic = formatForTest("test-pull-messages-not-immediately-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-pull-messages-not-immediately-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Future> future = + pubsub().pullAsync(subscription, 2, PullOption.returnImmediately(false)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + Iterator iterator = future.get(); + assertEquals(message1.payloadAsString(), iterator.next().payloadAsString()); + assertEquals(message2.payloadAsString(), iterator.next().payloadAsString()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); +} + + @Test + public void testPullMessagesAsyncNotImmediately_NoMessages() + throws ExecutionException, InterruptedException { + String topic = formatForTest("test-pull-messages-not-immediately-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-pull-messages-not-immediately-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + try { + pubsub().pullAsync(subscription, 2, PullOption.returnImmediately(false)).get(); + fail("Expected timeout exception"); + } catch (ExecutionException ex) { + PubSubException cause = (PubSubException) ex.getCause(); + assertEquals(Status.Code.DEADLINE_EXCEEDED.value(), cause.code()); + } + } + @Test public void testPullAsyncNonExistingSubscription() throws ExecutionException, InterruptedException {