diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
index 6a6df7ac6231..62b5fa6ae08d 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
@@ -88,7 +88,7 @@ final class PullOption extends Option {
private static final long serialVersionUID = 4792164134340316582L;
enum OptionType implements Option.OptionType {
- EXECUTOR,
+ EXECUTOR_FACTORY,
MAX_QUEUED_CALLBACKS;
@SuppressWarnings("unchecked")
@@ -122,31 +122,21 @@ public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) {
/**
* Returns an option to specify the executor used to execute message processor callbacks. The
- * executor determines the number of messages that can be processed at the same time. The
- * {@code shouldAutoClose} parameter sets whether the executor should be shutdown when the
- * message consumer is closed. If not provided, a single-threaded executor is used.
+ * executor determines the number of messages that can be processed at the same time. If not
+ * provided, a single-threaded executor is used to execute message processor callbacks.
*
- * @param executor the executor used to run message processor callbacks
- * @param shouldAutoClose if {@code true}, the executor is shutdown when the message consumer is
- * closed. If {@code false}, the user must take care of shutting the executor down.
+ *
The {@link ExecutorFactory} object can be used to handle creation and release of the
+ * executor, possibly reusing existing executors. {@link ExecutorFactory#get()} is called when
+ * the message consumer is created. {@link ExecutorFactory#release(ExecutorService)} is called
+ * when the message consumer is closed.
+ *
+ *
For the created option to be serializable, the provided executor factory should implement
+ * {@link java.io.Serializable}.
+ *
+ * @param executorFactory the executor factory.
*/
- public static PullOption executor(final ExecutorService executor,
- final boolean shouldAutoClose) {
- return new PullOption(OptionType.EXECUTOR, new ExecutorFactory() {
-
- @Override
- public ExecutorService get() {
- return executor;
- }
-
- @Override
- public void release(ExecutorService toRelease) {
- checkArgument(executor == toRelease, "Releasing the wrong executor");
- if (shouldAutoClose) {
- executor.shutdown();
- }
- }
- });
+ public static PullOption executorFactory(ExecutorFactory executorFactory) {
+ return new PullOption(OptionType.EXECUTOR_FACTORY, executorFactory);
}
}
@@ -485,7 +475,7 @@ interface MessageConsumer extends AutoCloseable {
*
* The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum
* number of queued messages (messages either being processed or waiting to be processed). The
- * {@link PullOption#executor(ExecutorService, boolean)} can be used to provide an executor to run
+ * {@link PullOption#executorFactory(ExecutorFactory)} can be used to provide an executor to run
* message processor callbacks.
*
* @param subscription the subscription from which to pull messages
diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
index f16e46f729e1..355d7c36aed1 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
@@ -18,7 +18,7 @@
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
-import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR;
+import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY;
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.lazyTransform;
@@ -515,7 +515,7 @@ public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
Map optionMap = optionMap(options);
return MessageConsumerImpl.builder(options(), subscription, ackDeadlineRenewer, callback)
.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS.getInteger(optionMap))
- .executorFactory(EXECUTOR.getExecutorFactory(optionMap))
+ .executorFactory(EXECUTOR_FACTORY.getExecutorFactory(optionMap))
.build();
}
diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
index 9c846ab7c05a..7d6dc928d883 100644
--- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
+++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
@@ -26,6 +26,7 @@
import static org.junit.Assert.fail;
import com.google.cloud.AsyncPage;
+import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
import com.google.cloud.Page;
import com.google.cloud.RetryParams;
import com.google.cloud.pubsub.MessageConsumerImplTest.TestPullFuture;
@@ -1405,8 +1406,10 @@ public void testMessageConsumerWithOptions() throws Exception {
EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock);
EasyMock.expect(options.projectId()).andReturn(PROJECT);
EasyMock.replay(options);
+ ExecutorFactory executorFactoryMock = EasyMock.createStrictMock(ExecutorFactory.class);
ExecutorService executorServiceMock = EasyMock.createStrictMock(ExecutorService.class);
- executorServiceMock.shutdown();
+ EasyMock.expect(executorFactoryMock.get()).andReturn(executorServiceMock);
+ executorFactoryMock.release(executorServiceMock);
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
@@ -1421,9 +1424,9 @@ public PullFuture answer() throws Throwable {
return new TestPullFuture(response);
}
});
- EasyMock.replay(pubsubRpcMock, renewerMock, executorServiceMock);
+ EasyMock.replay(pubsubRpcMock, renewerMock, executorFactoryMock, executorServiceMock);
PullOption[] options =
- {PullOption.maxQueuedCallbacks(42), PullOption.executor(executorServiceMock, true)};
+ {PullOption.maxQueuedCallbacks(42), PullOption.executorFactory(executorFactoryMock)};
try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING, options)) {
latch.await();
}
diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java
index 49fda68ec6fb..30db61717c9e 100644
--- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java
+++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java
@@ -53,24 +53,9 @@ public void testPullOptions() {
PullOption pullOption = PullOption.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS);
assertEquals(MAX_QUEUED_CALLBACKS, pullOption.value());
assertEquals(PullOption.OptionType.MAX_QUEUED_CALLBACKS, pullOption.optionType());
- // auto-closing executor
- ExecutorService executor = EasyMock.createNiceMock(ExecutorService.class);
- pullOption = PullOption.executor(executor, true);
- ExecutorFactory factory = (ExecutorFactory) pullOption.value();
- assertSame(executor, factory.get());
- executor.shutdown();
- EasyMock.expectLastCall();
- EasyMock.replay(executor);
- factory.release(executor);
- EasyMock.verify(executor);
- assertEquals(PullOption.OptionType.EXECUTOR, pullOption.optionType());
- // auto-closing executor
- EasyMock.reset(executor);
- pullOption = PullOption.executor(executor, false);
- factory = (ExecutorFactory) pullOption.value();
- assertSame(executor, factory.get());
- EasyMock.replay(executor);
- factory.release(executor);
- EasyMock.verify(executor);
+ ExecutorFactory executorFactory = EasyMock.createStrictMock(ExecutorFactory.class);
+ pullOption = PullOption.executorFactory(executorFactory);
+ assertSame(executorFactory, pullOption.value());
+ assertEquals(PullOption.OptionType.EXECUTOR_FACTORY, pullOption.optionType());
}
}