Skip to content

Commit

Permalink
PullOption.executor option changed to PullOption.executorFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jun 20, 2016
1 parent 7693456 commit 5260293
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ final class PullOption extends Option {
private static final long serialVersionUID = 4792164134340316582L;

enum OptionType implements Option.OptionType {
EXECUTOR,
EXECUTOR_FACTORY,
MAX_QUEUED_CALLBACKS;

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -122,31 +122,21 @@ public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) {

/**
* Returns an option to specify the executor used to execute message processor callbacks. The
* executor determines the number of messages that can be processed at the same time. The
* {@code shouldAutoClose} parameter sets whether the executor should be shutdown when the
* message consumer is closed. If not provided, a single-threaded executor is used.
* executor determines the number of messages that can be processed at the same time. If not
* provided, a single-threaded executor is used to execute message processor callbacks.
*
* @param executor the executor used to run message processor callbacks
* @param shouldAutoClose if {@code true}, the executor is shutdown when the message consumer is
* closed. If {@code false}, the user must take care of shutting the executor down.
* <p>The {@link ExecutorFactory} object can be used to handle creation and release of the
* executor, possibly reusing existing executors. {@link ExecutorFactory#get()} is called when
* the message consumer is created. {@link ExecutorFactory#release(ExecutorService)} is called
* when the message consumer is closed.
*
* <p>For the created option to be serializable, the provided executor factory should implement
* {@link java.io.Serializable}.
*
* @param executorFactory the executor factory.
*/
public static PullOption executor(final ExecutorService executor,
final boolean shouldAutoClose) {
return new PullOption(OptionType.EXECUTOR, new ExecutorFactory<ExecutorService>() {

@Override
public ExecutorService get() {
return executor;
}

@Override
public void release(ExecutorService toRelease) {
checkArgument(executor == toRelease, "Releasing the wrong executor");
if (shouldAutoClose) {
executor.shutdown();
}
}
});
public static PullOption executorFactory(ExecutorFactory executorFactory) {
return new PullOption(OptionType.EXECUTOR_FACTORY, executorFactory);
}
}

Expand Down Expand Up @@ -485,7 +475,7 @@ interface MessageConsumer extends AutoCloseable {
*
* <p>The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum
* number of queued messages (messages either being processed or waiting to be processed). The
* {@link PullOption#executor(ExecutorService, boolean)} can be used to provide an executor to run
* {@link PullOption#executorFactory(ExecutorFactory)} can be used to provide an executor to run
* message processor callbacks.
*
* @param subscription the subscription from which to pull messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR;
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY;
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.lazyTransform;
Expand Down Expand Up @@ -515,7 +515,7 @@ public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
Map<Option.OptionType, ?> optionMap = optionMap(options);
return MessageConsumerImpl.builder(options(), subscription, ackDeadlineRenewer, callback)
.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS.getInteger(optionMap))
.executorFactory(EXECUTOR.getExecutorFactory(optionMap))
.executorFactory(EXECUTOR_FACTORY.getExecutorFactory(optionMap))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.junit.Assert.fail;

import com.google.cloud.AsyncPage;
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
import com.google.cloud.Page;
import com.google.cloud.RetryParams;
import com.google.cloud.pubsub.MessageConsumerImplTest.TestPullFuture;
Expand Down Expand Up @@ -1405,8 +1406,10 @@ public void testMessageConsumerWithOptions() throws Exception {
EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock);
EasyMock.expect(options.projectId()).andReturn(PROJECT);
EasyMock.replay(options);
ExecutorFactory executorFactoryMock = EasyMock.createStrictMock(ExecutorFactory.class);
ExecutorService executorServiceMock = EasyMock.createStrictMock(ExecutorService.class);
executorServiceMock.shutdown();
EasyMock.expect(executorFactoryMock.get()).andReturn(executorServiceMock);
executorFactoryMock.release(executorServiceMock);
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
Expand All @@ -1421,9 +1424,9 @@ public PullFuture answer() throws Throwable {
return new TestPullFuture(response);
}
});
EasyMock.replay(pubsubRpcMock, renewerMock, executorServiceMock);
EasyMock.replay(pubsubRpcMock, renewerMock, executorFactoryMock, executorServiceMock);
PullOption[] options =
{PullOption.maxQueuedCallbacks(42), PullOption.executor(executorServiceMock, true)};
{PullOption.maxQueuedCallbacks(42), PullOption.executorFactory(executorFactoryMock)};
try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING, options)) {
latch.await();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,9 @@ public void testPullOptions() {
PullOption pullOption = PullOption.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS);
assertEquals(MAX_QUEUED_CALLBACKS, pullOption.value());
assertEquals(PullOption.OptionType.MAX_QUEUED_CALLBACKS, pullOption.optionType());
// auto-closing executor
ExecutorService executor = EasyMock.createNiceMock(ExecutorService.class);
pullOption = PullOption.executor(executor, true);
ExecutorFactory<ExecutorService> factory = (ExecutorFactory) pullOption.value();
assertSame(executor, factory.get());
executor.shutdown();
EasyMock.expectLastCall();
EasyMock.replay(executor);
factory.release(executor);
EasyMock.verify(executor);
assertEquals(PullOption.OptionType.EXECUTOR, pullOption.optionType());
// auto-closing executor
EasyMock.reset(executor);
pullOption = PullOption.executor(executor, false);
factory = (ExecutorFactory) pullOption.value();
assertSame(executor, factory.get());
EasyMock.replay(executor);
factory.release(executor);
EasyMock.verify(executor);
ExecutorFactory executorFactory = EasyMock.createStrictMock(ExecutorFactory.class);
pullOption = PullOption.executorFactory(executorFactory);
assertSame(executorFactory, pullOption.value());
assertEquals(PullOption.OptionType.EXECUTOR_FACTORY, pullOption.optionType());
}
}

0 comments on commit 5260293

Please sign in to comment.