diff --git a/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java b/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java index 2a1110c9362e..67361dba6e64 100644 --- a/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java +++ b/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -50,7 +51,7 @@ public abstract class GrpcServiceOptions, Ser private final double timeoutMultiplier; private final int maxTimeout; - private transient ExecutorFactory executorFactory; + private transient ExecutorFactory executorFactory; /** * Shared thread pool executor. @@ -73,30 +74,32 @@ public void close(ScheduledExecutorService instance) { }; /** - * An interface for {@link ScheduledExecutorService} factories. Implementations of this interface - * can be used to provide an user-defined scheduled executor to execute requests. Any - * implementation of this interface must override the {@code get()} method to return the desired - * executor. The {@code release(executor)} method should be overriden to free resources used by - * the executor (if needed) according to application's logic. + * An interface for {@link ExecutorService} factories. Implementations of this interface can be + * used to provide an user-defined executor to execute requests. Any implementation of this + * interface must override the {@code get()} method to return the desired executor. The + * {@code release(executor)} method should be overriden to free resources used by the executor (if + * needed) according to application's logic. * *

Implementation must provide a public no-arg constructor. Loading of a factory implementation * is done via {@link java.util.ServiceLoader}. + * + * @param the {@link ExecutorService} subclass created by this factory */ - public interface ExecutorFactory { + public interface ExecutorFactory { /** - * Gets a scheduled executor service instance. + * Gets an executor service instance. */ - ScheduledExecutorService get(); + T get(); /** * Releases resources used by the executor and possibly shuts it down. */ - void release(ScheduledExecutorService executor); + void release(T executor); } @VisibleForTesting - static class DefaultExecutorFactory implements ExecutorFactory { + static class DefaultExecutorFactory implements ExecutorFactory { private static final DefaultExecutorFactory INSTANCE = new DefaultExecutorFactory(); @@ -148,7 +151,7 @@ protected Builder(GrpcServiceOptions options) { * * @return the builder */ - public B executorFactory(ExecutorFactory executorFactory) { + public B executorFactory(ExecutorFactory executorFactory) { this.executorFactory = executorFactory; return self(); } @@ -192,6 +195,7 @@ public B maxTimeout(int maxTimeout) { } } + @SuppressWarnings("unchecked") protected GrpcServiceOptions( Class> serviceFactoryClass, Class> rpcFactoryClass, Builder executorFactory() { return executorFactory; } diff --git a/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java b/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java index 0a3c34f87916..457f05cd0ba9 100644 --- a/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java +++ b/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java @@ -211,7 +211,7 @@ public void testBaseHashCode() { @Test public void testDefaultExecutorFactory() { - ExecutorFactory executorFactory = new DefaultExecutorFactory(); + ExecutorFactory executorFactory = new DefaultExecutorFactory(); ScheduledExecutorService executorService = executorFactory.get(); assertSame(executorService, executorFactory.get()); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java index b51fef7890e2..d72f788cff65 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java @@ -48,7 +48,7 @@ class AckDeadlineRenewer implements AutoCloseable { private final PubSub pubsub; private final ScheduledExecutorService executor; - private final ExecutorFactory executorFactory; + private final ExecutorFactory executorFactory; private final Clock clock; private final Queue messageQueue; private final Map messageDeadlines; diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java new file mode 100644 index 000000000000..cb08dde00327 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java @@ -0,0 +1,301 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.cloud.pubsub.spi.v1.SubscriberApi.formatSubscriptionName; +import static com.google.common.base.MoreObjects.firstNonNull; + +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; +import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; + +import io.grpc.internal.SharedResourceHolder; + +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Default implementation for a message consumer. + */ +final class MessageConsumerImpl implements MessageConsumer { + + private static final int MAX_QUEUED_CALLBACKS = 100; + // shared scheduled executor, used to schedule pulls + private static final SharedResourceHolder.Resource CONSUMER_EXECUTOR = + new SharedResourceHolder.Resource() { + @Override + public ExecutorService create() { + return Executors.newSingleThreadExecutor(); + } + + @Override + public void close(ExecutorService instance) { + instance.shutdown(); + } + }; + + private final PubSubOptions pubsubOptions; + private final PubSubRpc pubsubRpc; + private final PubSub pubsub; + private final AckDeadlineRenewer deadlineRenewer; + private final String subscription; + private final MessageProcessor messageProcessor; + private final ExecutorService consumerExecutor; + private final ExecutorFactory executorFactory; + private final ExecutorService executor; + private final AtomicInteger queuedCallbacks; + private final int maxQueuedCallbacks; + private final Object futureLock = new Object(); + private final Runnable consumerRunnable; + private final NextPullPolicy pullPolicy; + private boolean closed; + private Future scheduledFuture; + private PullFuture pullerFuture; + + /** + * Interface for policies according to which the consumer should pull messages. + */ + interface NextPullPolicy { + + boolean shouldPull(int queuedCallbacks); + } + + /** + * Default pull policy. The consumer will pull again once {@code nextPullThreshold} messages out + * of {@code maxQueuedCallbacks} have been processed. + */ + static class DefaultNextPullPolicy implements NextPullPolicy { + + final int maxQueuedCallbacks; + final int nextPullThreshold; + + DefaultNextPullPolicy(int maxQueuedCallbacks, int nextPullThreshold) { + this.maxQueuedCallbacks = maxQueuedCallbacks; + this.nextPullThreshold = nextPullThreshold; + } + + @Override + public boolean shouldPull(int queuedCallbacks) { + return (maxQueuedCallbacks - queuedCallbacks) >= nextPullThreshold; + } + } + + /** + * Default executor factory for the message processor executor. By default a single-threaded + * executor is used. + */ + static class DefaultExecutorFactory implements ExecutorFactory { + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @Override + public ExecutorService get() { + return executor; + } + + @Override + public void release(ExecutorService executor) { + executor.shutdownNow(); + } + } + + class ConsumerRunnable implements Runnable { + + @Override + public void run() { + if (closed) { + return; + } + pullerFuture = pubsubRpc.pull(createPullRequest()); + pullerFuture.addCallback(new PullCallback() { + @Override + public void success(PullResponse response) { + List messages = response.getReceivedMessagesList(); + queuedCallbacks.addAndGet(messages.size()); + for (com.google.pubsub.v1.ReceivedMessage message : messages) { + deadlineRenewer.add(subscription, message.getAckId()); + ReceivedMessage receivedMessage = ReceivedMessage.fromPb(pubsub, subscription, message); + executor.execute(ackingRunnable(receivedMessage)); + } + nextPull(); + } + + @Override + public void failure(Throwable error) { + if (!(error instanceof CancellationException)) { + nextPull(); + } + } + }); + } + + private PullRequest createPullRequest() { + return PullRequest.newBuilder() + .setSubscription(formatSubscriptionName(pubsubOptions.projectId(), subscription)) + .setMaxMessages(maxQueuedCallbacks - queuedCallbacks.get()) + .setReturnImmediately(false) + .build(); + } + + private Runnable ackingRunnable(final ReceivedMessage receivedMessage) { + return new Runnable() { + @Override + public void run() { + try { + messageProcessor.process(receivedMessage); + pubsub.ackAsync(receivedMessage.subscription(), receivedMessage.ackId()); + } catch (Exception ex) { + pubsub.nackAsync(receivedMessage.subscription(), receivedMessage.ackId()); + } finally { + deadlineRenewer.remove(receivedMessage.subscription(), receivedMessage.ackId()); + queuedCallbacks.decrementAndGet(); + // We can now pull more messages, according to the next pull policy. + pullIfNeeded(); + } + } + }; + } + } + + private MessageConsumerImpl(Builder builder) { + this.pubsubOptions = builder.pubsubOptions; + this.subscription = builder.subscription; + this.messageProcessor = builder.messageProcessor; + this.pubsubRpc = pubsubOptions.rpc(); + this.pubsub = pubsubOptions.service(); + this.deadlineRenewer = builder.deadlineRenewer; + this.queuedCallbacks = new AtomicInteger(); + this.consumerExecutor = SharedResourceHolder.get(CONSUMER_EXECUTOR); + this.executorFactory = + builder.executorFactory != null ? builder.executorFactory : new DefaultExecutorFactory(); + this.executor = executorFactory.get(); + this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS); + this.consumerRunnable = new ConsumerRunnable(); + int nextPullThreshold = builder.nextPullThreshold != null ? builder.nextPullThreshold + : this.maxQueuedCallbacks / 2; + this.pullPolicy = new DefaultNextPullPolicy(maxQueuedCallbacks, nextPullThreshold); + nextPull(); + } + + private void pullIfNeeded() { + synchronized (futureLock) { + if (closed || scheduledFuture != null || !pullPolicy.shouldPull(queuedCallbacks.get())) { + return; + } + scheduledFuture = consumerExecutor.submit(consumerRunnable); + } + } + + private void nextPull() { + synchronized (futureLock) { + if (closed || queuedCallbacks.get() == maxQueuedCallbacks) { + scheduledFuture = null; + return; + } + scheduledFuture = consumerExecutor.submit(consumerRunnable); + } + } + + @Override + public void close() { + synchronized (futureLock) { + if (closed) { + return; + } + closed = true; + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + if (pullerFuture != null) { + pullerFuture.cancel(true); + } + } + SharedResourceHolder.release(CONSUMER_EXECUTOR, consumerExecutor); + executorFactory.release(executor); + } + + static final class Builder { + private final PubSubOptions pubsubOptions; + private final String subscription; + private final AckDeadlineRenewer deadlineRenewer; + private final MessageProcessor messageProcessor; + private Integer maxQueuedCallbacks; + private ExecutorFactory executorFactory; + private Integer nextPullThreshold; + + Builder(PubSubOptions pubsubOptions, String subscription, AckDeadlineRenewer deadlineRenewer, + MessageProcessor messageProcessor) { + this.pubsubOptions = pubsubOptions; + this.subscription = subscription; + this.deadlineRenewer = deadlineRenewer; + this.messageProcessor = messageProcessor; + } + + /** + * Sets the maximum number of callbacks either being executed or waiting for execution. + */ + Builder maxQueuedCallbacks(Integer maxQueuedCallbacks) { + this.maxQueuedCallbacks = maxQueuedCallbacks; + return this; + } + + /** + * Sets the executor factory, used to manage the executor that will run message processor + * callbacks message consumer. + */ + Builder executorFactory(ExecutorFactory executorFactory) { + this.executorFactory = executorFactory; + return this; + } + + /** + * Sets a threshold for the next pull. If the consumer stopped pulling due to reaching the + * maximum number of queued callbacks, it will be pull again only once at least + * {@code nextPullThreshold} callbacks have completed their execution. + */ + Builder nextPullThreshold(Integer nextPullThreshold) { + this.nextPullThreshold = nextPullThreshold; + return this; + } + + /** + * Creates a {@code MessageConsumerImpl} object. + */ + MessageConsumerImpl build() { + return new MessageConsumerImpl(this); + } + } + + /** + * Returns a builder for {@code MessageConsumerImpl} objects given the service options, the + * subscription from which messages must be pulled, the acknowledge deadline renewer and a message + * processor used to process messages. + */ + static Builder builder(PubSubOptions pubsubOptions, String subscription, + AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { + return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index f86817c26345..042f64c317bf 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -17,12 +17,14 @@ package com.google.cloud.pubsub; import com.google.cloud.AsyncPage; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Page; import com.google.cloud.Service; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -84,7 +86,8 @@ final class PullOption extends Option { private static final long serialVersionUID = 4792164134340316582L; enum OptionType implements Option.OptionType { - MAX_CONCURRENT_CALLBACKS; + EXECUTOR_FACTORY, + MAX_QUEUED_CALLBACKS; @SuppressWarnings("unchecked") T get(Map options) { @@ -94,6 +97,10 @@ T get(Map options) { Integer getInteger(Map options) { return get(options); } + + ExecutorFactory getExecutorFactory(Map options) { + return get(options); + } } private PullOption(Option.OptionType option, Object value) { @@ -101,11 +108,33 @@ private PullOption(Option.OptionType option, Object value) { } /** - * Returns an option to specify the maximum number of messages that can be processed - * concurrently at any time. + * Returns an option to specify the maximum number of messages that can be queued in the message + * consumer at any time. Queued messages are already pulled messages that are either waiting to + * be processed or being processed. Queued messages will have their acknowledge deadline renewed + * until they are acknowledged or "nacked". If not provided, at most 100 messages can be in the + * queue. + */ + public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) { + return new PullOption(OptionType.MAX_QUEUED_CALLBACKS, maxQueuedCallbacks); + } + + /** + * Returns an option to specify the executor used to execute message processor callbacks. The + * executor determines the number of messages that can be processed at the same time. If not + * provided, a single-threaded executor is used to execute message processor callbacks. + * + *

The {@link ExecutorFactory} object can be used to handle creation and release of the + * executor, possibly reusing existing executors. {@link ExecutorFactory#get()} is called when + * the message consumer is created. {@link ExecutorFactory#release(ExecutorService)} is called + * when the message consumer is closed. + * + *

For the created option to be serializable, the provided executor factory should implement + * {@link java.io.Serializable}. + * + * @param executorFactory the executor factory. */ - public static PullOption maxConcurrentCallbacks(int maxConcurrency) { - return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency); + public static PullOption executorFactory(ExecutorFactory executorFactory) { + return new PullOption(OptionType.EXECUTOR_FACTORY, executorFactory); } } @@ -433,6 +462,25 @@ interface MessageConsumer extends AutoCloseable { */ Future> pullAsync(String subscription, int maxMessages); + /** + * Creates a message consumer that pulls messages for the provided subscription. You can stop + * pulling messages by calling {@link MessageConsumer#close()}. The returned message consumer + * executes {@link MessageProcessor#process(Message)} on each pulled message. If + * {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If + * {@link MessageProcessor#process(Message)} throws an exception, the message is "nacked". For + * all pulled messages, the ack deadline is automatically renewed until the message is either + * acknowledged or "nacked". + * + *

The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum + * number of queued messages (messages either being processed or waiting to be processed). The + * {@link PullOption#executorFactory(ExecutorFactory)} can be used to provide an executor to run + * message processor callbacks. + * + * @param subscription the subscription from which to pull messages + * @param callback the callback to be executed on each message + * @param options pulling options + * @return a message consumer for the provided subscription and options + */ MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options); /** diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 166aad008c99..355d7c36aed1 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -18,6 +18,8 @@ import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN; +import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY; +import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.lazyTransform; @@ -510,8 +512,11 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag @Override public MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options) { - // this method should use the VTKIT thread-pool (maybe getting it should be part of the spi) - return null; + Map optionMap = optionMap(options); + return MessageConsumerImpl.builder(options(), subscription, ackDeadlineRenewer, callback) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS.getInteger(optionMap)) + .executorFactory(EXECUTOR_FACTORY.getExecutorFactory(optionMap)) + .build(); } @Override diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java index 87a25bf0dc36..07691e09e1a4 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; public class PubSubOptions extends GrpcServiceOptions { @@ -86,7 +87,7 @@ protected PubSubOptions(Builder builder) { } @Override - protected ExecutorFactory executorFactory() { + protected ExecutorFactory executorFactory() { return super.executorFactory(); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index b0f9b76e3920..d174e5172a23 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -73,7 +73,7 @@ public class DefaultPubSubRpc implements PubSubRpc { private final PublisherApi publisherApi; private final SubscriberApi subscriberApi; private final ScheduledExecutorService executor; - private final ExecutorFactory executorFactory; + private final ExecutorFactory executorFactory; private boolean closed; @@ -86,7 +86,7 @@ private InternalPubSubOptions(PubSubOptions options) { } @Override - protected ExecutorFactory executorFactory() { + protected ExecutorFactory executorFactory() { return super.executorFactory(); } } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java index 2f5a71195df0..8b277b5aa900 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -24,6 +24,8 @@ import com.google.cloud.AsyncPage; import com.google.cloud.Page; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -33,9 +35,12 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -531,6 +536,150 @@ public void testPullAsyncNonExistingSubscription() pubsub().pullAsync(formatForTest("non-existing-subscription"), 2).get(); } + @Test + public void testMessageConsumer() throws Exception { + String topic = formatForTest("test-message-consumer-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-message-consumer-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Set payloads = Sets.newHashSet("payload1", "payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(2); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + countDownLatch.countDown(); + } + }; + try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + for (Message message : receivedMessages) { + payloads.contains(message.payloadAsString()); + } + // Messages have all been acked, they should not be pulled again + Iterator messages = pubsub().pull(subscription, 2); + assertFalse(messages.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testMessageConsumerNack() throws Exception { + String topic = formatForTest("test-message-consumer-nack-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-message-consumer-nack-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Set payloads = Sets.newHashSet("payload1", "payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(2); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + countDownLatch.countDown(); + throw new RuntimeException("Force nack"); + } + }; + try (MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + for (Message message : receivedMessages) { + payloads.contains(message.payloadAsString()); + } + // Messages have all been nacked, we should be able to pull them again + Thread.sleep(5000); + Iterator messages = pubsub().pull(subscription, 2); + while (messages.hasNext()) { + payloads.contains(messages.next().payloadAsString()); + } + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testMessageConsumerWithMoreMessages() throws Exception { + String topic = formatForTest("test-message-consumer-more-messages-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-message-consumer-more-messages-subscriptions"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + int totalMessages = 200; + Set payloads = Sets.newHashSetWithExpectedSize(totalMessages); + List messagesToSend = Lists.newArrayListWithCapacity(totalMessages); + for (int i = 0; i < totalMessages; i++) { + String payload = "payload" + i; + messagesToSend.add(Message.of(payload)); + payloads.add(payload); + + } + List messageIds = pubsub().publish(topic, messagesToSend); + assertEquals(totalMessages, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(totalMessages); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + countDownLatch.countDown(); + } + }; + try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + // Messages have all been acked, they should not be pulled again + Iterator messages = pubsub().pull(subscription, totalMessages); + assertFalse(messages.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testMessageConsumerAndAutoRenewDeadline() throws Exception { + String topic = formatForTest("test-message-consumer-and-renew-deadline-topic"); + pubsub().create(TopicInfo.of(topic)); + final String subscription = + formatForTest("test-message-consumer-and-renew-deadline-subscription"); + pubsub().create(SubscriptionInfo.builder(topic, subscription).ackDeadLineSeconds(10).build()); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Set payloads = Sets.newHashSet("payload1", "payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(2); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + Thread.sleep(15000); + // message deadline is being renewed, it should not be pulled again + Iterator messages = pubsub().pull(subscription, 2); + assertFalse(messages.hasNext()); + countDownLatch.countDown(); + } + }; + try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + for (Message message : receivedMessages) { + payloads.contains(message.payloadAsString()); + } + // Messages have all been acked, they should not be pulled again + Iterator messages = pubsub().pull(subscription, 2); + assertFalse(messages.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + @Test public void testAckAndNackOneMessage() { String topic = formatForTest("test-ack-one-message-topic"); diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java new file mode 100644 index 000000000000..1a0d70b8150d --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java @@ -0,0 +1,453 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; +import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; +import com.google.common.util.concurrent.ForwardingListenableFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +public class MessageConsumerImplTest { + + private static final String PROJECT = "project"; + private static final String SUBSCRIPTION = "subscription"; + private static final String SUBSCRIPTION_PB = "projects/project/subscriptions/subscription"; + private static final int MAX_QUEUED_CALLBACKS = 42; + private static final Message MESSAGE1 = Message.of("payload1"); + private static final Message MESSAGE2 = Message.of("payload2"); + private static final String ACK_ID1 = "ack-id1"; + private static final String ACK_ID2 = "ack-id2"; + private static final com.google.pubsub.v1.ReceivedMessage MESSAGE1_PB = + com.google.pubsub.v1.ReceivedMessage.newBuilder() + .setAckId(ACK_ID1) + .setMessage(MESSAGE1.toPb()) + .build(); + private static final com.google.pubsub.v1.ReceivedMessage MESSAGE2_PB = + com.google.pubsub.v1.ReceivedMessage.newBuilder() + .setAckId(ACK_ID2) + .setMessage(MESSAGE2.toPb()) + .build(); + private static final PullResponse PULL_RESPONSE = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .addReceivedMessages(MESSAGE2_PB) + .build(); + private static final MessageProcessor DO_NOTHING_PROCESSOR = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + // do nothing + } + }; + private static final MessageProcessor THROW_PROCESSOR = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + throw new RuntimeException(); + } + }; + private static final PullResponse EMPTY_RESPONSE = PullResponse.getDefaultInstance(); + + private PubSubRpc pubsubRpc; + private PubSub pubsub; + private PubSubOptions options; + private AckDeadlineRenewer renewer; + + @Rule + public Timeout globalTimeout = Timeout.seconds(60); + + static final class TestPullFuture + extends ForwardingListenableFuture.SimpleForwardingListenableFuture + implements PullFuture { + + TestPullFuture(PullResponse response) { + super(Futures.immediateFuture(response)); + } + + @Override + public void addCallback(final PullCallback callback) { + Futures.addCallback(delegate(), new FutureCallback() { + @Override + public void onSuccess(PullResponse result) { + callback.success(result); + } + + @Override + public void onFailure(Throwable error) { + callback.failure(error); + } + }); + } + } + + @Before + public void setUp() { + pubsubRpc = EasyMock.createStrictMock(PubSubRpc.class); + pubsub = EasyMock.createMock(PubSub.class); + options = EasyMock.createStrictMock(PubSubOptions.class); + renewer = EasyMock.createMock(AckDeadlineRenewer.class); + } + + @After + public void tearDown() { + EasyMock.verify(pubsubRpc); + EasyMock.verify(pubsub); + EasyMock.verify(options); + EasyMock.verify(renewer); + + } + + private static PullRequest pullRequest(int maxQueuedCallbacks) { + return PullRequest.newBuilder() + .setMaxMessages(maxQueuedCallbacks) + .setSubscription(SUBSCRIPTION_PB) + .setReturnImmediately(false) + .build(); + } + + private static IAnswer createAnswer(final CountDownLatch latch) { + return new IAnswer() { + @Override + public Void answer() throws Throwable { + latch.countDown(); + return null; + } + }; + } + + @Test + public void testMessageConsumerAck() throws Exception { + PullRequest request = pullRequest(MAX_QUEUED_CALLBACKS); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerNack() throws Exception { + PullRequest request = pullRequest(MAX_QUEUED_CALLBACKS); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMultipleCallsAck() throws Exception { + PullRequest request1 = pullRequest(MAX_QUEUED_CALLBACKS); + PullRequest request2 = pullRequest(MAX_QUEUED_CALLBACKS - 1); + PullResponse response1 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + final PullResponse response2 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE2_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + final CountDownLatch nextPullLatch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andAnswer(new IAnswer>() { + @Override + public Future answer() throws Throwable { + nextPullLatch.await(); + return null; + } + }); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1)); + EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + nextPullLatch.countDown(); + return new TestPullFuture(response2); + } + }); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMultipleCallsNack() throws Exception { + PullRequest request1 = pullRequest(MAX_QUEUED_CALLBACKS); + PullRequest request2 = pullRequest(MAX_QUEUED_CALLBACKS - 1); + PullResponse response1 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + final PullResponse response2 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE2_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + final CountDownLatch nextPullLatch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andAnswer(new IAnswer>() { + @Override + public Future answer() throws Throwable { + nextPullLatch.await(); + return null; + } + }); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1)); + EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + nextPullLatch.countDown(); + return new TestPullFuture(response2); + } + }); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMaxCallbacksAck() throws Exception { + PullRequest request1 = pullRequest(2); + PullRequest request2 = pullRequest(1); + final PullResponse otherPullResponse = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch nextPullLatch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(3); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andAnswer(new IAnswer>() { + @Override + public Future answer() throws Throwable { + nextPullLatch.await(); + return null; + } + }); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + nextPullLatch.countDown(); + return new TestPullFuture(otherPullResponse); + } + }); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(2) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMaxCallbacksNack() throws Exception { + PullRequest request1 = pullRequest(2); + PullRequest request2 = pullRequest(1); + final PullResponse otherPullResponse = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch nextPullLatch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(3); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andAnswer(new IAnswer>() { + @Override + public Future answer() throws Throwable { + nextPullLatch.await(); + return null; + } + }); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + nextPullLatch.countDown(); + return new TestPullFuture(otherPullResponse); + } + }); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR) + .maxQueuedCallbacks(2) + .build()) { + latch.await(); + } + } + + @Test + public void testClose() throws Exception { + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + final ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class); + executor.shutdown(); + EasyMock.expectLastCall(); + EasyMock.replay(pubsubRpc, pubsub, options, executor, renewer); + MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .executorFactory(new ExecutorFactory() { + @Override + public ExecutorService get() { + return executor; + } + + @Override + public void release(ExecutorService executor) { + executor.shutdown(); + } + }).build(); + consumer.close(); + // closing again should do nothing + consumer.close(); + EasyMock.verify(executor); + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index b310c72fe850..7d6dc928d883 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -26,9 +26,14 @@ import static org.junit.Assert.fail; import com.google.cloud.AsyncPage; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Page; import com.google.cloud.RetryParams; +import com.google.cloud.pubsub.MessageConsumerImplTest.TestPullFuture; import com.google.cloud.pubsub.PubSub.ListOption; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.PubSub.PullOption; import com.google.cloud.pubsub.spi.PubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; @@ -60,6 +65,7 @@ import org.easymock.Capture; import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -69,7 +75,9 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -128,6 +136,12 @@ public String apply(SubscriptionId subscriptionId) { return formatSubscriptionName(subscriptionId.project(), subscriptionId.subscription()); } }; + private static final MessageProcessor DO_NOTHING = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + // do nothing + } + }; private PubSubOptions options; private PubSubRpcFactory rpcFactoryMock; @@ -139,6 +153,7 @@ public String apply(SubscriptionId subscriptionId) { public ExpectedException thrown = ExpectedException.none(); @Before + @SuppressWarnings("unchecked") public void setUp() { rpcFactoryMock = EasyMock.createStrictMock(PubSubRpcFactory.class); pubsubRpcMock = EasyMock.createStrictMock(PubSubRpc.class); @@ -1355,6 +1370,68 @@ public void testPullMessagesAsyncError() throws ExecutionException, InterruptedE EasyMock.verify(futureMock); } + @Test + public void testMessageConsumer() throws Exception { + pubsub = new PubSubImpl(options, renewerMock); + EasyMock.reset(options); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock); + EasyMock.expect(options.projectId()).andReturn(PROJECT); + EasyMock.replay(options); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(100) + .setReturnImmediately(false) + .build(); + final PullResponse response = PullResponse.getDefaultInstance(); + final CountDownLatch latch = new CountDownLatch(1); + EasyMock.expect(pubsubRpcMock.pull(request)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + latch.countDown(); + return new TestPullFuture(response); + } + }); + EasyMock.replay(pubsubRpcMock, renewerMock); + try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING)) { + latch.await(); + } + } + + @Test + public void testMessageConsumerWithOptions() throws Exception { + pubsub = new PubSubImpl(options, renewerMock); + EasyMock.reset(options); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock); + EasyMock.expect(options.projectId()).andReturn(PROJECT); + EasyMock.replay(options); + ExecutorFactory executorFactoryMock = EasyMock.createStrictMock(ExecutorFactory.class); + ExecutorService executorServiceMock = EasyMock.createStrictMock(ExecutorService.class); + EasyMock.expect(executorFactoryMock.get()).andReturn(executorServiceMock); + executorFactoryMock.release(executorServiceMock); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(42) + .setReturnImmediately(false) + .build(); + final PullResponse response = PullResponse.getDefaultInstance(); + final CountDownLatch latch = new CountDownLatch(1); + EasyMock.expect(pubsubRpcMock.pull(request)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + latch.countDown(); + return new TestPullFuture(response); + } + }); + EasyMock.replay(pubsubRpcMock, renewerMock, executorFactoryMock, executorServiceMock); + PullOption[] options = + {PullOption.maxQueuedCallbacks(42), PullOption.executorFactory(executorFactoryMock)}; + try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING, options)) { + latch.await(); + } + } + @Test public void testAckOneMessage() { pubsub = new PubSubImpl(options, renewerMock); diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java index 620e737111f7..f99cfa2f728e 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java @@ -17,17 +17,20 @@ package com.google.cloud.pubsub; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.pubsub.PubSub.ListOption; import com.google.cloud.pubsub.PubSub.PullOption; +import org.easymock.EasyMock; import org.junit.Test; public class PubSubTest { private static final int PAGE_SIZE = 42; private static final String PAGE_TOKEN = "page token"; - private static final int MAX_CONCURRENT_CALLBACKS = 42; + private static final int MAX_QUEUED_CALLBACKS = 42; @Test public void testListOption() { @@ -42,9 +45,15 @@ public void testListOption() { } @Test + @SuppressWarnings("unchecked") public void testPullOptions() { - PullOption pullOption = PullOption.maxConcurrentCallbacks(MAX_CONCURRENT_CALLBACKS); - assertEquals(MAX_CONCURRENT_CALLBACKS, pullOption.value()); - assertEquals(PullOption.OptionType.MAX_CONCURRENT_CALLBACKS, pullOption.optionType()); + // max queued callbacks + PullOption pullOption = PullOption.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS); + assertEquals(MAX_QUEUED_CALLBACKS, pullOption.value()); + assertEquals(PullOption.OptionType.MAX_QUEUED_CALLBACKS, pullOption.optionType()); + ExecutorFactory executorFactory = EasyMock.createStrictMock(ExecutorFactory.class); + pullOption = PullOption.executorFactory(executorFactory); + assertSame(executorFactory, pullOption.value()); + assertEquals(PullOption.OptionType.EXECUTOR_FACTORY, pullOption.optionType()); } }