Skip to content

Commit

Permalink
Add returnImmediately option to PubSub's pullAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Oct 3, 2016
1 parent 9c81978 commit c61868b
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ public void pull(String subscriptionName) {
/**
* Example of asynchronously pulling a maximum number of messages from a subscription.
*/
// [TARGET pullAsync(String, int)]
// [TARGET pullAsync(String, int, PullOption...)]
// [VARIABLE "my_subscription_name"]
public void pullAsync(String subscriptionName) throws ExecutionException, InterruptedException {
// [START pullAsync]
Expand All @@ -552,7 +552,7 @@ public void pullAsync(String subscriptionName) throws ExecutionException, Interr
/**
* Example of continuously pulling messages from a subscription.
*/
// [TARGET pullAsync(String, MessageProcessor, PullOption...)]
// [TARGET pullAsync(String, MessageProcessor, MessageConsumerOption...)]
// [VARIABLE "my_subscription_name"]
public void pullWithMessageConsumer(String subscriptionName) throws Exception {
// [START pullWithMessageConsumer]
Expand Down Expand Up @@ -929,7 +929,8 @@ public List<Boolean> testTopicPermissions(String topicName) {
}

/**
* Example of asynchronously testing whether the caller has the provided permissions on a topic.
* Example of asynchronously testing whether the caller has the provided permissions on a
* topic.
*/
// [TARGET testTopicPermissionsAsync(String, List)]
// [VARIABLE "my_topic_name"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void pull() {
/**
* Example of asynchronously pulling a maximum number of messages from the subscription.
*/
// [TARGET pullAsync(int)]
// [TARGET pullAsync(int, PullOption...)]
public void pullAsync() throws ExecutionException, InterruptedException {
// [START pullAsync]
Future<Iterator<ReceivedMessage>> future = subscription.pullAsync(100);
Expand All @@ -203,7 +203,7 @@ public void pullAsync() throws ExecutionException, InterruptedException {
/**
* Example of continuously pulling messages from the subscription.
*/
// [TARGET pullAsync(MessageProcessor, PullOption...)]
// [TARGET pullAsync(MessageProcessor, MessageConsumerOption...)]
// [VARIABLE "my_subscription_name"]
public void pullWithMessageConsumer(String subscriptionName) throws Exception {
// [START pullWithMessageConsumer]
Expand Down
106 changes: 75 additions & 31 deletions google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,40 @@ public static ListOption pageToken(String pageToken) {
*/
final class PullOption extends Option {

private static final long serialVersionUID = 8335266553263173237L;

enum OptionType implements Option.OptionType {
RETURN_IMMEDIATELY;

@SuppressWarnings("unchecked")
<T> T get(Map<Option.OptionType, ?> options) {
return (T) options.get(this);
}

Boolean getBoolean(Map<Option.OptionType, ?> options) {
return get(options);
}
}

private PullOption(Option.OptionType option, Object value) {
super(option, value);
}

/**
* Returns an option to specify whether the pull operation should always return immediately or
* should rather wait until at least one message is available for pulling. If not provided,
* pull operations return immediatelly.
*/
public static PullOption returnImmediately(boolean returnImmediately) {
return new PullOption(OptionType.RETURN_IMMEDIATELY, returnImmediately);
}
}

/**
* Class for specifying options for pulling messages with a {@link MessageConsumer}.
*/
final class MessageConsumerOption extends Option {

private static final long serialVersionUID = 4792164134340316582L;

enum OptionType implements Option.OptionType {
Expand All @@ -104,7 +138,7 @@ ExecutorFactory<ExecutorService> getExecutorFactory(Map<Option.OptionType, ?> op
}
}

private PullOption(Option.OptionType option, Object value) {
private MessageConsumerOption(Option.OptionType option, Object value) {
super(option, value);
}

Expand All @@ -115,8 +149,8 @@ private PullOption(Option.OptionType option, Object value) {
* until they are acknowledged or "nacked". If not provided, at most 100 messages can be in the
* queue.
*/
public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) {
return new PullOption(OptionType.MAX_QUEUED_CALLBACKS, maxQueuedCallbacks);
public static MessageConsumerOption maxQueuedCallbacks(int maxQueuedCallbacks) {
return new MessageConsumerOption(OptionType.MAX_QUEUED_CALLBACKS, maxQueuedCallbacks);
}

/**
Expand All @@ -134,8 +168,8 @@ public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) {
*
* @param executorFactory the executor factory.
*/
public static PullOption executorFactory(ExecutorFactory executorFactory) {
return new PullOption(OptionType.EXECUTOR_FACTORY, executorFactory);
public static MessageConsumerOption executorFactory(ExecutorFactory executorFactory) {
return new MessageConsumerOption(OptionType.EXECUTOR_FACTORY, executorFactory);
}
}

Expand Down Expand Up @@ -728,9 +762,13 @@ interface MessageConsumer extends AutoCloseable {
/**
* Sends a request for pulling messages from the provided subscription. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
* This method possibly returns no messages if no message was available at the time the request
* If {@link PullOption#returnImmediately(boolean)} is not provided or set to {@code true}, this
* method possibly returns no messages if no message was available at the time the request
* was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
* message is available).
* message is available). If {@link PullOption#returnImmediately(boolean)} is set to {@code false}
* the pull operation waits until at least one message is available or
* {@link PubSubOptions#initialTimeout()} is reached. If {@link PubSubOptions#initialTimeout()} is
* exceeded, {@link Future#get()} throws a {@link java.util.concurrent.ExecutionException}.
*
* <p>Example of asynchronously pulling a maximum number of messages from a subscription.
* <pre> {@code
Expand All @@ -749,9 +787,10 @@ interface MessageConsumer extends AutoCloseable {
* @param subscription the subscription from which to pull messages
* @param maxMessages the maximum number of messages pulled by this method. This method can
* possibly return fewer messages.
* @throws PubSubException upon failure
* @param options pulling options
*/
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages,
PullOption... options);

/**
* Creates a message consumer that pulls messages from the provided subscription. You can stop
Expand All @@ -762,10 +801,10 @@ interface MessageConsumer extends AutoCloseable {
* all pulled messages, the ack deadline is automatically renewed until the message is either
* acknowledged or "nacked".
*
* <p>The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum
* number of queued messages (messages either being processed or waiting to be processed). The
* {@link PullOption#executorFactory(ExecutorFactory)} can be used to provide an executor to run
* message processor callbacks.
* <p>The {@link MessageConsumerOption#maxQueuedCallbacks(int)} option can be used to control the
* maximum number of queued messages (messages either being processed or waiting to be processed).
* The {@link MessageConsumerOption#executorFactory(ExecutorFactory)} can be used to provide an
* executor to run message processor callbacks.
*
* <p>Example of continuously pulling messages from a subscription.
* <pre> {@code
Expand All @@ -788,12 +827,13 @@ interface MessageConsumer extends AutoCloseable {
* @param options pulling options
* @return a message consumer for the provided subscription and options
*/
MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);
MessageConsumer pullAsync(String subscription, MessageProcessor callback,
MessageConsumerOption... options);

/**
* Acknowledges the given messages for the provided subscription. Ack ids identify the messages to
* acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}.
* {@link #pullAsync(String, int, PullOption...)}.
*
* <p>Example of acking one message.
* <pre> {@code
Expand All @@ -820,8 +860,9 @@ interface MessageConsumer extends AutoCloseable {
/**
* Sends a request to acknowledge the given messages for the provided subscription. Ack ids
* identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a
* {@code Future} object that can be used to wait for the acknowledge operation to be completed.
* {@link #pull(String, int)} and {@link #pullAsync(String, int, PullOption...)}. The method
* returns a {@code Future} object that can be used to wait for the acknowledge operation to be
* completed.
*
* <p>Example of asynchronously acking one message.
* <pre> {@code
Expand Down Expand Up @@ -851,7 +892,7 @@ interface MessageConsumer extends AutoCloseable {
/**
* Acknowledges the given messages for the provided subscription. Ack ids identify the messages to
* acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}.
* {@link #pullAsync(String, int, PullOption...)}.
*
* <p>Example of acking a list of messages.
* <pre> {@code
Expand All @@ -873,8 +914,9 @@ interface MessageConsumer extends AutoCloseable {
/**
* Sends a request to acknowledge the given messages for the provided subscription. Ack ids
* identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a
* {@code Future} object that can be used to wait for the acknowledge operation to be completed.
* {@link #pull(String, int)} and {@link #pullAsync(String, int, PullOption...)}. The method
* returns a {@code Future} object that can be used to wait for the acknowledge operation to be
* completed.
*
* <p>Example of asynchronously acking a list of messages.
* <pre> {@code
Expand All @@ -897,7 +939,7 @@ interface MessageConsumer extends AutoCloseable {
/**
* "Nacks" the given messages for the provided subscription. Ack ids identify the messages to
* "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}. This method corresponds to calling
* {@link #pullAsync(String, int, PullOption...)}. This method corresponds to calling
* {@link #modifyAckDeadline(String, int, TimeUnit, String, String...)} with a deadline of 0.
*
* <p>Example of nacking one message.
Expand Down Expand Up @@ -925,10 +967,11 @@ interface MessageConsumer extends AutoCloseable {
/**
* Sends a request to "nack" the given messages for the provided subscription. Ack ids identify
* the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to
* calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, String, String...)} with a
* deadline of 0. The method returns a {@code Future} object that can be used to wait for the
* "nack" operation to be completed.
* {@link #pull(String, int)} and {@link #pullAsync(String, int, PullOption...)}. This method
* corresponds to calling
* {@link #modifyAckDeadlineAsync(String, int, TimeUnit, String, String...)} with a deadline of 0.
* The method returns a {@code Future} object that can be used to wait for the "nack" operation to
* be completed.
*
* <p>Example of asynchronously nacking one message.
* <pre> {@code
Expand Down Expand Up @@ -958,7 +1001,7 @@ interface MessageConsumer extends AutoCloseable {
/**
* "Nacks" the given messages for the provided subscription. Ack ids identify the messages to
* "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}. This method corresponds to calling
* {@link #pullAsync(String, int, PullOption...)}. This method corresponds to calling
* {@link #modifyAckDeadline(String, int, TimeUnit, Iterable)} with a deadline of 0.
*
* <p>Example of nacking a list of messages.
Expand All @@ -981,10 +1024,10 @@ interface MessageConsumer extends AutoCloseable {
/**
* Sends a request to "nack" the given messages for the provided subscription. Ack ids identify
* the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to
* calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, Iterable)} with a deadline of 0.
* The method returns a {@code Future} object that can be used to wait for the "nack" operation to
* be completed.
* {@link #pull(String, int)} and {@link #pullAsync(String, int, PullOption...)}. This method
* corresponds to calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, Iterable)} with a
* deadline of 0. The method returns a {@code Future} object that can be used to wait for the
* "nack" operation to be completed.
*
* <p>Example of asynchronously nacking a list of messages.
* <pre> {@code
Expand Down Expand Up @@ -1269,7 +1312,8 @@ Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit
* such as a customized graphical user interface. For example, the Cloud Platform Console tests
* IAM permissions internally to determine which UI should be available to the logged-in user.
*
* <p>Example of asynchronously testing whether the caller has the provided permissions on a topic.
* <p>Example of asynchronously testing whether the caller has the provided permissions on a
* topic.
* <pre> {@code
* String topicName = "my_topic_name";
* List<String> permissions = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY;
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS;
import static com.google.cloud.pubsub.PubSub.MessageConsumerOption.OptionType.EXECUTOR_FACTORY;
import static com.google.cloud.pubsub.PubSub.MessageConsumerOption.OptionType.MAX_QUEUED_CALLBACKS;
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.RETURN_IMMEDIATELY;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;

Expand Down Expand Up @@ -496,12 +497,16 @@ public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(final String subscription, int maxMessages) {
PullRequest request = PullRequest.newBuilder().setReturnImmediately(true)
public Future<Iterator<ReceivedMessage>> pullAsync(final String subscription, int maxMessages,
PullOption... options) {
PullRequest.Builder builder = PullRequest.newBuilder().setReturnImmediately(true)
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
.setMaxMessages(maxMessages)
.setReturnImmediately(true)
.build();
.setMaxMessages(maxMessages);
Map<Option.OptionType, ?> optionMap = optionMap(options);
if (RETURN_IMMEDIATELY.getBoolean(optionMap) != null) {
builder.setReturnImmediately(RETURN_IMMEDIATELY.getBoolean(optionMap));
}
PullRequest request = builder.build();
PullFuture future = rpc.pull(request);
future.addCallback(new PubSubRpc.PullCallback() {
@Override
Expand Down Expand Up @@ -534,7 +539,7 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag

@Override
public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
PullOption... options) {
MessageConsumerOption... options) {
Map<Option.OptionType, ?> optionMap = optionMap(options);
return MessageConsumerImpl.builder(options(), subscription, ackDeadlineRenewer, callback)
.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS.getInteger(optionMap))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.GrpcServiceOptions;
import com.google.cloud.Policy;
import com.google.cloud.pubsub.PubSub.MessageConsumer;
import com.google.cloud.pubsub.PubSub.MessageConsumerOption;
import com.google.cloud.pubsub.PubSub.MessageProcessor;
import com.google.cloud.pubsub.PubSub.PullOption;
import com.google.common.base.Function;
Expand All @@ -44,11 +45,11 @@
* indicates that the Pub/Sub server should resend it (implicit "nack").
*
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int, PullOption...)} or
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, MessageConsumerOption...)}.
* When messages are pulled with {@link PubSub#pull(String, int)} or
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
* {@link PubSub#pullAsync(String, int, PullOption...)} the subscribing application must also
* explicitly acknowledge them using one of {@link PubSub#ack(String, Iterable)},
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
* {@link PubSub#ackAsync(String, String, String...)}.
*
Expand Down Expand Up @@ -349,8 +350,8 @@ public Iterator<ReceivedMessage> pull(int maxMessages) {
* possibly return fewer messages.
* @throws PubSubException upon failure
*/
public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
return pubsub.pullAsync(name(), maxMessages);
public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages, PullOption... options) {
return pubsub.pullAsync(name(), maxMessages, options);
}

/**
Expand All @@ -362,10 +363,10 @@ public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
* all pulled messages, the ack deadline is automatically renewed until the message is either
* acknowledged or "nacked".
*
* <p>The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum
* number of queued messages (messages either being processed or waiting to be processed). The
* {@link PullOption#executorFactory(GrpcServiceOptions.ExecutorFactory)} can be used to provide
* an executor to run message processor callbacks.
* <p>The {@link MessageConsumerOption#maxQueuedCallbacks(int)} option can be used to control the
* maximum number of queued messages (messages either being processed or waiting to be processed).
* The {@link MessageConsumerOption#executorFactory(GrpcServiceOptions.ExecutorFactory)} can be
* used to provide an executor to run message processor callbacks.
*
* <p>Example of continuously pulling messages from the subscription.
* <pre> {@code
Expand All @@ -387,7 +388,7 @@ public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
* @param options pulling options
* @return a message consumer for the provided subscription and options
*/
public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
public MessageConsumer pullAsync(MessageProcessor callback, MessageConsumerOption... options) {
return pubsub.pullAsync(name(), callback, options);
}

Expand Down
Loading

0 comments on commit c61868b

Please sign in to comment.