From c61868b8f58f93160af6182e768353d126dde2c5 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 3 Oct 2016 16:01:27 +0200 Subject: [PATCH] Add returnImmediately option to PubSub's pullAsync --- .../pubsub/snippets/PubSubSnippets.java | 7 +- .../pubsub/snippets/SubscriptionSnippets.java | 4 +- .../java/com/google/cloud/pubsub/PubSub.java | 106 +++++++++++++----- .../com/google/cloud/pubsub/PubSubImpl.java | 21 ++-- .../com/google/cloud/pubsub/Subscription.java | 23 ++-- .../google/cloud/pubsub/SubscriptionInfo.java | 11 +- .../google/cloud/pubsub/PubSubImplTest.java | 44 +++++++- .../com/google/cloud/pubsub/PubSubTest.java | 22 +++- .../cloud/pubsub/SerializationTest.java | 12 +- .../google/cloud/pubsub/SubscriptionTest.java | 23 +++- 10 files changed, 199 insertions(+), 74 deletions(-) diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PubSubSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PubSubSnippets.java index d03cbe8347c8..155e71871fce 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PubSubSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PubSubSnippets.java @@ -533,7 +533,7 @@ public void pull(String subscriptionName) { /** * Example of asynchronously pulling a maximum number of messages from a subscription. */ - // [TARGET pullAsync(String, int)] + // [TARGET pullAsync(String, int, PullOption...)] // [VARIABLE "my_subscription_name"] public void pullAsync(String subscriptionName) throws ExecutionException, InterruptedException { // [START pullAsync] @@ -552,7 +552,7 @@ public void pullAsync(String subscriptionName) throws ExecutionException, Interr /** * Example of continuously pulling messages from a subscription. */ - // [TARGET pullAsync(String, MessageProcessor, PullOption...)] + // [TARGET pullAsync(String, MessageProcessor, MessageConsumerOption...)] // [VARIABLE "my_subscription_name"] public void pullWithMessageConsumer(String subscriptionName) throws Exception { // [START pullWithMessageConsumer] @@ -929,7 +929,8 @@ public List testTopicPermissions(String topicName) { } /** - * Example of asynchronously testing whether the caller has the provided permissions on a topic. + * Example of asynchronously testing whether the caller has the provided permissions on a + * topic. */ // [TARGET testTopicPermissionsAsync(String, List)] // [VARIABLE "my_topic_name"] diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriptionSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriptionSnippets.java index 3e0ac0189fbb..7ce381ab875e 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriptionSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriptionSnippets.java @@ -185,7 +185,7 @@ public void pull() { /** * Example of asynchronously pulling a maximum number of messages from the subscription. */ - // [TARGET pullAsync(int)] + // [TARGET pullAsync(int, PullOption...)] public void pullAsync() throws ExecutionException, InterruptedException { // [START pullAsync] Future> future = subscription.pullAsync(100); @@ -203,7 +203,7 @@ public void pullAsync() throws ExecutionException, InterruptedException { /** * Example of continuously pulling messages from the subscription. */ - // [TARGET pullAsync(MessageProcessor, PullOption...)] + // [TARGET pullAsync(MessageProcessor, MessageConsumerOption...)] // [VARIABLE "my_subscription_name"] public void pullWithMessageConsumer(String subscriptionName) throws Exception { // [START pullWithMessageConsumer] diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 207700622fb4..cfa154776428 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -84,6 +84,40 @@ public static ListOption pageToken(String pageToken) { */ final class PullOption extends Option { + private static final long serialVersionUID = 8335266553263173237L; + + enum OptionType implements Option.OptionType { + RETURN_IMMEDIATELY; + + @SuppressWarnings("unchecked") + T get(Map options) { + return (T) options.get(this); + } + + Boolean getBoolean(Map options) { + return get(options); + } + } + + private PullOption(Option.OptionType option, Object value) { + super(option, value); + } + + /** + * Returns an option to specify whether the pull operation should always return immediately or + * should rather wait until at least one message is available for pulling. If not provided, + * pull operations return immediatelly. + */ + public static PullOption returnImmediately(boolean returnImmediately) { + return new PullOption(OptionType.RETURN_IMMEDIATELY, returnImmediately); + } + } + + /** + * Class for specifying options for pulling messages with a {@link MessageConsumer}. + */ + final class MessageConsumerOption extends Option { + private static final long serialVersionUID = 4792164134340316582L; enum OptionType implements Option.OptionType { @@ -104,7 +138,7 @@ ExecutorFactory getExecutorFactory(Map op } } - private PullOption(Option.OptionType option, Object value) { + private MessageConsumerOption(Option.OptionType option, Object value) { super(option, value); } @@ -115,8 +149,8 @@ private PullOption(Option.OptionType option, Object value) { * until they are acknowledged or "nacked". If not provided, at most 100 messages can be in the * queue. */ - public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) { - return new PullOption(OptionType.MAX_QUEUED_CALLBACKS, maxQueuedCallbacks); + public static MessageConsumerOption maxQueuedCallbacks(int maxQueuedCallbacks) { + return new MessageConsumerOption(OptionType.MAX_QUEUED_CALLBACKS, maxQueuedCallbacks); } /** @@ -134,8 +168,8 @@ public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) { * * @param executorFactory the executor factory. */ - public static PullOption executorFactory(ExecutorFactory executorFactory) { - return new PullOption(OptionType.EXECUTOR_FACTORY, executorFactory); + public static MessageConsumerOption executorFactory(ExecutorFactory executorFactory) { + return new MessageConsumerOption(OptionType.EXECUTOR_FACTORY, executorFactory); } } @@ -728,9 +762,13 @@ interface MessageConsumer extends AutoCloseable { /** * Sends a request for pulling messages from the provided subscription. This method returns a * {@code Future} object to consume the result. {@link Future#get()} returns a message iterator. - * This method possibly returns no messages if no message was available at the time the request + * If {@link PullOption#returnImmediately(boolean)} is not provided or set to {@code true}, this + * method possibly returns no messages if no message was available at the time the request * was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one - * message is available). + * message is available). If {@link PullOption#returnImmediately(boolean)} is set to {@code false} + * the pull operation waits until at least one message is available or + * {@link PubSubOptions#initialTimeout()} is reached. If {@link PubSubOptions#initialTimeout()} is + * exceeded, {@link Future#get()} throws a {@link java.util.concurrent.ExecutionException}. * *

Example of asynchronously pulling a maximum number of messages from a subscription. *

 {@code
@@ -749,9 +787,10 @@ interface MessageConsumer extends AutoCloseable {
    * @param subscription the subscription from which to pull messages
    * @param maxMessages the maximum number of messages pulled by this method. This method can
    *     possibly return fewer messages.
-   * @throws PubSubException upon failure
+   * @param options pulling options
    */
-  Future> pullAsync(String subscription, int maxMessages);
+  Future> pullAsync(String subscription, int maxMessages,
+      PullOption... options);
 
   /**
    * Creates a message consumer that pulls messages from the provided subscription. You can stop
@@ -762,10 +801,10 @@ interface MessageConsumer extends AutoCloseable {
    * all pulled messages, the ack deadline is automatically renewed until the message is either
    * acknowledged or "nacked".
    *
-   * 

The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum - * number of queued messages (messages either being processed or waiting to be processed). The - * {@link PullOption#executorFactory(ExecutorFactory)} can be used to provide an executor to run - * message processor callbacks. + *

The {@link MessageConsumerOption#maxQueuedCallbacks(int)} option can be used to control the + * maximum number of queued messages (messages either being processed or waiting to be processed). + * The {@link MessageConsumerOption#executorFactory(ExecutorFactory)} can be used to provide an + * executor to run message processor callbacks. * *

Example of continuously pulling messages from a subscription. *

 {@code
@@ -788,12 +827,13 @@ interface MessageConsumer extends AutoCloseable {
    * @param options pulling options
    * @return a message consumer for the provided subscription and options
    */
-  MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);
+  MessageConsumer pullAsync(String subscription, MessageProcessor callback,
+      MessageConsumerOption... options);
 
   /**
    * Acknowledges the given messages for the provided subscription. Ack ids identify the messages to
    * acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
-   * {@link #pullAsync(String, int)}.
+   * {@link #pullAsync(String, int, PullOption...)}.
    *
    * 

Example of acking one message. *

 {@code
@@ -820,8 +860,9 @@ interface MessageConsumer extends AutoCloseable {
   /**
    * Sends a request to acknowledge the given messages for the provided subscription. Ack ids
    * identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by
-   * {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a
-   * {@code Future} object that can be used to wait for the acknowledge operation to be completed.
+   * {@link #pull(String, int)} and {@link #pullAsync(String, int, PullOption...)}. The method
+   * returns a {@code Future} object that can be used to wait for the acknowledge operation to be
+   * completed.
    *
    * 

Example of asynchronously acking one message. *

 {@code
@@ -851,7 +892,7 @@ interface MessageConsumer extends AutoCloseable {
   /**
    * Acknowledges the given messages for the provided subscription. Ack ids identify the messages to
    * acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
-   * {@link #pullAsync(String, int)}.
+   * {@link #pullAsync(String, int, PullOption...)}.
    *
    * 

Example of acking a list of messages. *

 {@code
@@ -873,8 +914,9 @@ interface MessageConsumer extends AutoCloseable {
   /**
    * Sends a request to acknowledge the given messages for the provided subscription. Ack ids
    * identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by
-   * {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a
-   * {@code Future} object that can be used to wait for the acknowledge operation to be completed.
+   * {@link #pull(String, int)} and {@link #pullAsync(String, int, PullOption...)}. The method
+   * returns a {@code Future} object that can be used to wait for the acknowledge operation to be
+   * completed.
    *
    * 

Example of asynchronously acking a list of messages. *

 {@code
@@ -897,7 +939,7 @@ interface MessageConsumer extends AutoCloseable {
   /**
    * "Nacks" the given messages for the provided subscription. Ack ids identify the messages to
    * "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
-   * {@link #pullAsync(String, int)}. This method corresponds to calling
+   * {@link #pullAsync(String, int, PullOption...)}. This method corresponds to calling
    * {@link #modifyAckDeadline(String, int, TimeUnit, String, String...)} with a deadline of 0.
    *
    * 

Example of nacking one message. @@ -925,10 +967,11 @@ interface MessageConsumer extends AutoCloseable { /** * Sends a request to "nack" the given messages for the provided subscription. Ack ids identify * the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by - * {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to - * calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, String, String...)} with a - * deadline of 0. The method returns a {@code Future} object that can be used to wait for the - * "nack" operation to be completed. + * {@link #pull(String, int)} and {@link #pullAsync(String, int, PullOption...)}. This method + * corresponds to calling + * {@link #modifyAckDeadlineAsync(String, int, TimeUnit, String, String...)} with a deadline of 0. + * The method returns a {@code Future} object that can be used to wait for the "nack" operation to + * be completed. * *

Example of asynchronously nacking one message. *

 {@code
@@ -958,7 +1001,7 @@ interface MessageConsumer extends AutoCloseable {
   /**
    * "Nacks" the given messages for the provided subscription. Ack ids identify the messages to
    * "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
-   * {@link #pullAsync(String, int)}. This method corresponds to calling
+   * {@link #pullAsync(String, int, PullOption...)}. This method corresponds to calling
    * {@link #modifyAckDeadline(String, int, TimeUnit, Iterable)} with a deadline of 0.
    *
    * 

Example of nacking a list of messages. @@ -981,10 +1024,10 @@ interface MessageConsumer extends AutoCloseable { /** * Sends a request to "nack" the given messages for the provided subscription. Ack ids identify * the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by - * {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to - * calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, Iterable)} with a deadline of 0. - * The method returns a {@code Future} object that can be used to wait for the "nack" operation to - * be completed. + * {@link #pull(String, int)} and {@link #pullAsync(String, int, PullOption...)}. This method + * corresponds to calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, Iterable)} with a + * deadline of 0. The method returns a {@code Future} object that can be used to wait for the + * "nack" operation to be completed. * *

Example of asynchronously nacking a list of messages. *

 {@code
@@ -1269,7 +1312,8 @@ Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit
    * such as a customized graphical user interface. For example, the Cloud Platform Console tests
    * IAM permissions internally to determine which UI should be available to the logged-in user.
    *
-   * 

Example of asynchronously testing whether the caller has the provided permissions on a topic. + *

Example of asynchronously testing whether the caller has the provided permissions on a + * topic. *

 {@code
    * String topicName = "my_topic_name";
    * List permissions = new LinkedList<>();
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
index d830290eb732..2982f0f5636a 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
@@ -18,8 +18,9 @@
 
 import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
 import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
-import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY;
-import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS;
+import static com.google.cloud.pubsub.PubSub.MessageConsumerOption.OptionType.EXECUTOR_FACTORY;
+import static com.google.cloud.pubsub.PubSub.MessageConsumerOption.OptionType.MAX_QUEUED_CALLBACKS;
+import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.RETURN_IMMEDIATELY;
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -496,12 +497,16 @@ public Iterator pull(String subscription, int maxMessages) {
   }
 
   @Override
-  public Future> pullAsync(final String subscription, int maxMessages) {
-    PullRequest request = PullRequest.newBuilder().setReturnImmediately(true)
+  public Future> pullAsync(final String subscription, int maxMessages,
+      PullOption... options) {
+    PullRequest.Builder builder = PullRequest.newBuilder().setReturnImmediately(true)
         .setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
-        .setMaxMessages(maxMessages)
-        .setReturnImmediately(true)
-        .build();
+        .setMaxMessages(maxMessages);
+    Map optionMap = optionMap(options);
+    if (RETURN_IMMEDIATELY.getBoolean(optionMap) != null) {
+      builder.setReturnImmediately(RETURN_IMMEDIATELY.getBoolean(optionMap));
+    }
+    PullRequest request = builder.build();
     PullFuture future = rpc.pull(request);
     future.addCallback(new PubSubRpc.PullCallback() {
       @Override
@@ -534,7 +539,7 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag
 
   @Override
   public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
-      PullOption... options) {
+      MessageConsumerOption... options) {
     Map optionMap = optionMap(options);
     return MessageConsumerImpl.builder(options(), subscription, ackDeadlineRenewer, callback)
         .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS.getInteger(optionMap))
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java
index 4860ac829fdb..19e97346195c 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java
@@ -21,6 +21,7 @@
 import com.google.cloud.GrpcServiceOptions;
 import com.google.cloud.Policy;
 import com.google.cloud.pubsub.PubSub.MessageConsumer;
+import com.google.cloud.pubsub.PubSub.MessageConsumerOption;
 import com.google.cloud.pubsub.PubSub.MessageProcessor;
 import com.google.cloud.pubsub.PubSub.PullOption;
 import com.google.common.base.Function;
@@ -44,11 +45,11 @@
  * indicates that the Pub/Sub server should resend it (implicit "nack").
  *
  * 

In a pull subscription, the subscribing application must explicitly pull messages using one of - * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or - * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}. + * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int, PullOption...)} or + * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, MessageConsumerOption...)}. * When messages are pulled with {@link PubSub#pull(String, int)} or - * {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly - * acknowledge them using one of {@link PubSub#ack(String, Iterable)}, + * {@link PubSub#pullAsync(String, int, PullOption...)} the subscribing application must also + * explicitly acknowledge them using one of {@link PubSub#ack(String, Iterable)}, * {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or * {@link PubSub#ackAsync(String, String, String...)}. * @@ -349,8 +350,8 @@ public Iterator pull(int maxMessages) { * possibly return fewer messages. * @throws PubSubException upon failure */ - public Future> pullAsync(int maxMessages) { - return pubsub.pullAsync(name(), maxMessages); + public Future> pullAsync(int maxMessages, PullOption... options) { + return pubsub.pullAsync(name(), maxMessages, options); } /** @@ -362,10 +363,10 @@ public Future> pullAsync(int maxMessages) { * all pulled messages, the ack deadline is automatically renewed until the message is either * acknowledged or "nacked". * - *

The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum - * number of queued messages (messages either being processed or waiting to be processed). The - * {@link PullOption#executorFactory(GrpcServiceOptions.ExecutorFactory)} can be used to provide - * an executor to run message processor callbacks. + *

The {@link MessageConsumerOption#maxQueuedCallbacks(int)} option can be used to control the + * maximum number of queued messages (messages either being processed or waiting to be processed). + * The {@link MessageConsumerOption#executorFactory(GrpcServiceOptions.ExecutorFactory)} can be + * used to provide an executor to run message processor callbacks. * *

Example of continuously pulling messages from the subscription. *

 {@code
@@ -387,7 +388,7 @@ public Future> pullAsync(int maxMessages) {
    * @param options pulling options
    * @return a message consumer for the provided subscription and options
    */
-  public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
+  public MessageConsumer pullAsync(MessageProcessor callback, MessageConsumerOption... options) {
     return pubsub.pullAsync(name(), callback, options);
   }
 
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java
index e85a20b53334..ecb1a7a4d3e2 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java
@@ -18,6 +18,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.cloud.pubsub.PubSub.PullOption;
 import com.google.cloud.pubsub.spi.v1.SubscriberApi;
 import com.google.common.base.MoreObjects;
 
@@ -37,11 +38,11 @@
  * indicates that the Pub/Sub server should resend it (implicit "nack").
  *
  * 

In a pull subscription, the subscribing application must explicitly pull messages using one of - * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or - * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}. - * When messages are pulled with {@link PubSub#pull(String, int)} or - * {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly - * acknowledge them using one of {@link PubSub#ack(String, Iterable)}, + * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int, PullOption...)} or + * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, + * PubSub.MessageConsumerOption...)}. When messages are pulled with {@link PubSub#pull(String, int)} + * or {@link PubSub#pullAsync(String, int, PullOption...)} the subscribing application must also + * explicitly acknowledge them using one of {@link PubSub#ack(String, Iterable)}, * {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or * {@link PubSub#ackAsync(String, String, String...)}. * diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index b46379cc402a..3ec47d9ba816 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -36,6 +36,7 @@ import com.google.cloud.pubsub.PubSub.ListOption; import com.google.cloud.pubsub.PubSub.MessageConsumer; import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.PubSub.MessageConsumerOption; import com.google.cloud.pubsub.PubSub.PullOption; import com.google.cloud.pubsub.spi.PubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; @@ -74,6 +75,7 @@ import org.easymock.IAnswer; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -1332,6 +1334,44 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept EasyMock.verify(futureMock); } + @Test + public void testPullMessagesAsyncWithOptions() throws ExecutionException, InterruptedException { + pubsub = new PubSubImpl(options, renewerMock); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(42) + .setReturnImmediately(false) + .build(); + List messageList = ImmutableList.of( + ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB1), + ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB2)); + PullResponse response = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE_PB1) + .addReceivedMessages(MESSAGE_PB2) + .build(); + Capture callback = Capture.newInstance(); + PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class); + futureMock.addCallback(EasyMock.capture(callback)); + EasyMock.expectLastCall(); + EasyMock.expect(futureMock.get()).andReturn(response); + EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock); + renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2")); + EasyMock.replay(pubsubRpcMock, renewerMock, futureMock); + Iterator messageIterator = + pubsub.pullAsync(SUBSCRIPTION, 42, PullOption.returnImmediately(false)).get(); + callback.getValue().success(response); + EasyMock.reset(renewerMock); + for (ReceivedMessage message : messageList) { + renewerMock.remove(SUBSCRIPTION, message.ackId()); + EasyMock.expectLastCall(); + } + EasyMock.replay(renewerMock); + while (messageIterator.hasNext()) { + messageIterator.next(); + } + EasyMock.verify(futureMock); + } + @Test public void testPullMessagesError() throws ExecutionException, InterruptedException { pubsub = new PubSubImpl(options, renewerMock); @@ -1435,8 +1475,8 @@ public PullFuture answer() throws Throwable { } }); EasyMock.replay(pubsubRpcMock, renewerMock, executorFactoryMock, executorServiceMock); - PullOption[] options = - {PullOption.maxQueuedCallbacks(42), PullOption.executorFactory(executorFactoryMock)}; + MessageConsumerOption[] options = {MessageConsumerOption.maxQueuedCallbacks(42), + MessageConsumerOption.executorFactory(executorFactoryMock)}; try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING, options)) { latch.await(); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java index f99cfa2f728e..2718a7f9fc60 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java @@ -21,6 +21,7 @@ import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.pubsub.PubSub.ListOption; +import com.google.cloud.pubsub.PubSub.MessageConsumerOption; import com.google.cloud.pubsub.PubSub.PullOption; import org.easymock.EasyMock; @@ -31,6 +32,7 @@ public class PubSubTest { private static final int PAGE_SIZE = 42; private static final String PAGE_TOKEN = "page token"; private static final int MAX_QUEUED_CALLBACKS = 42; + private static final boolean RETURN_IMMEDIATELY = false; @Test public void testListOption() { @@ -45,15 +47,23 @@ public void testListOption() { } @Test - @SuppressWarnings("unchecked") - public void testPullOptions() { + public void testMessageConsumerOptions() { // max queued callbacks - PullOption pullOption = PullOption.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS); + MessageConsumerOption pullOption = + MessageConsumerOption.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS); assertEquals(MAX_QUEUED_CALLBACKS, pullOption.value()); - assertEquals(PullOption.OptionType.MAX_QUEUED_CALLBACKS, pullOption.optionType()); + assertEquals(MessageConsumerOption.OptionType.MAX_QUEUED_CALLBACKS, pullOption.optionType()); ExecutorFactory executorFactory = EasyMock.createStrictMock(ExecutorFactory.class); - pullOption = PullOption.executorFactory(executorFactory); + // executor factory + pullOption = MessageConsumerOption.executorFactory(executorFactory); assertSame(executorFactory, pullOption.value()); - assertEquals(PullOption.OptionType.EXECUTOR_FACTORY, pullOption.optionType()); + assertEquals(MessageConsumerOption.OptionType.EXECUTOR_FACTORY, pullOption.optionType()); + } + + @Test + public void testPullOptions() { + PullOption pullOption = PullOption.returnImmediately(RETURN_IMMEDIATELY); + assertEquals(RETURN_IMMEDIATELY, pullOption.value()); + assertEquals(PullOption.OptionType.RETURN_IMMEDIATELY, pullOption.optionType()); } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SerializationTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SerializationTest.java index 8201f0d6413c..dcf836720caa 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SerializationTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SerializationTest.java @@ -21,6 +21,7 @@ import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Restorable; import com.google.cloud.pubsub.PubSub.ListOption; +import com.google.cloud.pubsub.PubSub.MessageConsumerOption; import com.google.cloud.pubsub.PubSub.PullOption; import java.io.Serializable; @@ -50,9 +51,12 @@ public class SerializationTest extends BaseSerializationTest { new Topic(PUB_SUB, new TopicInfo.BuilderImpl(TOPIC_INFO)); private static final ListOption PAGE_TOKEN_OPTION = ListOption.pageToken("cursor"); private static final ListOption PAGE_SIZE_OPTION = ListOption.pageSize(42); - private static final PullOption MAX_QUEUED_CALLBACKS_OPTION = PullOption.maxQueuedCallbacks(42); - private static final PullOption EXECUTOR_FACTORY_OPTION = - PullOption.executorFactory(new TestExecutorFactory()); + private static final MessageConsumerOption MAX_QUEUED_CALLBACKS_OPTION = + MessageConsumerOption.maxQueuedCallbacks(42); + private static final MessageConsumerOption EXECUTOR_FACTORY_OPTION = + MessageConsumerOption.executorFactory(new TestExecutorFactory()); + private static final PullOption RETURN_IMMEDIATELY_OPTION = + PullOption.returnImmediately(false); public static class TestExecutorFactory implements ExecutorFactory, Serializable { @@ -92,7 +96,7 @@ protected Serializable[] serializableObjects() { .build(); return new Serializable[]{options, otherOptions, MESSAGE, RECEIVED_MESSAGE, SUBSCRIPTION_INFO, SUBSCRIPTION, SUBSCRIPTION_ID, TOPIC_INFO, TOPIC, PAGE_TOKEN_OPTION, PAGE_SIZE_OPTION, - MAX_QUEUED_CALLBACKS_OPTION, EXECUTOR_FACTORY_OPTION}; + MAX_QUEUED_CALLBACKS_OPTION, EXECUTOR_FACTORY_OPTION, RETURN_IMMEDIATELY_OPTION}; } @Override diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java index 7d0dbb1529c2..d526d50eec48 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java @@ -32,6 +32,7 @@ import com.google.cloud.Role; import com.google.cloud.pubsub.PubSub.MessageConsumer; import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.PubSub.MessageConsumerOption; import com.google.cloud.pubsub.PubSub.PullOption; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -292,6 +293,24 @@ public void testPullAsync() throws ExecutionException, InterruptedException { assertEquals(messages, Lists.newArrayList(subscription.pullAsync(42).get())); } + @Test + public void testPullAsyncWithOptions() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions).times(2); + replay(pubsub); + ReceivedMessage message1 = ReceivedMessage.fromPb(pubsub, NAME, MESSAGE_PB1); + ReceivedMessage message2 = ReceivedMessage.fromPb(pubsub, NAME, MESSAGE_PB2); + reset(pubsub); + expect(pubsub.options()).andReturn(mockOptions); + List messages = ImmutableList.of(message1, message2); + expect(pubsub.pullAsync(NAME, 42, PullOption.returnImmediately(false))) + .andReturn(Futures.immediateFuture(messages.iterator())); + replay(pubsub); + initializeSubscription(); + assertEquals(messages, + Lists.newArrayList(subscription.pullAsync(42, PullOption.returnImmediately(false)).get())); + } + @Test public void testMessageConsumer() throws ExecutionException, InterruptedException { initializeExpectedSubscription(1); @@ -313,12 +332,12 @@ public void testMessageConsumerWithOptions() throws ExecutionException, Interrup MessageProcessor messageProcessor = createStrictMock(MessageProcessor.class); replay(messageConsumer, messageProcessor); expect(pubsub.options()).andReturn(mockOptions); - expect(pubsub.pullAsync(NAME, messageProcessor, PullOption.maxQueuedCallbacks(2))) + expect(pubsub.pullAsync(NAME, messageProcessor, MessageConsumerOption.maxQueuedCallbacks(2))) .andReturn(messageConsumer); replay(pubsub); initializeSubscription(); assertSame(messageConsumer, - subscription.pullAsync(messageProcessor, PullOption.maxQueuedCallbacks(2))); + subscription.pullAsync(messageProcessor, MessageConsumerOption.maxQueuedCallbacks(2))); verify(messageConsumer, messageProcessor); }