From 0093ac3280521aa043c3af4f4c2406d3fce58356 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Wed, 11 May 2016 19:38:04 +0200 Subject: [PATCH] Make PubSub and PubSubRpc extends AutoCloseable, fix grpc settings --- .../java/com/google/cloud/pubsub/PubSub.java | 2 +- .../com/google/cloud/pubsub/PubSubImpl.java | 5 ++ .../cloud/pubsub/spi/DefaultPubSubRpc.java | 83 ++++++++++++++----- .../google/cloud/pubsub/spi/PubSubRpc.java | 5 +- 4 files changed, 74 insertions(+), 21 deletions(-) diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 6fde6f4425df..48de7002d54f 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -31,7 +31,7 @@ * * @see Google Cloud Pub/Sub */ -public interface PubSub extends Service { +public interface PubSub extends AutoCloseable, Service { /** * Class for specifying options for listing topics and subscriptions. diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 19b2e5a35fec..bd69103b9819 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -278,4 +278,9 @@ public Future modifyAckDeadlineAsync(String subscription, int deadline, Ti Iterable ackIds) { return null; } + + @Override + public void close() throws Exception { + rpc.close(); + } } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index d4f00fd7cf37..6ce27e3de56d 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -19,6 +19,8 @@ import com.google.api.gax.core.RetrySettings; import com.google.api.gax.grpc.ApiCallSettings; import com.google.api.gax.grpc.ApiException; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.AuthCredentials; import com.google.cloud.RetryParams; import com.google.cloud.pubsub.PubSubException; import com.google.cloud.pubsub.PubSubOptions; @@ -27,8 +29,10 @@ import com.google.cloud.pubsub.spi.v1.SubscriberApi; import com.google.cloud.pubsub.spi.v1.SubscriberSettings; import com.google.common.base.Function; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Empty; import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.DeleteSubscriptionRequest; @@ -50,48 +54,78 @@ import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; +import io.grpc.ManagedChannel; +import io.grpc.Status.Code; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; + import org.joda.time.Duration; import java.io.IOException; import java.util.Set; import java.util.concurrent.Future; - -import autovalue.shaded.com.google.common.common.collect.Sets; -import io.grpc.Status.Code; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; public class DefaultPubSubRpc implements PubSubRpc { private final PublisherApi publisherApi; private final SubscriberApi subscriberApi; + private final ScheduledExecutorService executor = + MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(8)); public DefaultPubSubRpc(PubSubOptions options) throws IOException { try { - // Provide (and use a common thread-pool). - // This depends on https://github.com/googleapis/gax-java/issues/73 - PublisherSettings.Builder pbuilder = - PublisherSettings.defaultBuilder() - .provideChannelWith(options.authCredentials().credentials()) - .applyToAllApiMethods(apiCallSettings(options)); - publisherApi = PublisherApi.create(pbuilder.build()); - SubscriberSettings.Builder sBuilder = - SubscriberSettings.defaultBuilder() - .provideChannelWith(options.authCredentials().credentials()) - .applyToAllApiMethods(apiCallSettings(options)); - subscriberApi = SubscriberApi.create(sBuilder.build()); + PublisherSettings.Builder pubBuilder = + PublisherSettings.defaultBuilder().provideExecutorWith(executor, false); + SubscriberSettings.Builder subBuilder = + SubscriberSettings.defaultBuilder().provideExecutorWith(executor, false); + // todo(mziccard): PublisherSettings should support null/absent credentials for testing + if (options.host().contains("localhost") + || options.authCredentials().equals(AuthCredentials.noAuth())) { + ManagedChannel channel = NettyChannelBuilder.forTarget(options.host()) + .negotiationType(NegotiationType.PLAINTEXT) + .build(); + pubBuilder.provideChannelWith(channel, true); + subBuilder.provideChannelWith(channel, true); + } else { + GoogleCredentials credentials = options.authCredentials().credentials(); + pubBuilder.provideChannelWith( + credentials.createScoped(PublisherSettings.DEFAULT_SERVICE_SCOPES)); + subBuilder.provideChannelWith( + credentials.createScoped(SubscriberSettings.DEFAULT_SERVICE_SCOPES)); + } + pubBuilder.applyToAllApiMethods(apiCallSettings(options)); + subBuilder.applyToAllApiMethods(apiCallSettings(options)); + publisherApi = PublisherApi.create(pubBuilder.build()); + subscriberApi = SubscriberApi.create(subBuilder.build()); } catch (Exception ex) { throw new IOException(ex); } } + private static long translateTimeout(long timeout) { + if (timeout < 0) { + return 20000; + } else if (timeout == 0) { + return Long.MAX_VALUE; + } + return timeout; + } + private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) { // TODO: specify timeout these settings: // retryParams.retryMaxAttempts(), retryParams.retryMinAttempts() RetryParams retryParams = options.retryParams(); + long connectTimeout = translateTimeout(options.connectTimeout()); + long readTimeout = translateTimeout(options.readTimeout()); + long maxTimeout = connectTimeout == Long.MAX_VALUE || readTimeout == Long.MAX_VALUE + ? Long.MAX_VALUE : connectTimeout + readTimeout; final RetrySettings.Builder builder = RetrySettings.newBuilder() .setTotalTimeout(Duration.millis(retryParams.totalRetryPeriodMillis())) - .setInitialRpcTimeout(Duration.millis(options.connectTimeout())) + .setInitialRpcTimeout(Duration.millis(connectTimeout)) .setRpcTimeoutMultiplier(1.5) - .setMaxRpcTimeout(Duration.millis(options.connectTimeout() + options.readTimeout())) + .setMaxRpcTimeout(Duration.millis(maxTimeout)) .setInitialRetryDelay(Duration.millis(retryParams.initialRetryDelayMillis())) .setRetryDelayMultiplier(retryParams.retryDelayBackoffFactor()) .setMaxRetryDelay(Duration.millis(retryParams.maxRetryDelayMillis())); @@ -117,7 +151,7 @@ public V apply(ApiException exception) { @Override public Future create(Topic topic) { - // TODO: it would be nice if we can get the idempotent inforamtion from the ApiCallSettings + // TODO: it would be nice if we can get the idempotent information from the ApiCallSettings // or from the exception return translate(publisherApi.createTopicCallable().futureCall(topic), true); } @@ -149,7 +183,6 @@ public Future list(ListTopicSubscriptionsRequest @Override public Future delete(DeleteTopicRequest request) { - // TODO: check if null is not going to work for Empty return translate(publisherApi.deleteTopicCallable().futureCall(request), true, Code.NOT_FOUND.value()); } @@ -195,4 +228,16 @@ public Future pull(PullRequest request) { public Future modify(ModifyPushConfigRequest request) { return translate(subscriberApi.modifyPushConfigCallable().futureCall(request), false); } + + @Override + public ScheduledExecutorService executor() { + return executor; + } + + @Override + public void close() throws Exception { + subscriberApi.close(); + publisherApi.close(); + executor.shutdown(); + } } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java index 8474ba042234..5af166a9f1ee 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java @@ -38,8 +38,9 @@ import com.google.pubsub.v1.Topic; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; -public interface PubSubRpc { +public interface PubSubRpc extends AutoCloseable { // in all cases root cause of ExecutionException is PubSubException Future create(Topic topic); @@ -69,4 +70,6 @@ public interface PubSubRpc { Future pull(PullRequest request); Future modify(ModifyPushConfigRequest request); + + ScheduledExecutorService executor(); }