Skip to content

Commit

Permalink
Merge pull request googleapis#36 from shinfan/dev
Browse files Browse the repository at this point in the history
Fix surface issues based on Java review.
  • Loading branch information
shinfan committed Mar 25, 2016
2 parents 60c22b6 + 6290c8a commit 4dcc19c
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 103 deletions.
14 changes: 8 additions & 6 deletions src/main/java/com/google/api/gax/core/BackoffParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@

import com.google.auto.value.AutoValue;

import org.joda.time.Duration;

/**
* {@code BackoffParams} encapsulates parameters for exponential backoff.
*/
@AutoValue
public abstract class BackoffParams {
public abstract long getInitialDelayMillis();
public abstract Duration getInitialDelay();

public abstract double getDelayMultiplier();

public abstract long getMaxDelayMillis();
public abstract Duration getMaxDelay();

public static Builder newBuilder() {
return new AutoValue_BackoffParams.Builder();
Expand All @@ -54,23 +56,23 @@ public Builder toBuilder() {

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setInitialDelayMillis(long initialDelayMillis);
public abstract Builder setInitialDelay(Duration initialDelayDuration);

public abstract Builder setDelayMultiplier(double delayMultiplier);

public abstract Builder setMaxDelayMillis(long maxDelayMillis);
public abstract Builder setMaxDelay(Duration maxDelayDuration);

abstract BackoffParams autoBuild();

public BackoffParams build() {
BackoffParams backoff = autoBuild();
if (backoff.getInitialDelayMillis() < 0) {
if (backoff.getInitialDelay().getMillis() < 0) {
throw new IllegalStateException("initial delay must not be negative");
}
if (backoff.getDelayMultiplier() < 1.0) {
throw new IllegalStateException("delay multiplier must be at least 1");
}
if (backoff.getMaxDelayMillis() < backoff.getInitialDelayMillis()) {
if (backoff.getMaxDelay().compareTo(backoff.getInitialDelay()) < 0) {
throw new IllegalStateException("max delay must not be smaller than initial delay");
}
return backoff;
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/com/google/api/gax/core/RetryParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@

import com.google.auto.value.AutoValue;

import org.joda.time.Duration;

import java.util.concurrent.ScheduledExecutorService;

/**
* {@code RetryParams} encapsulates a retry strategy used by
* {@link com.google.api.gax.grpc.ApiCallable#retrying(RetryParams, ScheduledExecutorService)}.
Expand All @@ -43,7 +47,7 @@ public abstract class RetryParams {

public abstract BackoffParams getTimeoutBackoff();

public abstract long getTotalTimeout();
public abstract Duration getTotalTimeout();

public static Builder newBuilder() {
return new AutoValue_RetryParams.Builder();
Expand All @@ -59,13 +63,13 @@ public abstract static class Builder {

public abstract Builder setTimeoutBackoff(BackoffParams timeoutBackoff);

public abstract Builder setTotalTimeout(long totalTimeout);
public abstract Builder setTotalTimeout(Duration totalTimeout);

abstract RetryParams autoBuild();

public RetryParams build() {
RetryParams params = autoBuild();
if (params.getTotalTimeout() < 0) {
if (params.getTotalTimeout().getMillis() < 0) {
throw new IllegalStateException("total timeout must not be negative");
}
return params;
Expand Down
43 changes: 0 additions & 43 deletions src/main/java/com/google/api/gax/grpc/ApiCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.google.api.gax.core.RetryParams;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

Expand All @@ -43,7 +42,6 @@
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -110,47 +108,6 @@ public ResponseT call(RequestT request) {
return Futures.getUnchecked(futureCall(request));
}

/**
* Perform a call asynchronously with the given {@code observer}.
* If the {@link io.grpc.Channel} encapsulated in the given
* {@link com.google.api.gax.grpc.CallContext} is null, a channel must have already been bound,
* using {@link #bind(Channel)}.
*
* @param context {@link com.google.api.gax.grpc.CallContext} to make the call with
* @param observer Observer to interact with the result
*/
public void asyncCall(CallContext<RequestT> context, final StreamObserver<ResponseT> observer) {
Futures.addCallback(
futureCall(context),
new FutureCallback<ResponseT>() {
@Override
public void onFailure(Throwable t) {
if (observer != null) {
observer.onError(t);
}
}

@Override
public void onSuccess(ResponseT result) {
if (observer != null) {
observer.onNext(result);
observer.onCompleted();
}
}
});
}

/**
* Same as {@link #asyncCall(CallContext, StreamObserver)}, with null {@link io.grpc.Channel} and
* default {@link io.grpc.CallOptions}.
*
* @param request request
* @param observer Observer to interact with the result
*/
public void asyncCall(RequestT request, StreamObserver<ResponseT> observer) {
asyncCall(CallContext.<RequestT>of(request), observer);
}

/**
* Creates a callable which can execute the described gRPC method.
*/
Expand Down
44 changes: 26 additions & 18 deletions src/main/java/com/google/api/gax/grpc/RetryingCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import org.joda.time.Duration;

import io.grpc.CallOptions;
import io.grpc.Status;

Expand All @@ -63,38 +65,40 @@ class RetryingCallable<RequestT, ResponseT> implements FutureCallable<RequestT,
this.executor = executor;
}

@Override
public ListenableFuture<ResponseT> futureCall(CallContext<RequestT> context) {
SettableFuture<ResponseT> result = SettableFuture.<ResponseT>create();
context =
getCallContextWithDeadlineAfter(
context, retryParams.getTotalTimeout(), TimeUnit.MILLISECONDS);
context, retryParams.getTotalTimeout());
Retryer retryer =
new Retryer(
context,
result,
retryParams.getRetryBackoff().getInitialDelayMillis(),
retryParams.getTimeoutBackoff().getInitialDelayMillis(),
retryParams.getRetryBackoff().getInitialDelay(),
retryParams.getTimeoutBackoff().getInitialDelay(),
null);
retryer.run();
return result;
}

@Override
public String toString() {
return String.format("retrying(%s)", callable);
}

private class Retryer implements Runnable {
private final CallContext<RequestT> context;
private final SettableFuture<ResponseT> result;
private final long retryDelay;
private final long rpcTimeout;
private final Duration retryDelay;
private final Duration rpcTimeout;
private final Throwable savedThrowable;

private Retryer(
CallContext<RequestT> context,
SettableFuture<ResponseT> result,
long retryDelay,
long rpcTimeout,
Duration retryDelay,
Duration rpcTimeout,
Throwable savedThrowable) {
this.context = context;
this.result = result;
Expand All @@ -103,6 +107,7 @@ private Retryer(
this.savedThrowable = savedThrowable;
}

@Override
public void run() {
if (context.getCallOptions().getDeadlineNanoTime() < System.nanoTime()) {
if (savedThrowable == null) {
Expand All @@ -116,7 +121,7 @@ public void run() {
return;
}
CallContext<RequestT> deadlineContext =
getCallContextWithDeadlineAfter(context, rpcTimeout, TimeUnit.MILLISECONDS);
getCallContextWithDeadlineAfter(context, rpcTimeout);
Futures.addCallback(
callable.futureCall(deadlineContext),
new FutureCallback<ResponseT>() {
Expand All @@ -132,29 +137,32 @@ public void onFailure(Throwable throwable) {
return;
}
long newRetryDelay =
(long) (retryDelay * retryParams.getRetryBackoff().getDelayMultiplier());
(long) (retryDelay.getMillis() *
retryParams.getRetryBackoff().getDelayMultiplier());
newRetryDelay =
Math.min(newRetryDelay, retryParams.getRetryBackoff().getMaxDelayMillis());
Math.min(newRetryDelay,
retryParams.getRetryBackoff().getMaxDelay().getMillis());

long newRpcTimeout =
(long) (rpcTimeout * retryParams.getTimeoutBackoff().getDelayMultiplier());
(long) (rpcTimeout.getMillis() *
retryParams.getTimeoutBackoff().getDelayMultiplier());
newRpcTimeout =
Math.min(newRpcTimeout, retryParams.getTimeoutBackoff().getMaxDelayMillis());

long randomRetryDelay = ThreadLocalRandom.current().nextLong(retryDelay);
Math.min(newRpcTimeout,
retryParams.getTimeoutBackoff().getMaxDelay().getMillis());

Retryer retryer =
new Retryer(context, result, newRetryDelay, newRpcTimeout, throwable);
long randomRetryDelay = ThreadLocalRandom.current().nextLong(retryDelay.getMillis());
Retryer retryer = new Retryer(context, result,
Duration.millis(newRetryDelay), Duration.millis(newRpcTimeout), throwable);
executor.schedule(retryer, randomRetryDelay, TimeUnit.MILLISECONDS);
}
});
}
}

private static <T> CallContext<T> getCallContextWithDeadlineAfter(
CallContext<T> oldCtx, long duration, TimeUnit unit) {
CallContext<T> oldCtx, Duration rpcTimeout) {
CallOptions oldOpt = oldCtx.getCallOptions();
CallOptions newOpt = oldOpt.withDeadlineAfter(duration, unit);
CallOptions newOpt = oldOpt.withDeadlineAfter(rpcTimeout.getMillis(), TimeUnit.MILLISECONDS);
CallContext<T> newCtx = oldCtx.withCallOptions(newOpt);

if (oldOpt.getDeadlineNanoTime() == null) {
Expand Down
36 changes: 20 additions & 16 deletions src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ServiceApiSettings {
private String serviceGeneratorVersion;
private ChannelProvider channelProvider;
private ExecutorProvider executorProvider;
private final ImmutableList<? extends ApiCallSettings> allMethods;
private final ImmutableList<? extends ApiCallSettings> allCallSettings;

/**
* The number of threads to use with the default executor.
Expand All @@ -54,7 +54,7 @@ public class ServiceApiSettings {
/**
* Constructs an instance of ServiceApiSettings.
*/
public ServiceApiSettings(ImmutableList<? extends ApiCallSettings> allMethods) {
public ServiceApiSettings(ImmutableList<? extends ApiCallSettings> allCallSettings) {
clientLibName = DEFAULT_CLIENT_LIB_NAME;
clientLibVersion = DEFAULT_VERSION;
serviceGeneratorName = DEFAULT_GENERATOR_NAME;
Expand Down Expand Up @@ -82,7 +82,7 @@ public ScheduledExecutorService getExecutor() {
return executor;
}
};
this.allMethods = allMethods;
this.allCallSettings = allCallSettings;
}

private interface ChannelProvider {
Expand All @@ -96,15 +96,16 @@ private interface ChannelProvider {
*
* See class documentation for more details on channels.
*/
public ServiceApiSettings provideChannelWith(final ManagedChannel channel) {
public ServiceApiSettings provideChannelWith(
final ManagedChannel channel, final boolean shouldAutoClose) {
channelProvider = new ChannelProvider() {
@Override
public ManagedChannel getChannel(Executor executor) {
return channel;
}
@Override
public boolean shouldAutoClose() {
return false;
return shouldAutoClose;
}
};
return this;
Expand All @@ -113,7 +114,8 @@ public boolean shouldAutoClose() {
/**
* Provides the connection settings necessary to create a channel.
*/
public ServiceApiSettings provideChannelWith(final ConnectionSettings settings) {
public ServiceApiSettings provideChannelWith(
final ConnectionSettings settings,final boolean shouldAutoClose) {
channelProvider = new ChannelProvider() {
private ManagedChannel channel = null;
@Override
Expand All @@ -135,7 +137,7 @@ public ManagedChannel getChannel(Executor executor) throws IOException {

@Override
public boolean shouldAutoClose() {
return true;
return shouldAutoClose;
}

private String serviceHeader() {
Expand Down Expand Up @@ -208,10 +210,10 @@ public ScheduledExecutorService getExecutor() {
}

/**
* Returns all of the methods of this API, which can be individually configured.
* Returns all of the API call settings of this API, which can be individually configured.
*/
public ImmutableList<? extends ApiCallSettings> allMethods() {
return allMethods;
public ImmutableList<? extends ApiCallSettings> allCallSettings() {
return allCallSettings;
}

/**
Expand All @@ -225,8 +227,8 @@ public ServiceApiSettings setRetryableCodesOnAllMethods(Status.Code... codes) {
* Sets the retry codes on all of the methods of the API.
*/
public ServiceApiSettings setRetryableCodesOnAllMethods(Set<Status.Code> retryableCodes) {
for (ApiCallSettings method : allMethods) {
method.setRetryableCodes(retryableCodes);
for (ApiCallSettings settings : allCallSettings) {
settings.setRetryableCodes(retryableCodes);
}
return this;
}
Expand All @@ -235,25 +237,27 @@ public ServiceApiSettings setRetryableCodesOnAllMethods(Set<Status.Code> retryab
* Sets the retry params for all of the methods of the API.
*/
public ServiceApiSettings setRetryParamsOnAllMethods(RetryParams retryParams) {
for (ApiCallSettings method : allMethods) {
method.setRetryParams(retryParams);
for (ApiCallSettings settings : allCallSettings) {
settings.setRetryParams(retryParams);
}
return this;
}

/**
* Sets the generator name and version for the GRPC custom header.
*/
public void setGeneratorHeader(String name, String version) {
public ServiceApiSettings setGeneratorHeader(String name, String version) {
this.serviceGeneratorName = name;
this.serviceGeneratorVersion = version;
return this;
}

/**
* Sets the client library name and version for the GRPC custom header.
*/
public void setClientLibHeader(String name, String version) {
public ServiceApiSettings setClientLibHeader(String name, String version) {
this.clientLibName = name;
this.clientLibVersion = version;
return this;
}
}
Loading

0 comments on commit 4dcc19c

Please sign in to comment.