From 5260293c2b6b7dbf772c7204821b87bee9f37413 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 20 Jun 2016 12:33:20 +0200 Subject: [PATCH] PullOption.executor option changed to PullOption.executorFactory --- .../java/com/google/cloud/pubsub/PubSub.java | 40 +++++++------------ .../com/google/cloud/pubsub/PubSubImpl.java | 4 +- .../google/cloud/pubsub/PubSubImplTest.java | 9 +++-- .../com/google/cloud/pubsub/PubSubTest.java | 23 ++--------- 4 files changed, 27 insertions(+), 49 deletions(-) diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 6a6df7ac6231..62b5fa6ae08d 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -88,7 +88,7 @@ final class PullOption extends Option { private static final long serialVersionUID = 4792164134340316582L; enum OptionType implements Option.OptionType { - EXECUTOR, + EXECUTOR_FACTORY, MAX_QUEUED_CALLBACKS; @SuppressWarnings("unchecked") @@ -122,31 +122,21 @@ public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) { /** * Returns an option to specify the executor used to execute message processor callbacks. The - * executor determines the number of messages that can be processed at the same time. The - * {@code shouldAutoClose} parameter sets whether the executor should be shutdown when the - * message consumer is closed. If not provided, a single-threaded executor is used. + * executor determines the number of messages that can be processed at the same time. If not + * provided, a single-threaded executor is used to execute message processor callbacks. * - * @param executor the executor used to run message processor callbacks - * @param shouldAutoClose if {@code true}, the executor is shutdown when the message consumer is - * closed. If {@code false}, the user must take care of shutting the executor down. + *

The {@link ExecutorFactory} object can be used to handle creation and release of the + * executor, possibly reusing existing executors. {@link ExecutorFactory#get()} is called when + * the message consumer is created. {@link ExecutorFactory#release(ExecutorService)} is called + * when the message consumer is closed. + * + *

For the created option to be serializable, the provided executor factory should implement + * {@link java.io.Serializable}. + * + * @param executorFactory the executor factory. */ - public static PullOption executor(final ExecutorService executor, - final boolean shouldAutoClose) { - return new PullOption(OptionType.EXECUTOR, new ExecutorFactory() { - - @Override - public ExecutorService get() { - return executor; - } - - @Override - public void release(ExecutorService toRelease) { - checkArgument(executor == toRelease, "Releasing the wrong executor"); - if (shouldAutoClose) { - executor.shutdown(); - } - } - }); + public static PullOption executorFactory(ExecutorFactory executorFactory) { + return new PullOption(OptionType.EXECUTOR_FACTORY, executorFactory); } } @@ -485,7 +475,7 @@ interface MessageConsumer extends AutoCloseable { * *

The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum * number of queued messages (messages either being processed or waiting to be processed). The - * {@link PullOption#executor(ExecutorService, boolean)} can be used to provide an executor to run + * {@link PullOption#executorFactory(ExecutorFactory)} can be used to provide an executor to run * message processor callbacks. * * @param subscription the subscription from which to pull messages diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index f16e46f729e1..355d7c36aed1 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -18,7 +18,7 @@ import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN; -import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR; +import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY; import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.lazyTransform; @@ -515,7 +515,7 @@ public MessageConsumer pullAsync(String subscription, MessageProcessor callback, Map optionMap = optionMap(options); return MessageConsumerImpl.builder(options(), subscription, ackDeadlineRenewer, callback) .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS.getInteger(optionMap)) - .executorFactory(EXECUTOR.getExecutorFactory(optionMap)) + .executorFactory(EXECUTOR_FACTORY.getExecutorFactory(optionMap)) .build(); } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index 9c846ab7c05a..7d6dc928d883 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import com.google.cloud.AsyncPage; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Page; import com.google.cloud.RetryParams; import com.google.cloud.pubsub.MessageConsumerImplTest.TestPullFuture; @@ -1405,8 +1406,10 @@ public void testMessageConsumerWithOptions() throws Exception { EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock); EasyMock.expect(options.projectId()).andReturn(PROJECT); EasyMock.replay(options); + ExecutorFactory executorFactoryMock = EasyMock.createStrictMock(ExecutorFactory.class); ExecutorService executorServiceMock = EasyMock.createStrictMock(ExecutorService.class); - executorServiceMock.shutdown(); + EasyMock.expect(executorFactoryMock.get()).andReturn(executorServiceMock); + executorFactoryMock.release(executorServiceMock); PullRequest request = PullRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) .setMaxMessages(42) @@ -1421,9 +1424,9 @@ public PullFuture answer() throws Throwable { return new TestPullFuture(response); } }); - EasyMock.replay(pubsubRpcMock, renewerMock, executorServiceMock); + EasyMock.replay(pubsubRpcMock, renewerMock, executorFactoryMock, executorServiceMock); PullOption[] options = - {PullOption.maxQueuedCallbacks(42), PullOption.executor(executorServiceMock, true)}; + {PullOption.maxQueuedCallbacks(42), PullOption.executorFactory(executorFactoryMock)}; try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING, options)) { latch.await(); } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java index 49fda68ec6fb..30db61717c9e 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java @@ -53,24 +53,9 @@ public void testPullOptions() { PullOption pullOption = PullOption.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS); assertEquals(MAX_QUEUED_CALLBACKS, pullOption.value()); assertEquals(PullOption.OptionType.MAX_QUEUED_CALLBACKS, pullOption.optionType()); - // auto-closing executor - ExecutorService executor = EasyMock.createNiceMock(ExecutorService.class); - pullOption = PullOption.executor(executor, true); - ExecutorFactory factory = (ExecutorFactory) pullOption.value(); - assertSame(executor, factory.get()); - executor.shutdown(); - EasyMock.expectLastCall(); - EasyMock.replay(executor); - factory.release(executor); - EasyMock.verify(executor); - assertEquals(PullOption.OptionType.EXECUTOR, pullOption.optionType()); - // auto-closing executor - EasyMock.reset(executor); - pullOption = PullOption.executor(executor, false); - factory = (ExecutorFactory) pullOption.value(); - assertSame(executor, factory.get()); - EasyMock.replay(executor); - factory.release(executor); - EasyMock.verify(executor); + ExecutorFactory executorFactory = EasyMock.createStrictMock(ExecutorFactory.class); + pullOption = PullOption.executorFactory(executorFactory); + assertSame(executorFactory, pullOption.value()); + assertEquals(PullOption.OptionType.EXECUTOR_FACTORY, pullOption.optionType()); } }