From 7693456c16400ab7a5142dad107a95213fa1858a Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Tue, 7 Jun 2016 17:42:02 +0200 Subject: [PATCH] Add MessageConsumerImpl class, implement pullAsync, add tests --- .../com/google/cloud/GrpcServiceOptions.java | 30 +- .../google/cloud/GrpcServiceOptionsTest.java | 2 +- .../cloud/pubsub/AckDeadlineRenewer.java | 2 +- .../cloud/pubsub/MessageConsumerImpl.java | 281 ++++++++++++ .../java/com/google/cloud/pubsub/PubSub.java | 70 ++- .../com/google/cloud/pubsub/PubSubImpl.java | 9 +- .../google/cloud/pubsub/PubSubOptions.java | 3 +- .../cloud/pubsub/spi/DefaultPubSubRpc.java | 4 +- .../google/cloud/pubsub/BaseSystemTest.java | 149 +++++++ .../cloud/pubsub/MessageConsumerImplTest.java | 400 ++++++++++++++++++ .../google/cloud/pubsub/PubSubImplTest.java | 74 ++++ .../com/google/cloud/pubsub/PubSubTest.java | 34 +- 12 files changed, 1029 insertions(+), 29 deletions(-) create mode 100644 gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java create mode 100644 gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java diff --git a/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java b/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java index 2a1110c9362e..67361dba6e64 100644 --- a/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java +++ b/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -50,7 +51,7 @@ public abstract class GrpcServiceOptions, Ser private final double timeoutMultiplier; private final int maxTimeout; - private transient ExecutorFactory executorFactory; + private transient ExecutorFactory executorFactory; /** * Shared thread pool executor. @@ -73,30 +74,32 @@ public void close(ScheduledExecutorService instance) { }; /** - * An interface for {@link ScheduledExecutorService} factories. Implementations of this interface - * can be used to provide an user-defined scheduled executor to execute requests. Any - * implementation of this interface must override the {@code get()} method to return the desired - * executor. The {@code release(executor)} method should be overriden to free resources used by - * the executor (if needed) according to application's logic. + * An interface for {@link ExecutorService} factories. Implementations of this interface can be + * used to provide an user-defined executor to execute requests. Any implementation of this + * interface must override the {@code get()} method to return the desired executor. The + * {@code release(executor)} method should be overriden to free resources used by the executor (if + * needed) according to application's logic. * *

Implementation must provide a public no-arg constructor. Loading of a factory implementation * is done via {@link java.util.ServiceLoader}. + * + * @param the {@link ExecutorService} subclass created by this factory */ - public interface ExecutorFactory { + public interface ExecutorFactory { /** - * Gets a scheduled executor service instance. + * Gets an executor service instance. */ - ScheduledExecutorService get(); + T get(); /** * Releases resources used by the executor and possibly shuts it down. */ - void release(ScheduledExecutorService executor); + void release(T executor); } @VisibleForTesting - static class DefaultExecutorFactory implements ExecutorFactory { + static class DefaultExecutorFactory implements ExecutorFactory { private static final DefaultExecutorFactory INSTANCE = new DefaultExecutorFactory(); @@ -148,7 +151,7 @@ protected Builder(GrpcServiceOptions options) { * * @return the builder */ - public B executorFactory(ExecutorFactory executorFactory) { + public B executorFactory(ExecutorFactory executorFactory) { this.executorFactory = executorFactory; return self(); } @@ -192,6 +195,7 @@ public B maxTimeout(int maxTimeout) { } } + @SuppressWarnings("unchecked") protected GrpcServiceOptions( Class> serviceFactoryClass, Class> rpcFactoryClass, Builder executorFactory() { return executorFactory; } diff --git a/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java b/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java index 0a3c34f87916..457f05cd0ba9 100644 --- a/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java +++ b/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java @@ -211,7 +211,7 @@ public void testBaseHashCode() { @Test public void testDefaultExecutorFactory() { - ExecutorFactory executorFactory = new DefaultExecutorFactory(); + ExecutorFactory executorFactory = new DefaultExecutorFactory(); ScheduledExecutorService executorService = executorFactory.get(); assertSame(executorService, executorFactory.get()); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java index b51fef7890e2..d72f788cff65 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java @@ -48,7 +48,7 @@ class AckDeadlineRenewer implements AutoCloseable { private final PubSub pubsub; private final ScheduledExecutorService executor; - private final ExecutorFactory executorFactory; + private final ExecutorFactory executorFactory; private final Clock clock; private final Queue messageQueue; private final Map messageDeadlines; diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java new file mode 100644 index 000000000000..ac120147d98e --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java @@ -0,0 +1,281 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.cloud.pubsub.spi.v1.SubscriberApi.formatSubscriptionName; +import static com.google.common.base.MoreObjects.firstNonNull; + +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; +import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; + +import io.grpc.internal.SharedResourceHolder; + +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Default implementation for a message consumer. + */ +final class MessageConsumerImpl implements MessageConsumer { + + private static final int MAX_QUEUED_CALLBACKS = 100; + // shared scheduled executor, used to schedule pulls + private static final SharedResourceHolder.Resource TIMER = + new SharedResourceHolder.Resource() { + @Override + public ScheduledExecutorService create() { + ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1); + timer.setRemoveOnCancelPolicy(true); + return timer; + } + + @Override + public void close(ScheduledExecutorService instance) { + instance.shutdown(); + } + }; + + private final PubSubOptions pubsubOptions; + private final PubSubRpc pubsubRpc; + private final PubSub pubsub; + private final AckDeadlineRenewer deadlineRenewer; + private final String subscription; + private final MessageProcessor messageProcessor; + private final ScheduledExecutorService timer; + private final ExecutorFactory executorFactory; + private final ExecutorService executor; + private final AtomicInteger queuedCallbacks; + private final int maxQueuedCallbacks; + private final Object futureLock = new Object(); + private final Runnable scheduleRunnable; + private boolean closed; + private Future scheduledFuture; + private PullFuture pullerFuture; + private boolean stopped = true; + + /** + * Default executor factory for the message processor executor. By default a single-threaded + * executor is used. + */ + static class DefaultExecutorFactory implements ExecutorFactory { + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @Override + public ExecutorService get() { + return executor; + } + + @Override + public void release(ExecutorService executor) { + executor.shutdownNow(); + } + } + + private MessageConsumerImpl(Builder builder) { + this.pubsubOptions = builder.pubsubOptions; + this.subscription = builder.subscription; + this.messageProcessor = builder.messageProcessor; + this.pubsubRpc = pubsubOptions.rpc(); + this.pubsub = pubsubOptions.service(); + this.deadlineRenewer = builder.deadlineRenewer; + this.queuedCallbacks = new AtomicInteger(); + this.timer = SharedResourceHolder.get(TIMER); + this.executorFactory = firstNonNull(builder.executorFactory, new DefaultExecutorFactory()); + this.executor = executorFactory.get(); + this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS); + this.scheduleRunnable = new Runnable() { + @Override + public void run() { + synchronized (futureLock) { + if (closed) { + return; + } + pull(); + } + } + }; + nextPull(); + } + + private Runnable ackingRunnable(final ReceivedMessage receivedMessage) { + return new Runnable() { + @Override + public void run() { + try { + messageProcessor.process(receivedMessage); + pubsub.ackAsync(receivedMessage.subscription(), receivedMessage.ackId()); + } catch (Exception ex) { + pubsub.nackAsync(receivedMessage.subscription(), receivedMessage.ackId()); + } finally { + deadlineRenewer.remove(receivedMessage.subscription(), receivedMessage.ackId()); + queuedCallbacks.decrementAndGet(); + // We can now pull more messages. We do not pull immediately to possibly wait for other + // callbacks to end + scheduleNextPull(500, TimeUnit.MILLISECONDS); + } + } + }; + } + + private PullRequest createPullRequest() { + return PullRequest.newBuilder() + .setSubscription(formatSubscriptionName(pubsubOptions.projectId(), subscription)) + .setMaxMessages(maxQueuedCallbacks - queuedCallbacks.get()) + .setReturnImmediately(false) + .build(); + } + + private void scheduleNextPull(long delay, TimeUnit timeUnit) { + synchronized (futureLock) { + if (!closed && stopped) { + scheduledFuture = timer.schedule(scheduleRunnable, delay, timeUnit); + } + } + } + + private void nextPull() { + synchronized (futureLock) { + if (closed) { + return; + } + if (queuedCallbacks.get() == maxQueuedCallbacks) { + stopped = true; + } else { + stopped = false; + scheduledFuture = timer.submit(scheduleRunnable); + } + } + } + + private void pull() { + pullerFuture = pubsubRpc.pull(createPullRequest()); + pullerFuture.addCallback(new PullCallback() { + @Override + public void success(PullResponse response) { + List messages = response.getReceivedMessagesList(); + queuedCallbacks.addAndGet(messages.size()); + for (com.google.pubsub.v1.ReceivedMessage message : messages) { + deadlineRenewer.add(subscription, message.getAckId()); + final ReceivedMessage receivedMessage = + ReceivedMessage.fromPb(pubsub, subscription, message); + executor.execute(ackingRunnable(receivedMessage)); + } + nextPull(); + } + + @Override + public void failure(Throwable error) { + if (!(error instanceof CancellationException)) { + nextPull(); + } + } + }); + } + + @Override + public void close() { + synchronized (futureLock) { + if (closed) { + return; + } + closed = true; + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + if (pullerFuture != null) { + pullerFuture.cancel(true); + } + } + SharedResourceHolder.release(TIMER, timer); + executorFactory.release(executor); + } + + static final class Builder { + private final PubSubOptions pubsubOptions; + private final String subscription; + private final AckDeadlineRenewer deadlineRenewer; + private final MessageProcessor messageProcessor; + private Integer maxQueuedCallbacks; + private ExecutorFactory executorFactory; + + Builder(PubSubOptions pubsubOptions, String subscription, AckDeadlineRenewer deadlineRenewer, + MessageProcessor messageProcessor) { + this.pubsubOptions = pubsubOptions; + this.subscription = subscription; + this.deadlineRenewer = deadlineRenewer; + this.messageProcessor = messageProcessor; + } + + /** + * Sets the maximum number of callbacks either being executed or waiting for execution. + */ + Builder maxQueuedCallbacks(Integer maxQueuedCallbacks) { + this.maxQueuedCallbacks = maxQueuedCallbacks; + return this; + } + + /** + * Sets the executor factory, used to manage the executor that will run message processor + * callbacks message consumer. + */ + Builder executorFactory(ExecutorFactory executorFactory) { + this.executorFactory = executorFactory; + return this; + } + + /** + * Creates a {@code MessageConsumerImpl} object. + */ + MessageConsumerImpl build() { + return new MessageConsumerImpl(this); + } + } + + /** + * Returns a builder for {@code MessageConsumerImpl} objects given the service options, the + * subscription from which messages must be pulled, the acknowledge deadline renewer and a message + * processor used to process messages. + */ + static Builder builder(PubSubOptions pubsubOptions, String subscription, + AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { + return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor); + } + + /** + * Returns a {@code MessageConsumerImpl} objects given the service options, the subscription from + * which messages must be pulled, the acknowledge deadline renewer and a message processor used to + * process messages. + */ + static Builder of(PubSubOptions pubsubOptions, String subscription, + AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { + return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index f86817c26345..6a6df7ac6231 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -16,13 +16,17 @@ package com.google.cloud.pubsub; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.cloud.AsyncPage; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Page; import com.google.cloud.Service; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -84,7 +88,8 @@ final class PullOption extends Option { private static final long serialVersionUID = 4792164134340316582L; enum OptionType implements Option.OptionType { - MAX_CONCURRENT_CALLBACKS; + EXECUTOR, + MAX_QUEUED_CALLBACKS; @SuppressWarnings("unchecked") T get(Map options) { @@ -94,6 +99,10 @@ T get(Map options) { Integer getInteger(Map options) { return get(options); } + + ExecutorFactory getExecutorFactory(Map options) { + return get(options); + } } private PullOption(Option.OptionType option, Object value) { @@ -101,11 +110,43 @@ private PullOption(Option.OptionType option, Object value) { } /** - * Returns an option to specify the maximum number of messages that can be processed - * concurrently at any time. + * Returns an option to specify the maximum number of messages that can be queued in the message + * consumer at any time. Queued messages are already pulled messages that are either waiting to + * be processed or being processed. Queued messages will have their acknowledge deadline renewed + * until they are acknowledged or "nacked". If not provided, at most 100 messages can be in the + * queue. */ - public static PullOption maxConcurrentCallbacks(int maxConcurrency) { - return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency); + public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) { + return new PullOption(OptionType.MAX_QUEUED_CALLBACKS, maxQueuedCallbacks); + } + + /** + * Returns an option to specify the executor used to execute message processor callbacks. The + * executor determines the number of messages that can be processed at the same time. The + * {@code shouldAutoClose} parameter sets whether the executor should be shutdown when the + * message consumer is closed. If not provided, a single-threaded executor is used. + * + * @param executor the executor used to run message processor callbacks + * @param shouldAutoClose if {@code true}, the executor is shutdown when the message consumer is + * closed. If {@code false}, the user must take care of shutting the executor down. + */ + public static PullOption executor(final ExecutorService executor, + final boolean shouldAutoClose) { + return new PullOption(OptionType.EXECUTOR, new ExecutorFactory() { + + @Override + public ExecutorService get() { + return executor; + } + + @Override + public void release(ExecutorService toRelease) { + checkArgument(executor == toRelease, "Releasing the wrong executor"); + if (shouldAutoClose) { + executor.shutdown(); + } + } + }); } } @@ -433,6 +474,25 @@ interface MessageConsumer extends AutoCloseable { */ Future> pullAsync(String subscription, int maxMessages); + /** + * Creates a message consumer that pulls messages for the provided subscription. You can stop + * pulling messages by calling {@link MessageConsumer#close()}. The returned message consumer + * executes {@link MessageProcessor#process(Message)} on each pulled message. If + * {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If + * {@link MessageProcessor#process(Message)} throws an exception, the message is "nacked". For + * all pulled messages, the ack deadline is automatically renewed until the message is either + * acknowledged or "nacked". + * + *

The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum + * number of queued messages (messages either being processed or waiting to be processed). The + * {@link PullOption#executor(ExecutorService, boolean)} can be used to provide an executor to run + * message processor callbacks. + * + * @param subscription the subscription from which to pull messages + * @param callback the callback to be executed on each message + * @param options pulling options + * @return a message consumer for the provided subscription and options + */ MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options); /** diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 166aad008c99..f16e46f729e1 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -18,6 +18,8 @@ import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN; +import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR; +import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.lazyTransform; @@ -510,8 +512,11 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag @Override public MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options) { - // this method should use the VTKIT thread-pool (maybe getting it should be part of the spi) - return null; + Map optionMap = optionMap(options); + return MessageConsumerImpl.builder(options(), subscription, ackDeadlineRenewer, callback) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS.getInteger(optionMap)) + .executorFactory(EXECUTOR.getExecutorFactory(optionMap)) + .build(); } @Override diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java index 87a25bf0dc36..07691e09e1a4 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; public class PubSubOptions extends GrpcServiceOptions { @@ -86,7 +87,7 @@ protected PubSubOptions(Builder builder) { } @Override - protected ExecutorFactory executorFactory() { + protected ExecutorFactory executorFactory() { return super.executorFactory(); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index b0f9b76e3920..d174e5172a23 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -73,7 +73,7 @@ public class DefaultPubSubRpc implements PubSubRpc { private final PublisherApi publisherApi; private final SubscriberApi subscriberApi; private final ScheduledExecutorService executor; - private final ExecutorFactory executorFactory; + private final ExecutorFactory executorFactory; private boolean closed; @@ -86,7 +86,7 @@ private InternalPubSubOptions(PubSubOptions options) { } @Override - protected ExecutorFactory executorFactory() { + protected ExecutorFactory executorFactory() { return super.executorFactory(); } } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java index 2f5a71195df0..8b277b5aa900 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -24,6 +24,8 @@ import com.google.cloud.AsyncPage; import com.google.cloud.Page; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -33,9 +35,12 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -531,6 +536,150 @@ public void testPullAsyncNonExistingSubscription() pubsub().pullAsync(formatForTest("non-existing-subscription"), 2).get(); } + @Test + public void testMessageConsumer() throws Exception { + String topic = formatForTest("test-message-consumer-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-message-consumer-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Set payloads = Sets.newHashSet("payload1", "payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(2); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + countDownLatch.countDown(); + } + }; + try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + for (Message message : receivedMessages) { + payloads.contains(message.payloadAsString()); + } + // Messages have all been acked, they should not be pulled again + Iterator messages = pubsub().pull(subscription, 2); + assertFalse(messages.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testMessageConsumerNack() throws Exception { + String topic = formatForTest("test-message-consumer-nack-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-message-consumer-nack-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Set payloads = Sets.newHashSet("payload1", "payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(2); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + countDownLatch.countDown(); + throw new RuntimeException("Force nack"); + } + }; + try (MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + for (Message message : receivedMessages) { + payloads.contains(message.payloadAsString()); + } + // Messages have all been nacked, we should be able to pull them again + Thread.sleep(5000); + Iterator messages = pubsub().pull(subscription, 2); + while (messages.hasNext()) { + payloads.contains(messages.next().payloadAsString()); + } + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testMessageConsumerWithMoreMessages() throws Exception { + String topic = formatForTest("test-message-consumer-more-messages-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-message-consumer-more-messages-subscriptions"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + int totalMessages = 200; + Set payloads = Sets.newHashSetWithExpectedSize(totalMessages); + List messagesToSend = Lists.newArrayListWithCapacity(totalMessages); + for (int i = 0; i < totalMessages; i++) { + String payload = "payload" + i; + messagesToSend.add(Message.of(payload)); + payloads.add(payload); + + } + List messageIds = pubsub().publish(topic, messagesToSend); + assertEquals(totalMessages, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(totalMessages); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + countDownLatch.countDown(); + } + }; + try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + // Messages have all been acked, they should not be pulled again + Iterator messages = pubsub().pull(subscription, totalMessages); + assertFalse(messages.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testMessageConsumerAndAutoRenewDeadline() throws Exception { + String topic = formatForTest("test-message-consumer-and-renew-deadline-topic"); + pubsub().create(TopicInfo.of(topic)); + final String subscription = + formatForTest("test-message-consumer-and-renew-deadline-subscription"); + pubsub().create(SubscriptionInfo.builder(topic, subscription).ackDeadLineSeconds(10).build()); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Set payloads = Sets.newHashSet("payload1", "payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(2); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + Thread.sleep(15000); + // message deadline is being renewed, it should not be pulled again + Iterator messages = pubsub().pull(subscription, 2); + assertFalse(messages.hasNext()); + countDownLatch.countDown(); + } + }; + try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + for (Message message : receivedMessages) { + payloads.contains(message.payloadAsString()); + } + // Messages have all been acked, they should not be pulled again + Iterator messages = pubsub().pull(subscription, 2); + assertFalse(messages.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + @Test public void testAckAndNackOneMessage() { String topic = formatForTest("test-ack-one-message-topic"); diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java new file mode 100644 index 000000000000..91d9ca8e78b6 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java @@ -0,0 +1,400 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; +import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; +import com.google.common.util.concurrent.ForwardingListenableFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +public class MessageConsumerImplTest { + + private static final String PROJECT = "project"; + private static final String SUBSCRIPTION = "subscription"; + private static final String SUBSCRIPTION_PB = "projects/project/subscriptions/subscription"; + private static final int MAX_QUEUED_CALLBACKS = 42; + private static final Message MESSAGE1 = Message.of("payload1"); + private static final Message MESSAGE2 = Message.of("payload2"); + private static final String ACK_ID1 = "ack-id1"; + private static final String ACK_ID2 = "ack-id2"; + private static final com.google.pubsub.v1.ReceivedMessage MESSAGE1_PB = + com.google.pubsub.v1.ReceivedMessage.newBuilder() + .setAckId(ACK_ID1) + .setMessage(MESSAGE1.toPb()) + .build(); + private static final com.google.pubsub.v1.ReceivedMessage MESSAGE2_PB = + com.google.pubsub.v1.ReceivedMessage.newBuilder() + .setAckId(ACK_ID2) + .setMessage(MESSAGE2.toPb()) + .build(); + private static final PullResponse PULL_RESPONSE = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .addReceivedMessages(MESSAGE2_PB) + .build(); + private static final MessageProcessor DO_NOTHING_PROCESSOR = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + // do nothing + } + }; + private static final MessageProcessor THROW_PROCESSOR = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + throw new RuntimeException(); + } + }; + private static final PullResponse EMPTY_RESPONSE = PullResponse.getDefaultInstance(); + + private PubSubRpc pubsubRpc; + private PubSub pubsub; + private PubSubOptions options; + private AckDeadlineRenewer renewer; + + @Rule + public Timeout globalTimeout = Timeout.seconds(60); + + static final class TestPullFuture + extends ForwardingListenableFuture.SimpleForwardingListenableFuture + implements PullFuture { + + TestPullFuture(PullResponse response) { + super(Futures.immediateFuture(response)); + } + + @Override + public void addCallback(final PullCallback callback) { + Futures.addCallback(delegate(), new FutureCallback() { + @Override + public void onSuccess(PullResponse result) { + callback.success(result); + } + + @Override + public void onFailure(Throwable error) { + callback.failure(error); + } + }); + } + } + + @Before + public void setUp() { + pubsubRpc = EasyMock.createStrictMock(PubSubRpc.class); + pubsub = EasyMock.createMock(PubSub.class); + options = EasyMock.createStrictMock(PubSubOptions.class); + renewer = EasyMock.createMock(AckDeadlineRenewer.class); + } + + @After + public void tearDown() { + EasyMock.verify(pubsubRpc); + EasyMock.verify(pubsub); + EasyMock.verify(options); + EasyMock.verify(renewer); + + } + + private static PullRequest pullRequest(int maxQueuedCallbacks) { + return PullRequest.newBuilder() + .setMaxMessages(maxQueuedCallbacks) + .setSubscription(SUBSCRIPTION_PB) + .setReturnImmediately(false) + .build(); + } + + private static IAnswer createAnswer(final CountDownLatch latch) { + return new IAnswer() { + @Override + public Void answer() throws Throwable { + latch.countDown(); + return null; + } + }; + } + + @Test + public void testMessageConsumerAck() throws Exception { + PullRequest request = pullRequest(MAX_QUEUED_CALLBACKS); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerNack() throws Exception { + PullRequest request = pullRequest(MAX_QUEUED_CALLBACKS); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMultipleCallsAck() throws Exception { + PullRequest request1 = pullRequest(MAX_QUEUED_CALLBACKS); + PullRequest request2 = pullRequest(MAX_QUEUED_CALLBACKS - 1); + PullResponse response1 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + PullResponse response2 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE2_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1)); + EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(response2)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMultipleCallsNack() throws Exception { + PullRequest request1 = pullRequest(MAX_QUEUED_CALLBACKS); + PullRequest request2 = pullRequest(MAX_QUEUED_CALLBACKS - 1); + PullResponse response1 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + PullResponse response2 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE2_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1)); + EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(response2)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMaxCallbacksAck() throws Exception { + PullRequest request1 = pullRequest(2); + PullRequest request2 = pullRequest(2); + PullResponse otherPullResponse = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch latch = new CountDownLatch(3); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(otherPullResponse)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(2) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMaxCallbacksNack() throws Exception { + PullRequest request1 = pullRequest(2); + PullRequest request2 = pullRequest(2); + PullResponse otherPullResponse = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch latch = new CountDownLatch(3); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(otherPullResponse)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR) + .maxQueuedCallbacks(2) + .build()) { + latch.await(); + } + } + + @Test + public void testClose() throws Exception { + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + final ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class); + executor.shutdown(); + EasyMock.expectLastCall(); + EasyMock.replay(pubsubRpc, pubsub, options, executor, renewer); + MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .executorFactory(new ExecutorFactory() { + @Override + public ExecutorService get() { + return executor; + } + + @Override + public void release(ExecutorService executor) { + executor.shutdown(); + } + }).build(); + consumer.close(); + // closing again should do nothing + consumer.close(); + EasyMock.verify(executor); + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index b310c72fe850..9c846ab7c05a 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -28,7 +28,11 @@ import com.google.cloud.AsyncPage; import com.google.cloud.Page; import com.google.cloud.RetryParams; +import com.google.cloud.pubsub.MessageConsumerImplTest.TestPullFuture; import com.google.cloud.pubsub.PubSub.ListOption; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.PubSub.PullOption; import com.google.cloud.pubsub.spi.PubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; @@ -60,6 +64,7 @@ import org.easymock.Capture; import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -69,7 +74,9 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -128,6 +135,12 @@ public String apply(SubscriptionId subscriptionId) { return formatSubscriptionName(subscriptionId.project(), subscriptionId.subscription()); } }; + private static final MessageProcessor DO_NOTHING = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + // do nothing + } + }; private PubSubOptions options; private PubSubRpcFactory rpcFactoryMock; @@ -139,6 +152,7 @@ public String apply(SubscriptionId subscriptionId) { public ExpectedException thrown = ExpectedException.none(); @Before + @SuppressWarnings("unchecked") public void setUp() { rpcFactoryMock = EasyMock.createStrictMock(PubSubRpcFactory.class); pubsubRpcMock = EasyMock.createStrictMock(PubSubRpc.class); @@ -1355,6 +1369,66 @@ public void testPullMessagesAsyncError() throws ExecutionException, InterruptedE EasyMock.verify(futureMock); } + @Test + public void testMessageConsumer() throws Exception { + pubsub = new PubSubImpl(options, renewerMock); + EasyMock.reset(options); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock); + EasyMock.expect(options.projectId()).andReturn(PROJECT); + EasyMock.replay(options); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(100) + .setReturnImmediately(false) + .build(); + final PullResponse response = PullResponse.getDefaultInstance(); + final CountDownLatch latch = new CountDownLatch(1); + EasyMock.expect(pubsubRpcMock.pull(request)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + latch.countDown(); + return new TestPullFuture(response); + } + }); + EasyMock.replay(pubsubRpcMock, renewerMock); + try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING)) { + latch.await(); + } + } + + @Test + public void testMessageConsumerWithOptions() throws Exception { + pubsub = new PubSubImpl(options, renewerMock); + EasyMock.reset(options); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock); + EasyMock.expect(options.projectId()).andReturn(PROJECT); + EasyMock.replay(options); + ExecutorService executorServiceMock = EasyMock.createStrictMock(ExecutorService.class); + executorServiceMock.shutdown(); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(42) + .setReturnImmediately(false) + .build(); + final PullResponse response = PullResponse.getDefaultInstance(); + final CountDownLatch latch = new CountDownLatch(1); + EasyMock.expect(pubsubRpcMock.pull(request)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + latch.countDown(); + return new TestPullFuture(response); + } + }); + EasyMock.replay(pubsubRpcMock, renewerMock, executorServiceMock); + PullOption[] options = + {PullOption.maxQueuedCallbacks(42), PullOption.executor(executorServiceMock, true)}; + try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING, options)) { + latch.await(); + } + } + @Test public void testAckOneMessage() { pubsub = new PubSubImpl(options, renewerMock); diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java index 620e737111f7..49fda68ec6fb 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java @@ -17,17 +17,22 @@ package com.google.cloud.pubsub; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.pubsub.PubSub.ListOption; import com.google.cloud.pubsub.PubSub.PullOption; +import org.easymock.EasyMock; import org.junit.Test; +import java.util.concurrent.ExecutorService; + public class PubSubTest { private static final int PAGE_SIZE = 42; private static final String PAGE_TOKEN = "page token"; - private static final int MAX_CONCURRENT_CALLBACKS = 42; + private static final int MAX_QUEUED_CALLBACKS = 42; @Test public void testListOption() { @@ -42,9 +47,30 @@ public void testListOption() { } @Test + @SuppressWarnings("unchecked") public void testPullOptions() { - PullOption pullOption = PullOption.maxConcurrentCallbacks(MAX_CONCURRENT_CALLBACKS); - assertEquals(MAX_CONCURRENT_CALLBACKS, pullOption.value()); - assertEquals(PullOption.OptionType.MAX_CONCURRENT_CALLBACKS, pullOption.optionType()); + // max queued callbacks + PullOption pullOption = PullOption.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS); + assertEquals(MAX_QUEUED_CALLBACKS, pullOption.value()); + assertEquals(PullOption.OptionType.MAX_QUEUED_CALLBACKS, pullOption.optionType()); + // auto-closing executor + ExecutorService executor = EasyMock.createNiceMock(ExecutorService.class); + pullOption = PullOption.executor(executor, true); + ExecutorFactory factory = (ExecutorFactory) pullOption.value(); + assertSame(executor, factory.get()); + executor.shutdown(); + EasyMock.expectLastCall(); + EasyMock.replay(executor); + factory.release(executor); + EasyMock.verify(executor); + assertEquals(PullOption.OptionType.EXECUTOR, pullOption.optionType()); + // auto-closing executor + EasyMock.reset(executor); + pullOption = PullOption.executor(executor, false); + factory = (ExecutorFactory) pullOption.value(); + assertSame(executor, factory.get()); + EasyMock.replay(executor); + factory.release(executor); + EasyMock.verify(executor); } }