diff --git a/src/main/java/com/google/api/gax/grpc/ApiCallSettings.java b/src/main/java/com/google/api/gax/grpc/ApiCallSettings.java new file mode 100644 index 000000000..c76bb9b65 --- /dev/null +++ b/src/main/java/com/google/api/gax/grpc/ApiCallSettings.java @@ -0,0 +1,310 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.grpc; + +import com.google.api.gax.core.ConnectionSettings; +import com.google.api.gax.core.RetryParams; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; + +import io.grpc.ClientInterceptor; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.auth.ClientAuthInterceptor; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +// TODO(pongad): Don't close the channel if the user gives one to us +/** + * A settings class to configure API method calls for either a single method or a + * whole service. + * + * A note on channels: whichever service API class that this instance of ServiceApiSettings + * is passed to will call shutdown() on the channel provided by {@link getChannel}. + * Setting a channel is intended for use by unit tests to override the channel, + * and should not be used in production. + */ +public class ApiCallSettings { + + private ChannelProvider channelProvider; + private ExecutorProvider executorProvider; + + private Set defaultRetryableCodes = null; + private Set retryableCodes = null; + + private RetryParams defaultRetryParams = null; + private RetryParams retryParams = null; + + public static final int DEFAULT_EXECUTOR_THREADS = 4; + + /** + * Constructs an instance of ApiCallSettings. + */ + public ApiCallSettings() { + channelProvider = new ChannelProvider() { + @Override + public ManagedChannel getChannel(Executor executor) { + throw new RuntimeException("No Channel or ConnectionSettings provided."); + } + + @Override + public boolean isOverridden() { + return false; + } + }; + executorProvider = new ExecutorProvider() { + private ScheduledExecutorService executor = null; + @Override + public ScheduledExecutorService getExecutor() { + if (executor != null) { + return executor; + } + executor = MoreExecutors.getExitingScheduledExecutorService( + new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREADS)); + return executor; + } + + @Override + public boolean isOverridden() { + return false; + } + }; + retryableCodes = new HashSet<>(); + } + + private interface ChannelProvider { + ManagedChannel getChannel(Executor executor) throws IOException; + boolean isOverridden(); + } + + /** + * Sets a channel for this ApiCallSettings to use. This prevents a channel + * from being created. + * + * See class documentation for more details on channels. + */ + public ApiCallSettings provideChannelWith(final ManagedChannel channel) { + channelProvider = new ChannelProvider() { + @Override + public ManagedChannel getChannel(Executor executor) { + return channel; + } + + @Override + public boolean isOverridden() { + return true; + } + }; + return this; + } + + /** + * Provides the connection settings necessary to create a channel. + */ + public ApiCallSettings provideChannelWith(final ConnectionSettings settings) { + channelProvider = new ChannelProvider() { + private ManagedChannel channel = null; + @Override + public ManagedChannel getChannel(Executor executor) throws IOException { + if (channel != null) { + return channel; + } + + List interceptors = Lists.newArrayList(); + interceptors.add(new ClientAuthInterceptor(settings.getCredentials(), executor)); + + channel = NettyChannelBuilder.forAddress(settings.getServiceAddress(), settings.getPort()) + .negotiationType(NegotiationType.TLS) + .intercept(interceptors) + .build(); + return channel; + } + + @Override + public boolean isOverridden() { + return true; + } + }; + return this; + } + + /** + * The channel used to send requests to the service. + * + * If no channel was set, a default channel will be instantiated, using + * the connection settings provided. + * + * See class documentation for more details on channels. + */ + public ManagedChannel getChannel() throws IOException { + return channelProvider.getChannel(getExecutor()); + } + + /** + * Returns true if either a channel was set or connection settings were + * provided to create a channel. + */ + public boolean isChannelOverridden() { + return channelProvider.isOverridden(); + } + + private interface ExecutorProvider { + ScheduledExecutorService getExecutor(); + boolean isOverridden(); + } + + /** + * Sets the executor to use for channels, retries, and bundling. + * + * It is up to the user to terminate the {@code Executor} when it is no longer needed. + */ + public ApiCallSettings setExecutor(final ScheduledExecutorService executor) { + executorProvider = new ExecutorProvider() { + @Override + public ScheduledExecutorService getExecutor() { + return executor; + } + + @Override + public boolean isOverridden() { + return true; + } + }; + return this; + } + + /** + * The executor to be used by the client. + * + * If no executor was set, a default {@link java.util.concurrent.ScheduledThreadPoolExecutor} + * with {@link DEFAULT_EXECUTOR_THREADS} will be instantiated. + * + * The default executor is guaranteed to not prevent JVM from normally exiting, + * but may wait for up to 120 seconds after all non-daemon threads exit to give received tasks + * time to complete. + * + * If this behavior is not desirable, the user may specify a custom {@code Executor}. + */ + public ScheduledExecutorService getExecutor() { + return executorProvider.getExecutor(); + } + + /** + * Returns true if an executor was set. + */ + public boolean isExecutorOverridden() { + return executorProvider.isOverridden(); + } + + /** + * Sets the defaults to use for retryable codes and retry params. + * + * If setRetryableCodes is not called, the default retryableCodes provided here will be returned + * from getRetryableCodes. Likewise, if setRetryParams is not called, the default retryParams + * provided here will be returned from getRetryParams. + */ + public void setRetryDefaults(Set retryableCodes, RetryParams retryParams) { + this.defaultRetryableCodes = retryableCodes; + this.retryParams = retryParams; + } + + /** + * Sets the retryable codes. + */ + public ApiCallSettings setRetryableCodes(Set retryableCodes) { + this.retryableCodes = retryableCodes; + return this; + } + + /** + * Sets the retryable codes. + */ + public ApiCallSettings setRetryableCodes(Status.Code... codes) { + this.retryableCodes = Sets.newHashSet(codes); + return this; + } + + /** + * Gets the retryable codes that were set previously, or if they were not, then returns + * the default retryable codes provided previously. + */ + public Set getRetryableCodes() { + if (isRetryableCodesOverridden()) { + return retryableCodes; + } else { + return defaultRetryableCodes; + } + } + + /** + * Returns true if the retryable codes were set. + */ + public boolean isRetryableCodesOverridden() { + return retryableCodes != null; + } + + /** + * Sets the retry params. + */ + public ApiCallSettings setRetryParams(RetryParams retryParams) { + this.retryParams = retryParams; + return this; + } + + /** + * Returns the retry params that were set previously, or if they were not, then returns + * the default retry params provided previously. + */ + public RetryParams getRetryParams() { + if (isRetryParamsOverridden()) { + return retryParams; + } else { + return defaultRetryParams; + } + } + + /** + * Returns true if the retry params were set. + */ + public boolean isRetryParamsOverridden() { + return retryParams != null; + } +} diff --git a/src/main/java/com/google/api/gax/grpc/ApiCallable.java b/src/main/java/com/google/api/gax/grpc/ApiCallable.java index 030b80a3d..bb0f39d72 100644 --- a/src/main/java/com/google/api/gax/grpc/ApiCallable.java +++ b/src/main/java/com/google/api/gax/grpc/ApiCallable.java @@ -31,18 +31,21 @@ package com.google.api.gax.grpc; -import io.grpc.Channel; -import io.grpc.ExperimentalApi; -import io.grpc.MethodDescriptor; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; - import com.google.api.gax.core.RetryParams; +import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.Channel; +import io.grpc.ExperimentalApi; +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; + +import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; /** @@ -221,4 +224,190 @@ public ApiCallable bundling( return new ApiCallable( new BundlingCallable(callable, bundlingDescriptor, bundlerFactory)); } + + /** + * A builder for ApiCallable. + */ + public static class ApiCallableBuilder extends ApiCallSettings { + private final ApiCallable baseCallable; + + /** + * Constructs an instance of ApiCallableBuilder. + * + * @param grpcMethodDescriptor A method descriptor obtained from the generated GRPC + * class. + */ + public ApiCallableBuilder(MethodDescriptor grpcMethodDescriptor) { + this.baseCallable = ApiCallable.create(grpcMethodDescriptor); + } + + /** + * Builds an ApiCallable using the settings provided. + * + * @param serviceLevelSettings Settings provided in serviceLevelSettings override default + * values from this ApiCallableBuilder, but explicitly-set values set on this + * ApiCallableBuilder override both. + */ + public ApiCallable build(ApiCallSettings serviceLevelSettings) + throws IOException { + ApiCallable callable = baseCallable; + + ManagedChannel channel = null; + if (isChannelOverridden()) { + channel = getChannel(); + } else { + channel = serviceLevelSettings.getChannel(); + } + + ScheduledExecutorService executor = null; + if (isExecutorOverridden()) { + executor = getExecutor(); + } else { + executor = serviceLevelSettings.getExecutor(); + } + + ImmutableSet retryableCodes = null; + if (isRetryableCodesOverridden() || !serviceLevelSettings.isRetryableCodesOverridden()) { + retryableCodes = ImmutableSet.copyOf(getRetryableCodes()); + } else { + retryableCodes = ImmutableSet.copyOf(serviceLevelSettings.getRetryableCodes()); + } + if (retryableCodes != null) { + callable = callable.retryableOn(retryableCodes); + } + + RetryParams retryParams = null; + if (isRetryParamsOverridden() || !serviceLevelSettings.isRetryParamsOverridden()) { + retryParams = getRetryParams(); + } else { + retryParams = serviceLevelSettings.getRetryParams(); + } + if (retryParams != null) { + callable = callable.retrying(getRetryParams(), executor); + } + + callable = callable.bind(channel); + + return callable; + } + } + + /** + * A builder for an ApiCallable which presents an Iterable backed by page + * streaming calls to an API method. + */ + public static class PageStreamingApiCallableBuilder + extends ApiCallableBuilder { + private final PageDescriptor pageDescriptor; + + /** + * Constructs an instance of ApiCallableBuilder. + * + * @param grpcMethodDescriptor A method descriptor obtained from the generated GRPC + * class. + * @param pageDescriptor An object which injects and extracts fields related + * to page streaming for a particular API method. + */ + public PageStreamingApiCallableBuilder( + MethodDescriptor grpcMethodDescriptor, + PageDescriptor pageDescriptor) { + super(grpcMethodDescriptor); + this.pageDescriptor = pageDescriptor; + } + + /** + * Builds an ApiCallable with an Iterable response using the settings provided. + * + * @param serviceLevelSettings Settings provided in serviceLevelSettings override default + * values from this ApiCallableBuilder, but explicitly-set values set on this + * ApiCallableBuilder override both. + */ + public ApiCallable> buildPageStreaming( + ApiCallSettings serviceLevelSettings) throws IOException { + return build(serviceLevelSettings).pageStreaming(pageDescriptor); + } + } + + /** + * A pair of an ApiCallable and its associated BundlerFactory. + */ + @AutoValue + public static abstract class BundlableApiCallableInfo { + public static BundlableApiCallableInfo create( + ApiCallable apiCallable, + BundlerFactory bundlerFactory) { + return new AutoValue_ApiCallable_BundlableApiCallableInfo<>(apiCallable, bundlerFactory); + } + + /** + * Returns the ApiCallable. + */ + public abstract ApiCallable getApiCallable(); + + /** + * Returns the BundlerFactory. + */ + public abstract BundlerFactory getBundlerFactory(); + } + + /** + * A builder for an ApiCallable with bundling support. + */ + public static class BundlableApiCallableBuilder + extends ApiCallableBuilder { + private final BundlingDescriptor bundlingDescriptor; + private BundlingSettings bundlingSettings; + + + /** + * Constructs an instance of BundlableApiCallableBuilder. + * + * @param grpcMethodDescriptor A method descriptor obtained from the generated GRPC + * class. + * @param bundlingDescriptor An object which splits and merges requests for the + * purpose of bundling. + */ + public BundlableApiCallableBuilder( + MethodDescriptor grpcMethodDescriptor, + BundlingDescriptor bundlingDescriptor) { + super(grpcMethodDescriptor); + this.bundlingDescriptor = bundlingDescriptor; + this.bundlingSettings = null; + } + + /** + * Provides the bundling settings to use. + */ + public BundlableApiCallableBuilder setBundlingSettings( + BundlingSettings bundlingSettings) { + this.bundlingSettings = bundlingSettings; + return this; + } + + /** + * Returns the bundling settings that have been previously provided. + */ + public BundlingSettings getBundlingSettings() { + return bundlingSettings; + } + + /** + * Builds an ApiCallable which supports bundling, using the settings provided. + * + * @param serviceLevelSettings Settings provided in serviceLevelSettings override default + * values from this ApiCallableBuilder, but explicitly-set values set on this + * ApiCallableBuilder override both. + */ + public BundlableApiCallableInfo buildBundlable( + ApiCallSettings serviceLevelSettings) throws IOException { + ApiCallable callable = build(serviceLevelSettings); + BundlerFactory bundlerFactory = null; + if (bundlingSettings != null) { + bundlerFactory = new BundlerFactory<>(bundlingDescriptor, bundlingSettings); + callable = callable.bundling(bundlingDescriptor, bundlerFactory); + } + return BundlableApiCallableInfo.create(callable, bundlerFactory); + } + } + } diff --git a/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java b/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java deleted file mode 100644 index 3046f7b58..000000000 --- a/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.api.gax.grpc; - -import com.google.api.gax.core.ConnectionSettings; -import com.google.api.gax.core.RetryParams; -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.MoreExecutors; - -import io.grpc.auth.ClientAuthInterceptor; -import io.grpc.ClientInterceptor; -import io.grpc.ManagedChannel; -import io.grpc.netty.NegotiationType; -import io.grpc.netty.NettyChannelBuilder; -import io.grpc.Status; - -import java.io.IOException; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.List; - -// TODO(pongad): Don't close the channel if the user gives one to us -/** - * A settings class to configure a service api class. - * - * A note on channels: whichever service API class that this instance of ServiceApiSettings - * is passed to will call shutdown() on the channel provided by {@link getChannel}. - * Setting a channel is intended for use by unit tests to override the channel, - * and should not be used in production. - */ -@AutoValue -public abstract class ServiceApiSettings { - - interface ChannelProvider { - ManagedChannel getChannel(Executor executor) throws IOException; - } - - public static final int DEFAULT_EXECUTOR_THREADS = 4; - private static final ScheduledExecutorService DEFAULT_EXECUTOR = - MoreExecutors.getExitingScheduledExecutorService( - new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREADS)); - - /** - * Status codes that are considered to be retryable by the given methods - */ - public abstract ImmutableMap> getRetryableCodes(); - - /** - * Retry/backoff configuration for each method - */ - public abstract ImmutableMap getRetryParams(); - - /** - * The executor to be used by the client. - * - * If none is set by the corresponding method in {@link Builder}, - * a default {@link java.util.concurrent.ScheduledThreadPoolExecutor} - * with {@link DEFAULT_EXECUTOR_THREADS} is used. - * The default executor is guaranteed to not prevent JVM from normally exitting, - * but may wait for up to 120 seconds after all non-daemon threads exit to give received tasks - * time to complete. - * If this behavior is not desirable, the user may specify a custom {@code Executor}. - * - * If a custom {@code Executor} is specified by the corresponding method, - * it is up to the user to terminate the {@code Executor} when it is no longer needed. - */ - public abstract ScheduledExecutorService getExecutor(); - - /** - * The channel used to send requests to the service. - * See class documentation on channels. - */ - public ManagedChannel getChannel() throws IOException { - return getChannelProvider().getChannel(getExecutor()); - } - - abstract ChannelProvider getChannelProvider(); - - public static Builder builder() { - return new AutoValue_ServiceApiSettings.Builder() - .setRetryableCodes(ImmutableMap.>of()) - .setRetryParams(ImmutableMap.of()) - .setExecutor(DEFAULT_EXECUTOR); - } - - public Builder toBuilder() { - return new AutoValue_ServiceApiSettings.Builder(this); - } - - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder setRetryableCodes( - ImmutableMap> codes); - - public abstract Builder setRetryParams( - ImmutableMap retryParams); - - public Builder provideChannelWith(final ManagedChannel channel) { - ChannelProvider provider = new ChannelProvider() { - @Override - public ManagedChannel getChannel(Executor executor) { - return channel; - } - }; - return setChannelProvider(provider); - } - - public Builder provideChannelWith(final ConnectionSettings settings) { - ChannelProvider provider = new ChannelProvider() { - @Override - public ManagedChannel getChannel(Executor executor) throws IOException { - List interceptors = Lists.newArrayList(); - interceptors.add(new ClientAuthInterceptor(settings.getCredentials(), executor)); - - return NettyChannelBuilder.forAddress(settings.getServiceAddress(), settings.getPort()) - .negotiationType(NegotiationType.TLS) - .intercept(interceptors) - .build(); - } - }; - return setChannelProvider(provider); - } - - abstract Builder setChannelProvider(ChannelProvider provider); - - public abstract Builder setExecutor(ScheduledExecutorService executor); - - public abstract ServiceApiSettings build(); - } -}