From 67bf8fc05b6cdd789f8c5c029de36501670663b2 Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Tue, 12 Mar 2019 18:24:12 -0400 Subject: [PATCH] Change each StreamingSubscriberConnection to have its own executor by default. (#4622) * Change each StreamingSubscriberConnection to have its own executor by default. This increases throughput by reducing contention on the executor queue mutex and makes the Subscriber implementation more accurately reflect the users intent when an InstantiatingExecutorProvider is passed. * Add a comment for executorProvider and alarmsExecutor. --- .../google/cloud/pubsub/v1/Subscriber.java | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 487ee0ccb4cb..70d17a4c9884 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService { private final String subscriptionName; private final FlowControlSettings flowControlSettings; private final Duration maxAckExtensionPeriod; - private final ScheduledExecutorService executor; + // The ExecutorProvider used to generate executors for processing messages. + private final ExecutorProvider executorProvider; + // An instantiation of the SystemExecutorProvider used for processing acks + // and other system actions. @Nullable private final ScheduledExecutorService alarmsExecutor; private final Distribution ackLatencyDistribution = new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); @@ -132,16 +135,7 @@ private Subscriber(Builder builder) { this.numPullers = builder.parallelPullCount; - executor = builder.executorProvider.getExecutor(); - if (builder.executorProvider.shouldAutoClose()) { - closeables.add( - new AutoCloseable() { - @Override - public void close() throws IOException { - executor.shutdown(); - } - }); - } + executorProvider = builder.executorProvider; ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider; if (systemExecutorProvider == null) { @@ -322,6 +316,17 @@ public void run() { private void startStreamingConnections() { synchronized (streamingSubscriberConnections) { for (int i = 0; i < numPullers; i++) { + final ScheduledExecutorService executor = executorProvider.getExecutor(); + if (executorProvider.shouldAutoClose()) { + closeables.add( + new AutoCloseable() { + @Override + public void close() { + executor.shutdown(); + } + }); + } + streamingSubscriberConnections.add( new StreamingSubscriberConnection( subscriptionName, @@ -364,7 +369,7 @@ private void stopAllStreamingConnections() { private void startConnections( List connections, final ApiService.Listener connectionsListener) { for (ApiService subscriber : connections) { - subscriber.addListener(connectionsListener, executor); + subscriber.addListener(connectionsListener, alarmsExecutor); subscriber.startAsync(); } for (ApiService subscriber : connections) { @@ -398,8 +403,7 @@ public static final class Builder { static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() - .setExecutorThreadCount( - THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors()) + .setExecutorThreadCount(THREADS_PER_CHANNEL) .build(); String subscriptionName; @@ -502,7 +506,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { return this; } - /** Gives the ability to set a custom executor. */ + /** + * Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be + * called {@link Builder#parallelPullCount} times. + */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); return this;