From c78723b54604f6312ca2009ac0ef5219ae6ba3b1 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Thu, 10 Nov 2016 16:19:25 +0100 Subject: [PATCH] Use returnImmediately=false and disable timeouts for pullAsync --- .../java/com/google/cloud/pubsub/PubSub.java | 16 ++++++++------ .../com/google/cloud/pubsub/PubSubImpl.java | 21 ++++++++++++------- .../cloud/pubsub/spi/DefaultPubSubRpc.java | 17 ++++++++++++++- .../google/cloud/pubsub/BaseSystemTest.java | 18 ++++++++++++++++ .../google/cloud/pubsub/PubSubImplTest.java | 4 ++-- 5 files changed, 59 insertions(+), 17 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 207700622fb4..3e45fce3eb11 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -702,9 +702,10 @@ interface MessageConsumer extends AutoCloseable { /** * Pulls messages from the provided subscription. This method possibly returns no messages if no * message was available at the time the request was processed by the Pub/Sub service (i.e. the - * system is not allowed to wait until at least one message is available). Pulled messages have - * their acknowledge deadline automatically renewed until they are explicitly consumed using - * {@link Iterator#next()}. + * system is not allowed to wait until at least one message is available - + * return_immediately + * option is set to {@code true}). Pulled messages have their acknowledge deadline automatically + * renewed until they are explicitly consumed using {@link Iterator#next()}. * *

Example of pulling a maximum number of messages from a subscription. *

 {@code
@@ -728,9 +729,12 @@ interface MessageConsumer extends AutoCloseable {
   /**
    * Sends a request for pulling messages from the provided subscription. This method returns a
    * {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
-   * This method possibly returns no messages if no message was available at the time the request
-   * was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
-   * message is available).
+   * When using this method the system is allowed to wait until at least one message is available
+   * rather than returning no messages (i.e.
+   * return_immediately
+   * option is set to {@code false}). The client may cancel the request by calling
+   * {@link Future#cancel(boolean)} if it does not wish to wait any longer. Notice that the Pub/Sub
+   * service might still return no messages if a timeout is reached on the service side.
    *
    * 

Example of asynchronously pulling a maximum number of messages from a subscription. *

 {@code
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
index 494bb3ff43a1..8b878c1e93fd 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
@@ -512,18 +512,13 @@ public Future> listSubscriptionsAsync(String topic,
     return listSubscriptionsAsync(topic, getOptions(), optionMap(options));
   }
 
-  @Override
-  public Iterator pull(String subscription, int maxMessages) {
-    return get(pullAsync(subscription, maxMessages));
-  }
-
-  @Override
-  public Future> pullAsync(final String subscription, int maxMessages) {
+  private Future> pullAsync(final String subscription,
+      int maxMessages, boolean returnImmediately) {
     PullRequest request = PullRequest.newBuilder().setReturnImmediately(true)
         .setSubscription(
             SubscriberApi.formatSubscriptionName(getOptions().getProjectId(), subscription))
         .setMaxMessages(maxMessages)
-        .setReturnImmediately(true)
+        .setReturnImmediately(returnImmediately)
         .build();
     PullFuture future = rpc.pull(request);
     future.addCallback(new PubSubRpc.PullCallback() {
@@ -555,6 +550,16 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag
     });
   }
 
+  @Override
+  public Iterator pull(String subscription, int maxMessages) {
+    return get(pullAsync(subscription, maxMessages, true));
+  }
+
+  @Override
+  public Future> pullAsync(String subscription, int maxMessages) {
+    return pullAsync(subscription, maxMessages, false);
+  }
+
   @Override
   public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
       PullOption... options) {
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
index 76f321d0ebf9..bbdaab1bd1b2 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
@@ -68,6 +68,8 @@
 import io.grpc.netty.NegotiationType;
 import io.grpc.netty.NettyChannelBuilder;
 
+import org.joda.time.Duration;
+
 import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -77,6 +79,7 @@ public class DefaultPubSubRpc implements PubSubRpc {
 
   private final PublisherApi publisherApi;
   private final SubscriberApi subscriberApi;
+  private final SubscriberApi noTimeoutSubscriberApi;
   private final ScheduledExecutorService executor;
   private final ProviderManager providerManager;
   private final ExecutorFactory executorFactory;
@@ -164,6 +167,12 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException {
           .applyToAllApiMethods(callSettingsBuilder);
       publisherApi = PublisherApi.create(pubBuilder.build());
       subscriberApi = SubscriberApi.create(subBuilder.build());
+      callSettingsBuilder.setRetrySettingsBuilder(callSettingsBuilder.getRetrySettingsBuilder()
+          .setTotalTimeout(Duration.millis(Long.MAX_VALUE))
+          .setInitialRpcTimeout(Duration.millis(Long.MAX_VALUE))
+          .setMaxRpcTimeout(Duration.millis(Long.MAX_VALUE)));
+      subBuilder.applyToAllApiMethods(callSettingsBuilder);
+      noTimeoutSubscriberApi = SubscriberApi.create(subBuilder.build());
     } catch (Exception ex) {
       throw new IOException(ex);
     }
@@ -256,9 +265,14 @@ public Future acknowledge(AcknowledgeRequest request) {
     return translate(subscriberApi.acknowledgeCallable().futureCall(request), false);
   }
 
+  private static PullFuture pull(SubscriberApi subscriberApi, PullRequest request) {
+    return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
+  }
+
   @Override
   public PullFuture pull(PullRequest request) {
-    return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
+    return request.getReturnImmediately()
+        ? pull(subscriberApi, request) : pull(noTimeoutSubscriberApi, request);
   }
 
   @Override
@@ -290,6 +304,7 @@ public void close() throws Exception {
     }
     closed = true;
     subscriberApi.close();
+    noTimeoutSubscriberApi.close();
     publisherApi.close();
     providerManager.getChannel().shutdown();
     executorFactory.release(executor);
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
index b30d7a0f8c7b..d9524af5925e 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
@@ -549,6 +549,24 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
     assertTrue(pubsub().deleteTopic(topic));
   }
 
+  @Test
+  public void testPullMessagesAsyncNonImmediately() throws ExecutionException, InterruptedException {
+    String topic = formatForTest("test-pull-messages-async-non-immediately-topic");
+    pubsub().create(TopicInfo.of(topic));
+    String subscription = formatForTest("test-pull-messages-async-subscription");
+    pubsub().create(SubscriptionInfo.of(topic, subscription));
+    Future> future = pubsub().pullAsync(subscription, 2);
+    Message message1 = Message.of("payload1");
+    Message message2 = Message.of("payload2");
+    List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
+    assertEquals(2, messageIds.size());
+    Iterator iterator = future.get();
+    assertEquals(message1.getPayloadAsString(), iterator.next().getPayloadAsString());
+    assertEquals(message2.getPayloadAsString(), iterator.next().getPayloadAsString());
+    assertTrue(pubsub().deleteSubscription(subscription));
+    assertTrue(pubsub().deleteTopic(topic));
+  }
+
   @Test
   public void testPullAsyncNonExistingSubscription()
       throws ExecutionException, InterruptedException {
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
index 8b5c32bbceae..b3fa6d5178fd 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
@@ -1302,7 +1302,7 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
     PullRequest request = PullRequest.newBuilder()
         .setSubscription(SUBSCRIPTION_NAME_PB)
         .setMaxMessages(42)
-        .setReturnImmediately(true)
+        .setReturnImmediately(false)
         .build();
     List messageList = ImmutableList.of(
         ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB1),
@@ -1363,7 +1363,7 @@ public void testPullMessagesAsyncError() throws ExecutionException, InterruptedE
     PullRequest request = PullRequest.newBuilder()
         .setSubscription(SUBSCRIPTION_NAME_PB)
         .setMaxMessages(42)
-        .setReturnImmediately(true)
+        .setReturnImmediately(false)
         .build();
     PubSubException exception = new PubSubException(new IOException(), false);
     PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);