diff --git a/src/main/java/com/google/api/gax/grpc/NanoClock.java b/src/main/java/com/google/api/gax/core/ApiClock.java similarity index 88% rename from src/main/java/com/google/api/gax/grpc/NanoClock.java rename to src/main/java/com/google/api/gax/core/ApiClock.java index 6f62b42de..da84d14b9 100644 --- a/src/main/java/com/google/api/gax/grpc/NanoClock.java +++ b/src/main/java/com/google/api/gax/core/ApiClock.java @@ -27,20 +27,25 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.grpc; +package com.google.api.gax.core; /** * An interface for getting the current value of a high-resolution time source, in nanoseconds. * - * Clocks other than DefaultNanoClock are typically used only for testing. + * Clocks other than {@link NanoClock} are typically used only for testing. * * This interface is required in addition to Java 8's Clock, because nanoTime is required to compare * values with io.grpc.CallOptions.getDeadlineNanoTime(). */ -public interface NanoClock { +public interface ApiClock { /** * Returns the current value of this clock's high-resolution time source, in nanoseconds. */ long nanoTime(); + + /** + * Returns the current value of this clock's high-resolution time source, in milliseconds. + */ + long millisTime(); } diff --git a/src/main/java/com/google/api/gax/core/CurrentMillisClock.java b/src/main/java/com/google/api/gax/core/CurrentMillisClock.java new file mode 100644 index 000000000..219f24c64 --- /dev/null +++ b/src/main/java/com/google/api/gax/core/CurrentMillisClock.java @@ -0,0 +1,65 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.core; + +import java.io.ObjectStreamException; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of the {@link ApiClock} interface, which uses {@link System#currentTimeMillis()} + * as time source. + */ +public final class CurrentMillisClock implements ApiClock, Serializable { + + private static final long serialVersionUID = -6019259882852183285L; + private static final ApiClock DEFAULT_CLOCK = new CurrentMillisClock(); + + public static ApiClock getDefaultClock() { + return DEFAULT_CLOCK; + } + + private CurrentMillisClock() {} + + @Override + public long nanoTime() { + return TimeUnit.NANOSECONDS.convert(millisTime(), TimeUnit.MILLISECONDS); + } + + @Override + public long millisTime() { + return System.currentTimeMillis(); + } + + private Object readResolve() throws ObjectStreamException { + return DEFAULT_CLOCK; + } +} diff --git a/src/main/java/com/google/api/gax/grpc/DefaultNanoClock.java b/src/main/java/com/google/api/gax/core/NanoClock.java similarity index 68% rename from src/main/java/com/google/api/gax/grpc/DefaultNanoClock.java rename to src/main/java/com/google/api/gax/core/NanoClock.java index d0d308333..4de6f47c0 100644 --- a/src/main/java/com/google/api/gax/grpc/DefaultNanoClock.java +++ b/src/main/java/com/google/api/gax/core/NanoClock.java @@ -27,20 +27,35 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.grpc; +package com.google.api.gax.core; -/** - * Default implementation of the NanoClock interface, using call to System.nanoTime(). - */ -public final class DefaultNanoClock implements NanoClock { - public static NanoClock create() { - return new DefaultNanoClock(); +import java.io.ObjectStreamException; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** Default implementation of the ApiClock interface, using call to System.nanoTime(). */ +public final class NanoClock implements ApiClock, Serializable { + + private static final ApiClock DEFAULT_CLOCK = new NanoClock(); + private static final long serialVersionUID = 5541462688633944865L; + + public static ApiClock getDefaultClock() { + return DEFAULT_CLOCK; } - private DefaultNanoClock() {} + private NanoClock() {} @Override public final long nanoTime() { return System.nanoTime(); } + + @Override + public final long millisTime() { + return TimeUnit.MILLISECONDS.convert(nanoTime(), TimeUnit.NANOSECONDS); + } + + private Object readResolve() throws ObjectStreamException { + return DEFAULT_CLOCK; + } } diff --git a/src/main/java/com/google/api/gax/core/RetrySettings.java b/src/main/java/com/google/api/gax/core/RetrySettings.java index d9406b88e..0d219da73 100644 --- a/src/main/java/com/google/api/gax/core/RetrySettings.java +++ b/src/main/java/com/google/api/gax/core/RetrySettings.java @@ -30,6 +30,7 @@ package com.google.api.gax.core; import com.google.auto.value.AutoValue; +import java.io.Serializable; import org.joda.time.Duration; /** @@ -64,7 +65,9 @@ * is computed which will terminate the call when the total time is up. */ @AutoValue -public abstract class RetrySettings { +public abstract class RetrySettings implements Serializable { + + private static final long serialVersionUID = 8258475264439710899L; /** * The TotalTimeout parameter has ultimate control over how long the logic should keep trying the @@ -91,6 +94,13 @@ public abstract class RetrySettings { */ public abstract Duration getMaxRetryDelay(); + /** + * MaxAttempts defines the maximum number of attempts to perform. The default value is 0. If this + * value is greater than 0, and the number of attempts reaches this limit, the logic will give up + * retrying even if the total retry time is still lower than TotalTimeout. + */ + public abstract int getMaxAttempts(); + /** * The InitialRpcTimeout parameter controls the timeout for the initial RPC. Subsequent calls will * use this value adjusted according to the RpcTimeoutMultiplier. @@ -110,7 +120,7 @@ public abstract class RetrySettings { public abstract Duration getMaxRpcTimeout(); public static Builder newBuilder() { - return new AutoValue_RetrySettings.Builder(); + return new AutoValue_RetrySettings.Builder().setMaxAttempts(0); } public Builder toBuilder() { @@ -150,6 +160,13 @@ public abstract static class Builder { */ public abstract Builder setMaxRetryDelay(Duration maxDelay); + /** + * MaxAttempts defines the maximum number of attempts to perform. If number of attempts reaches + * this limit the logic will give up retrying even if the total retry time is still lower than + * TotalTimeout. + */ + public abstract Builder setMaxAttempts(int maxAttempts); + /** * The InitialRpcTimeout parameter controls the timeout for the initial RPC. Subsequent calls * will use this value adjusted according to the RpcTimeoutMultiplier. @@ -188,6 +205,13 @@ public abstract static class Builder { */ public abstract double getRetryDelayMultiplier(); + /** + * MaxAttempts defines the maximum number of attempts to perform. If the number of attempts + * reaches this limit, the logic will give up retrying even if the total retry time is still + * lower than TotalTimeout. + */ + public abstract int getMaxAttempts(); + /** * The MaxRetryDelay puts a limit on the value of the retry delay, so that the * RetryDelayMultiplier can't increase the retry delay higher than this amount. @@ -228,6 +252,9 @@ public RetrySettings build() { if (params.getMaxRetryDelay().compareTo(params.getInitialRetryDelay()) < 0) { throw new IllegalStateException("max retry delay must not be shorter than initial delay"); } + if (params.getMaxAttempts() < 0) { + throw new IllegalStateException("max attempts must be non-negative"); + } if (params.getInitialRpcTimeout().getMillis() < 0) { throw new IllegalStateException("initial rpc timeout must not be negative"); } @@ -253,6 +280,7 @@ public RetrySettings.Builder merge(RetrySettings.Builder newSettings) { if (newSettings.getMaxRetryDelay() != null) { setMaxRetryDelay(newSettings.getMaxRetryDelay()); } + setMaxAttempts(newSettings.getMaxAttempts()); if (newSettings.getInitialRpcTimeout() != null) { setInitialRpcTimeout(newSettings.getInitialRpcTimeout()); } diff --git a/src/main/java/com/google/api/gax/core/internal/ApiFutureToListenableFuture.java b/src/main/java/com/google/api/gax/core/internal/ApiFutureToListenableFuture.java index 88e07d254..dd9b29478 100644 --- a/src/main/java/com/google/api/gax/core/internal/ApiFutureToListenableFuture.java +++ b/src/main/java/com/google/api/gax/core/internal/ApiFutureToListenableFuture.java @@ -42,7 +42,7 @@ */ @ExperimentalApi public class ApiFutureToListenableFuture implements ListenableFuture { - private ApiFuture apiFuture; + private final ApiFuture apiFuture; public ApiFutureToListenableFuture(ApiFuture apiFuture) { this.apiFuture = apiFuture; diff --git a/src/main/java/com/google/api/gax/grpc/RetryingCallable.java b/src/main/java/com/google/api/gax/grpc/RetryingCallable.java index 811f0abd1..1de9bd5af 100644 --- a/src/main/java/com/google/api/gax/grpc/RetryingCallable.java +++ b/src/main/java/com/google/api/gax/grpc/RetryingCallable.java @@ -30,59 +30,60 @@ package com.google.api.gax.grpc; import com.google.api.gax.core.AbstractApiFuture; +import com.google.api.gax.core.ApiClock; import com.google.api.gax.core.ApiFuture; -import com.google.api.gax.core.ApiFutureCallback; import com.google.api.gax.core.ApiFutures; import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.retrying.ExceptionRetryAlgorithm; +import com.google.api.gax.retrying.ExponentialRetryAlgorithm; +import com.google.api.gax.retrying.RetryAlgorithm; +import com.google.api.gax.retrying.RetryingExecutor; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.retrying.ScheduledRetryingExecutor; +import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.common.base.Preconditions; import io.grpc.CallOptions; -import io.grpc.Status; -import java.util.concurrent.CancellationException; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; +import io.grpc.Status.Code; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; import org.joda.time.Duration; /** - * Implements the retry and timeout functionality used in {@link UnaryCallable}. The behavior is - * controlled by the given {@link RetrySettings}. + * Implements the retry and timeout functionality used in {@link UnaryCallable}. + * + *

+ * The behavior is controlled by the given {@link RetrySettings}. */ class RetryingCallable implements FutureCallable { - // Duration to sleep on if the error is DEADLINE_EXCEEDED. static final Duration DEADLINE_SLEEP_DURATION = Duration.millis(1); private final FutureCallable callable; - private final RetrySettings retryParams; - private final UnaryCallable.Scheduler executor; - private final NanoClock clock; + private final RetryingExecutor scheduler; RetryingCallable( FutureCallable callable, RetrySettings retrySettings, - UnaryCallable.Scheduler executor, - NanoClock clock) { + ScheduledExecutorService scheduler, + ApiClock clock) { this.callable = Preconditions.checkNotNull(callable); - this.retryParams = Preconditions.checkNotNull(retrySettings); - this.executor = executor; - this.clock = clock; + RetryAlgorithm retryAlgorithm = + new RetryAlgorithm( + new GrpcExceptionRetryAlgorithm(), new ExponentialRetryAlgorithm(retrySettings, clock)); + this.scheduler = new ScheduledRetryingExecutor<>(retryAlgorithm, scheduler); } @Override public ApiFuture futureCall(RequestT request, CallContext context) { - RetryingResultFuture resultFuture = new RetryingResultFuture(); - context = getCallContextWithDeadlineAfter(context, retryParams.getTotalTimeout()); - Retryer retryer = - new Retryer( - request, - context, - resultFuture, - retryParams.getInitialRetryDelay(), - retryParams.getInitialRpcTimeout(), - null); - resultFuture.issueCall(request, context, retryer); - return resultFuture; + GrpcRetryCallable retryCallable = + new GrpcRetryCallable<>(callable, request, context); + + RetryingFuture retryingFuture = scheduler.createFuture(retryCallable); + retryCallable.setExternalFuture(retryingFuture); + retryCallable.call(); + + return retryingFuture; } @Override @@ -90,165 +91,89 @@ public String toString() { return String.format("retrying(%s)", callable); } - private class Retryer implements Runnable, ApiFutureCallback { - private final RequestT request; - private final CallContext context; - private final RetryingResultFuture resultFuture; - private final Duration retryDelay; - private final Duration rpcTimeout; - private final Throwable savedThrowable; - - private Retryer( - RequestT request, - CallContext context, - RetryingResultFuture resultFuture, - Duration retryDelay, - Duration rpcTimeout, - Throwable savedThrowable) { - this.request = request; - this.context = context; - this.resultFuture = resultFuture; - this.retryDelay = retryDelay; - this.rpcTimeout = rpcTimeout; - this.savedThrowable = savedThrowable; - } - - @Override - public void run() { - if (context.getCallOptions().getDeadlineNanoTime() < clock.nanoTime()) { - if (savedThrowable == null) { - resultFuture.setException( - Status.DEADLINE_EXCEEDED - .withDescription("Total deadline exceeded without completing any call") - .asException()); - } else { - resultFuture.setException(savedThrowable); - } - return; - } - CallContext deadlineContext = getCallContextWithDeadlineAfter(context, rpcTimeout); - resultFuture.issueCall(request, deadlineContext, this); - } - - @Override - public void onSuccess(ResponseT r) { - resultFuture.set(r); + private static CallContext getCallContextWithDeadlineAfter( + CallContext oldContext, Duration rpcTimeout) { + CallOptions oldOptions = oldContext.getCallOptions(); + CallOptions newOptions = + oldOptions.withDeadlineAfter(rpcTimeout.getMillis(), TimeUnit.MILLISECONDS); + CallContext newContext = oldContext.withCallOptions(newOptions); + + if (oldOptions.getDeadlineNanoTime() == null) { + return newContext; } - - @Override - public void onFailure(Throwable throwable) { - if (!canRetry(throwable)) { - resultFuture.setException(throwable); - return; - } - if (isDeadlineExceeded(throwable)) { - Retryer retryer = - new Retryer(request, context, resultFuture, retryDelay, rpcTimeout, throwable); - resultFuture.scheduleNext( - executor, retryer, DEADLINE_SLEEP_DURATION.getMillis(), TimeUnit.MILLISECONDS); - return; - } - - long newRetryDelay = (long) (retryDelay.getMillis() * retryParams.getRetryDelayMultiplier()); - newRetryDelay = Math.min(newRetryDelay, retryParams.getMaxRetryDelay().getMillis()); - - long newRpcTimeout = (long) (rpcTimeout.getMillis() * retryParams.getRpcTimeoutMultiplier()); - newRpcTimeout = Math.min(newRpcTimeout, retryParams.getMaxRpcTimeout().getMillis()); - - long randomRetryDelay = ThreadLocalRandom.current().nextLong(retryDelay.getMillis()); - Retryer retryer = - new Retryer( - request, - context, - resultFuture, - Duration.millis(newRetryDelay), - Duration.millis(newRpcTimeout), - throwable); - resultFuture.scheduleNext(executor, retryer, randomRetryDelay, TimeUnit.MILLISECONDS); + if (oldOptions.getDeadlineNanoTime() < newOptions.getDeadlineNanoTime()) { + return oldContext; } + return newContext; } - private class RetryingResultFuture extends AbstractApiFuture { - private volatile Future activeFuture = null; - private final Object syncObject = new Object(); + private static class GrpcRetryCallable implements Callable { + private final FutureCallable callable; + private final RequestT request; - @Override - protected void interruptTask() { - synchronized (syncObject) { - activeFuture.cancel(true); - } + private volatile RetryingFuture externalFuture; + private volatile CallContext callContext; + + private GrpcRetryCallable( + FutureCallable callable, RequestT request, CallContext callContext) { + this.callable = callable; + this.request = request; + this.callContext = callContext; } - @Override - public boolean set(@Nullable ResponseT value) { - synchronized (syncObject) { - return super.set(value); - } + private void setExternalFuture(RetryingFuture externalFuture) { + this.externalFuture = externalFuture; } @Override - public boolean setException(Throwable throwable) { - synchronized (syncObject) { - if (throwable instanceof CancellationException) { - super.cancel(false); - return true; - } else { - return super.setException(throwable); + public ResponseT call() { + callContext = + getCallContextWithDeadlineAfter( + callContext, externalFuture.getAttemptSettings().getRpcTimeout()); + + try { + externalFuture.setAttemptFuture(new NonCancelableFuture()); + if (externalFuture.isDone()) { + return null; } + ApiFuture internalFuture = callable.futureCall(request, callContext); + externalFuture.setAttemptFuture(internalFuture); + } catch (Throwable e) { + externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e)); + throw e; } - } - private void scheduleNext( - UnaryCallable.Scheduler executor, Runnable retryer, long delay, TimeUnit unit) { - synchronized (syncObject) { - if (!isCancelled()) { - activeFuture = executor.schedule(retryer, delay, TimeUnit.MILLISECONDS); - } - } - } - - public void issueCall( - RequestT request, - CallContext deadlineContext, - RetryingCallable.Retryer retryer) { - synchronized (syncObject) { - if (!isCancelled()) { - ApiFuture callFuture = callable.futureCall(request, deadlineContext); - ApiFutures.addCallback(callFuture, retryer); - activeFuture = callFuture; - } - } + return null; } } - private static CallContext getCallContextWithDeadlineAfter( - CallContext oldCtx, Duration rpcTimeout) { - CallOptions oldOpt = oldCtx.getCallOptions(); - CallOptions newOpt = oldOpt.withDeadlineAfter(rpcTimeout.getMillis(), TimeUnit.MILLISECONDS); - CallContext newCtx = oldCtx.withCallOptions(newOpt); - - if (oldOpt.getDeadlineNanoTime() == null) { - return newCtx; - } - if (oldOpt.getDeadlineNanoTime() < newOpt.getDeadlineNanoTime()) { - return oldCtx; + private static class GrpcExceptionRetryAlgorithm implements ExceptionRetryAlgorithm { + @Override + public TimedAttemptSettings createNextAttempt( + Throwable prevThrowable, TimedAttemptSettings prevSettings) { + if (((ApiException) prevThrowable).getStatusCode() == Code.DEADLINE_EXCEEDED) { + return new TimedAttemptSettings( + prevSettings.getGlobalSettings(), + prevSettings.getRetryDelay(), + prevSettings.getRpcTimeout(), + DEADLINE_SLEEP_DURATION, + prevSettings.getAttemptCount() + 1, + prevSettings.getFirstAttemptStartTime()); + } + return null; } - return newCtx; - } - private static boolean canRetry(Throwable throwable) { - if (!(throwable instanceof ApiException)) { - return false; + @Override + public boolean accept(Throwable prevThrowable) { + return (prevThrowable instanceof ApiException) + && ((ApiException) prevThrowable).isRetryable(); } - ApiException apiException = (ApiException) throwable; - return apiException.isRetryable(); } - private static boolean isDeadlineExceeded(Throwable throwable) { - if (!(throwable instanceof ApiException)) { + private static class NonCancelableFuture extends AbstractApiFuture { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { return false; } - ApiException apiException = (ApiException) throwable; - return apiException.getStatusCode() == Status.Code.DEADLINE_EXCEEDED; } } diff --git a/src/main/java/com/google/api/gax/grpc/UnaryCallable.java b/src/main/java/com/google/api/gax/grpc/UnaryCallable.java index 5ccf850c6..561adb085 100644 --- a/src/main/java/com/google/api/gax/grpc/UnaryCallable.java +++ b/src/main/java/com/google/api/gax/grpc/UnaryCallable.java @@ -30,7 +30,9 @@ package com.google.api.gax.grpc; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.core.ApiClock; import com.google.api.gax.core.ApiFuture; +import com.google.api.gax.core.NanoClock; import com.google.api.gax.core.RetrySettings; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -38,10 +40,7 @@ import com.google.common.util.concurrent.UncheckedExecutionException; import io.grpc.Channel; import io.grpc.Status; -import java.util.List; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; /** @@ -98,30 +97,6 @@ */ public final class UnaryCallable { - interface Scheduler { - ScheduledFuture schedule(Runnable runnable, long delay, TimeUnit unit); - - List shutdownNow(); - } - - static class DelegatingScheduler implements Scheduler { - private final ScheduledExecutorService executor; - - DelegatingScheduler(ScheduledExecutorService executor) { - this.executor = executor; - } - - @Override - public ScheduledFuture schedule(Runnable runnable, long delay, TimeUnit unit) { - return executor.schedule(runnable, delay, unit); - } - - @Override - public List shutdownNow() { - return executor.shutdownNow(); - } - } - private final FutureCallable callable; private final Channel channel; @Nullable private final UnaryCallSettings settings; @@ -324,7 +299,7 @@ UnaryCallable retryableOn(ImmutableSet retryab */ UnaryCallable retrying( RetrySettings retrySettings, ScheduledExecutorService executor) { - return retrying(retrySettings, new DelegatingScheduler(executor), DefaultNanoClock.create()); + return retrying(retrySettings, executor, NanoClock.getDefaultClock()); } /** @@ -337,7 +312,7 @@ UnaryCallable retrying( */ @VisibleForTesting UnaryCallable retrying( - RetrySettings retrySettings, Scheduler executor, NanoClock clock) { + RetrySettings retrySettings, ScheduledExecutorService executor, ApiClock clock) { return new UnaryCallable<>( new RetryingCallable<>(callable, retrySettings, executor, clock), channel, settings); } diff --git a/src/main/java/com/google/api/gax/retrying/DirectRetryingExecutor.java b/src/main/java/com/google/api/gax/retrying/DirectRetryingExecutor.java new file mode 100644 index 000000000..8bbc33c70 --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/DirectRetryingExecutor.java @@ -0,0 +1,102 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.gax.core.internal.ListenableFutureToApiFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.io.InterruptedIOException; +import java.nio.channels.ClosedByInterruptException; +import java.util.concurrent.Callable; +import org.joda.time.Duration; + +/** + * The retry executor which executes attempts in the current thread, potentially causing the current + * thread to sleep for the specified amount of time before execution. + * + *

+ * This class is thread-safe. + * + * @param response type + */ +public class DirectRetryingExecutor implements RetryingExecutor { + + private final RetryAlgorithm retryAlgorithm; + + /** + * Creates a new direct retrying executor instance, which will be using {@code retryAlgorithm} to + * determine retrying strategy. + * + * @param retryAlgorithm retry algorithm to use for attempts execution + * @throws NullPointerException if {@code retryAlgorithm} is null + */ + public DirectRetryingExecutor(RetryAlgorithm retryAlgorithm) { + this.retryAlgorithm = checkNotNull(retryAlgorithm); + } + + /** + * Creates a {@link RetryingFuture}, which is a facade, returned to the client code to wait for + * any retriable operation to complete. The future is bounded to {@code this} executor instance. + * + * @param callable the actual callable, which should be executed in a retriable context + * @return retrying future facade + */ + @Override + public RetryingFuture createFuture(Callable callable) { + return new RetryingFutureImpl<>(callable, retryAlgorithm, this); + } + + /** + * Submits an attempt for execution in the current thread, causing the current thread to sleep for + * the specified by the {@link RetryingFuture#getAttemptSettings()} amount of time. + * + * @param retryingFuture the future previously returned by {@link #createFuture(Callable)} + */ + @Override + public void submit(RetryingFuture retryingFuture) { + ListenableFuture attemptFuture; + try { + Duration delay = retryingFuture.getAttemptSettings().getRandomizedRetryDelay(); + if (Duration.ZERO.compareTo(delay) < 0) { + Thread.sleep(delay.getMillis()); + } + attemptFuture = Futures.immediateFuture(retryingFuture.getCallable().call()); + } catch (InterruptedException | InterruptedIOException | ClosedByInterruptException e) { + Thread.currentThread().interrupt(); + attemptFuture = Futures.immediateFailedFuture(e); + } catch (Throwable e) { + attemptFuture = Futures.immediateFailedFuture(e); + } + + retryingFuture.setAttemptFuture(new ListenableFutureToApiFuture<>(attemptFuture)); + } +} diff --git a/src/main/java/com/google/api/gax/retrying/ExceptionRetryAlgorithm.java b/src/main/java/com/google/api/gax/retrying/ExceptionRetryAlgorithm.java new file mode 100644 index 000000000..d6b443cab --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/ExceptionRetryAlgorithm.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +/** + * An exception retry algorithm is responsible for the following operations: + * + *

    + *
  1. Accepting or rejecting a task for retry depending on the exception thrown by the previous + * attempt. + *
  2. Creating {@link TimedAttemptSettings} for each subsequent retry attempt. + *
+ * + * Implementations of this interface must be thread-safe. + */ +public interface ExceptionRetryAlgorithm { + /** + * Creates a next attempt {@link TimedAttemptSettings}. + * + * @param prevThrowable exception thrown by the previous attempt + * @param prevSettings previous attempt settings + * @return next attempt settings or {@code null}, if the implementing algorithm does not provide + * specific settings for the next attempt + */ + TimedAttemptSettings createNextAttempt( + Throwable prevThrowable, TimedAttemptSettings prevSettings); + + /** + * Returns {@code true} if another attempt should be made, or {@code false} otherwise. + * + * @param prevThrowable exception thrown by the previous attempt + */ + boolean accept(Throwable prevThrowable); +} diff --git a/src/main/java/com/google/api/gax/retrying/ExponentialRetryAlgorithm.java b/src/main/java/com/google/api/gax/retrying/ExponentialRetryAlgorithm.java new file mode 100644 index 000000000..133c2d2ff --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/ExponentialRetryAlgorithm.java @@ -0,0 +1,139 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.gax.core.ApiClock; +import com.google.api.gax.core.RetrySettings; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.joda.time.Duration; + +/** + * The timed retry algorithm which uses randomized exponential backoff factor for calculating the + * next attempt execution time. + * + *

+ * This class is thread-safe. + */ +public class ExponentialRetryAlgorithm implements TimedRetryAlgorithm { + + private final RetrySettings globalSettings; + private final ApiClock clock; + + /** + * Creates a new exponential retry algorithm instance. + * + * @param globalSettings global retry settings (attempt independent) + * @param clock clock to use for time-specific calculations + * @throws NullPointerException if either {@code globalSettings} or {@code clock} is null + */ + public ExponentialRetryAlgorithm(RetrySettings globalSettings, ApiClock clock) { + this.globalSettings = checkNotNull(globalSettings); + this.clock = checkNotNull(clock); + } + + /** + * Creates a first attempt {@link TimedAttemptSettings}. The first attempt is configured to be + * executed immediately. + * + * @return first attempt settings + */ + @Override + public TimedAttemptSettings createFirstAttempt() { + return new TimedAttemptSettings( + globalSettings, + Duration.ZERO, + globalSettings.getTotalTimeout(), + Duration.ZERO, + 0, + clock.nanoTime()); + } + + /** + * Creates a next attempt {@link TimedAttemptSettings}. The implementation increments the current + * attempt count and uses randomized exponential backoff factor for calculating next attempt + * execution time. + * + * @param prevSettings previous attempt settings + * @return next attempt settings + */ + @Override + public TimedAttemptSettings createNextAttempt(TimedAttemptSettings prevSettings) { + RetrySettings settings = prevSettings.getGlobalSettings(); + + long newRetryDelay = settings.getInitialRetryDelay().getMillis(); + long newRpcTimeout = settings.getInitialRpcTimeout().getMillis(); + + if (prevSettings.getAttemptCount() > 0) { + newRetryDelay = + (long) (settings.getRetryDelayMultiplier() * prevSettings.getRetryDelay().getMillis()); + newRetryDelay = Math.min(newRetryDelay, settings.getMaxRetryDelay().getMillis()); + newRpcTimeout = + (long) (settings.getRpcTimeoutMultiplier() * prevSettings.getRpcTimeout().getMillis()); + newRpcTimeout = Math.min(newRpcTimeout, settings.getMaxRpcTimeout().getMillis()); + } + + return new TimedAttemptSettings( + prevSettings.getGlobalSettings(), + Duration.millis(newRetryDelay), + Duration.millis(newRpcTimeout), + Duration.millis(ThreadLocalRandom.current().nextLong(newRetryDelay)), + prevSettings.getAttemptCount() + 1, + prevSettings.getFirstAttemptStartTime()); + } + + /** + * Returns {@code true} if another attempt should be made, or {@code false} otherwise. + * + * @param nextAttemptSettings attempt settings, which will be used for the next attempt, if + * accepted + * @return {@code true} if {@code nextAttemptSettings} does not exceed either maxAttempts limit or + * totalTimeout limit, or {@code false} otherwise + */ + @Override + public boolean accept(TimedAttemptSettings nextAttemptSettings) { + RetrySettings globalSettings = nextAttemptSettings.getGlobalSettings(); + long randRetryDelayMillis = nextAttemptSettings.getRandomizedRetryDelay().getMillis(); + long totalTimeSpentNanos = + clock.nanoTime() + - nextAttemptSettings.getFirstAttemptStartTime() + + TimeUnit.NANOSECONDS.convert(randRetryDelayMillis, TimeUnit.MILLISECONDS); + + long totalTimeoutMillis = globalSettings.getTotalTimeout().getMillis(); + long totalTimeoutNanos = + TimeUnit.NANOSECONDS.convert(totalTimeoutMillis, TimeUnit.MILLISECONDS); + + return totalTimeSpentNanos <= totalTimeoutNanos + && (globalSettings.getMaxAttempts() <= 0 + || nextAttemptSettings.getAttemptCount() < globalSettings.getMaxAttempts()); + } +} diff --git a/src/main/java/com/google/api/gax/retrying/RetryAlgorithm.java b/src/main/java/com/google/api/gax/retrying/RetryAlgorithm.java new file mode 100644 index 000000000..9cfc007ab --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/RetryAlgorithm.java @@ -0,0 +1,97 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The retry algorithm, which makes decision based on the thrown exception and execution time + * settings of the previous attempt. + * + * This class is thread-safe. + */ +public class RetryAlgorithm { + private final TimedRetryAlgorithm timedAlgorithm; + private final ExceptionRetryAlgorithm exceptionAlgorithm; + + /** + * Creates a new retry algorithm instance, which uses {@code exceptionAlgorithm} and + * {@code timedAlgorithm} to make a decision. {@code exceptionAlgorithm} has higher priority than + * the {@code timedAlgorithm}. + * + * @param timedAlgorithm timed algorithm to use + * @param exceptionAlgorithm exception algorithm to use + */ + public RetryAlgorithm( + ExceptionRetryAlgorithm exceptionAlgorithm, TimedRetryAlgorithm timedAlgorithm) { + this.timedAlgorithm = checkNotNull(timedAlgorithm); + this.exceptionAlgorithm = checkNotNull(exceptionAlgorithm); + } + + /** + * Creates a first attempt {@link TimedAttemptSettings}. + * + * @return first attempt settings + */ + public TimedAttemptSettings createFirstAttempt() { + return timedAlgorithm.createFirstAttempt(); + } + + /** + * Creates a next attempt {@link TimedAttemptSettings}. This method will return the + * exception-specific next attempt settings, if there are any, otherwise it will default to the + * time-specific settings. + * + * @param prevThrowable exception thrown by the previous attempt + * @param prevSettings previous attempt settings + * @return next attempt settings + */ + public TimedAttemptSettings createNextAttempt( + Throwable prevThrowable, TimedAttemptSettings prevSettings) { + TimedAttemptSettings newSettings = + exceptionAlgorithm.createNextAttempt(prevThrowable, prevSettings); + if (newSettings == null) { + newSettings = timedAlgorithm.createNextAttempt(prevSettings); + } + return newSettings; + } + + /** + * Returns {@code true} if another attempt should be made, or {@code false} otherwise. This method + * will return {@code true} only if both timed and exception algorithms return true. + * + * @param nextAttemptSettings attempt settings, which will be used for the next attempt, if + * accepted + * @return {@code true} if another attempt should be made, or {@code false} otherwise + */ + boolean accept(Throwable prevThrowable, TimedAttemptSettings nextAttemptSettings) { + return exceptionAlgorithm.accept(prevThrowable) && timedAlgorithm.accept(nextAttemptSettings); + } +} diff --git a/src/main/java/com/google/api/gax/retrying/RetryingExecutor.java b/src/main/java/com/google/api/gax/retrying/RetryingExecutor.java new file mode 100644 index 000000000..38b4deac7 --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/RetryingExecutor.java @@ -0,0 +1,66 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import java.util.concurrent.Callable; + +/** + * A retrying executor is responsible for the following operations: + * + *

    + *
  1. Creating first attempt {@link RetryingFuture}, which acts as a facade, hiding from client + * code the actual execution of scheduled retry attempts. + *
  2. Executing the actual {@link Callable} in a retriable context. + *
+ * + * This interface is for internal/advanced use only. + * + * @param response type + */ +public interface RetryingExecutor { + /** + * Creates the {@link RetryingFuture}, which is a facade, returned to the client code to wait for + * any retriable operation to complete. + * + * @param callable the actual callable, which should be executed in a retriable context + * @return retrying future facade + */ + RetryingFuture createFuture(Callable callable); + + /** + * Submits an attempt for execution. A typical implementation will either try to execute the + * attempt in the current thread or schedule it for an execution, using some sort of async + * execution service. + * + * @param retryingFuture the future previously returned by {@link #createFuture(Callable)} and + * reused for each subsequent attempt of same operation. + */ + void submit(RetryingFuture retryingFuture); +} diff --git a/src/main/java/com/google/api/gax/retrying/RetryingFuture.java b/src/main/java/com/google/api/gax/retrying/RetryingFuture.java new file mode 100644 index 000000000..6138096a6 --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/RetryingFuture.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.ApiFuture; +import java.util.concurrent.Callable; + +/** + * Represents a retrying future. This is a facade hiding all the complications of an asynchronous + * execution of a retriable task. + * + * This interface is for advanced/internal use only. + * + * @param response type + */ +public interface RetryingFuture extends ApiFuture { + + /** + * Sets the attempt future. This future represents a concrete retry attempt, potentially scheduled + * for execution in a some form of {@link java.util.concurrent.ScheduledExecutorService}. + * + * @param attemptFuture the attempt future + */ + void setAttemptFuture(ApiFuture attemptFuture); + + /** Returns current (active) attempt settings. */ + TimedAttemptSettings getAttemptSettings(); + + /** + * Returns callable tracked by this future. + */ + Callable getCallable(); +} diff --git a/src/main/java/com/google/api/gax/retrying/RetryingFutureImpl.java b/src/main/java/com/google/api/gax/retrying/RetryingFutureImpl.java new file mode 100644 index 000000000..3a6a90c53 --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/RetryingFutureImpl.java @@ -0,0 +1,183 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.retrying; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.gax.core.ApiFuture; +import com.google.api.gax.core.ApiFutureCallback; +import com.google.api.gax.core.ApiFutures; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.FutureCallback; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +/** + * For internal use only. + * + *

+ * This class is the key component of the retry logic. It implements the {@link RetryingFuture} + * facade interface, and does the following: + * + *

    + *
  • Schedules the next attempt in case of a failure using the callback chaining technique. + *
  • Terminates retrying process if no more retries are accepted. + *
  • Propagates future cancellation in both directions (from this to the attempt and from the + * attempt to this) + *
+ * + * This class is thread-safe. + */ +class RetryingFutureImpl extends AbstractFuture + implements RetryingFuture { + + private final Object lock = new Object(); + private final Callable callable; + + private final RetryAlgorithm retryAlgorithm; + private final RetryingExecutor retryingExecutor; + + private volatile TimedAttemptSettings attemptSettings; + private volatile AttemptFutureCallback attemptFutureCallback; + + RetryingFutureImpl( + Callable callable, + RetryAlgorithm retryAlgorithm, + RetryingExecutor retryingExecutor) { + this.callable = checkNotNull(callable); + this.retryAlgorithm = checkNotNull(retryAlgorithm); + this.retryingExecutor = checkNotNull(retryingExecutor); + + this.attemptSettings = retryAlgorithm.createFirstAttempt(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + synchronized (lock) { + if (attemptFutureCallback != null) { + if (attemptFutureCallback.attemptFuture.cancel(mayInterruptIfRunning)) { + super.cancel(mayInterruptIfRunning); + } + return isCancelled(); + } else { + return super.cancel(mayInterruptIfRunning); + } + } + } + + @Override + public void setAttemptFuture(ApiFuture attemptFuture) { + if (isDone()) { + return; + } + synchronized (lock) { + if (isDone()) { + return; + } + if (attemptFuture != null) { + attemptFutureCallback = new AttemptFutureCallback(attemptFuture); + ApiFutures.addCallback(attemptFuture, attemptFutureCallback); + if (isCancelled()) { + attemptFuture.cancel(false); + } + } else { + attemptFutureCallback = null; + } + } + } + + @Override + public TimedAttemptSettings getAttemptSettings() { + synchronized (lock) { + return attemptSettings; + } + } + + @Override + public Callable getCallable() { + return callable; + } + + private void executeAttempt(Throwable delegateThrowable, Future prevAttemptFuture) { + try { + if (prevAttemptFuture.isCancelled()) { + cancel(false); + } + if (isDone()) { + return; + } + TimedAttemptSettings nextAttemptSettings = + retryAlgorithm.createNextAttempt(delegateThrowable, attemptSettings); + if (retryAlgorithm.accept(delegateThrowable, nextAttemptSettings)) { + attemptSettings = nextAttemptSettings; + retryingExecutor.submit(this); + } else { + setException(delegateThrowable); + } + } catch (Throwable e) { + setException(delegateThrowable); + } + } + + private class AttemptFutureCallback + implements FutureCallback, ApiFutureCallback { + + private Future attemptFuture; + + private AttemptFutureCallback(Future attemptFuture) { + this.attemptFuture = attemptFuture; + } + + @Override + public void onSuccess(ResponseT result) { + if (this == attemptFutureCallback && !isDone()) { + synchronized (lock) { + if (this == attemptFutureCallback && !isDone()) { + setAttemptFuture(null); + set(result); + } + } + } + } + + @Override + public void onFailure(Throwable t) { + if (this == attemptFutureCallback && !isDone()) { + synchronized (lock) { + if (this == attemptFutureCallback && !isDone()) { + setAttemptFuture(null); + executeAttempt(t, this.attemptFuture); + } + } + } + } + } +} diff --git a/src/main/java/com/google/api/gax/retrying/ScheduledRetryingExecutor.java b/src/main/java/com/google/api/gax/retrying/ScheduledRetryingExecutor.java new file mode 100644 index 000000000..121308f81 --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/ScheduledRetryingExecutor.java @@ -0,0 +1,100 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.internal.ListenableFutureToApiFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * The retry executor which uses {@link ScheduledExecutorService} to schedule an attempt tasks. + * + *

+ * This class is thread-safe. + * + * @param response type + */ +public class ScheduledRetryingExecutor implements RetryingExecutor { + + private final RetryAlgorithm retryAlgorithm; + private final ListeningScheduledExecutorService scheduler; + + /** + * Creates a new scheduled retry executor, which will be using {@code scheduler} for actual + * attempts scheduling and {@code retryAlgorithm} for retrying strategy. + * + * @param retryAlgorithm retry algorithm to use + * @param scheduler scheduler + */ + public ScheduledRetryingExecutor( + RetryAlgorithm retryAlgorithm, ScheduledExecutorService scheduler) { + this.retryAlgorithm = retryAlgorithm; + this.scheduler = MoreExecutors.listeningDecorator(scheduler); + } + + /** + * Creates a {@link RetryingFuture}, which is a facade, returned to the client code to wait for + * any retriable operation to complete. The returned future is bounded to {@code this} executor + * instance. + * + * @param callable the actual callable, which should be executed in a retriable context + * @return retrying future facade + */ + @Override + public RetryingFuture createFuture(Callable callable) { + return new RetryingFutureImpl<>(callable, retryAlgorithm, this); + } + + /** + * Submits an attempt for execution in a different thread. + * + * @param retryingFuture the future previously returned by {@link #createFuture(Callable)} + */ + @Override + public void submit(RetryingFuture retryingFuture) { + ListenableFuture attemptFuture; + try { + attemptFuture = + scheduler.schedule( + retryingFuture.getCallable(), + retryingFuture.getAttemptSettings().getRandomizedRetryDelay().getMillis(), + TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + attemptFuture = Futures.immediateCancelledFuture(); + } + retryingFuture.setAttemptFuture(new ListenableFutureToApiFuture<>(attemptFuture)); + } +} diff --git a/src/main/java/com/google/api/gax/retrying/TimedAttemptSettings.java b/src/main/java/com/google/api/gax/retrying/TimedAttemptSettings.java new file mode 100644 index 000000000..eb8da03ea --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/TimedAttemptSettings.java @@ -0,0 +1,107 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.ApiClock; +import com.google.api.gax.core.RetrySettings; +import org.joda.time.Duration; + +/** + * Timed attempt execution settings. Defines time-specific properties of a retry attempt. + */ +public class TimedAttemptSettings { + + private final RetrySettings globalSettings; + private final Duration retryDelay; + private final Duration rpcTimeout; + private final Duration randomizedRetryDelay; + private final int attemptCount; + private final long firstAttemptStartTime; + + public TimedAttemptSettings( + RetrySettings globalSettings, + Duration retryDelay, + Duration rpcTimeout, + Duration randomizedRetryDelay, + int attemptCount, + long firstAttemptStartTime) { + this.globalSettings = globalSettings; + this.retryDelay = retryDelay; + this.rpcTimeout = rpcTimeout; + this.randomizedRetryDelay = randomizedRetryDelay; + this.attemptCount = attemptCount; + this.firstAttemptStartTime = firstAttemptStartTime; + } + + /** + * Returns global (attempt-independent) retry settings. + */ + public RetrySettings getGlobalSettings() { + return globalSettings; + } + + /** + * Returns the calculated retry delay. Note that the actual delay used for retry scheduling may be + * different (randomized, based on this value). + */ + public Duration getRetryDelay() { + return retryDelay; + } + + /** + * Returns rpc timeout used for this attempt. + */ + public Duration getRpcTimeout() { + return rpcTimeout; + } + + /** + * Returns randomized attempt delay. By default this value is calculated based on the + * {@code retryDelay} value, and is used as the actual attempt execution delay. + */ + public Duration getRandomizedRetryDelay() { + return randomizedRetryDelay; + } + + /** + * The attempt count. It is a zero-based value (first attempt will have this value set to 0). + */ + public int getAttemptCount() { + return attemptCount; + } + + /** + * The start time of the first attempt. Note that this value is dependent on the actual + * {@link ApiClock} used during the process. + */ + public long getFirstAttemptStartTime() { + return firstAttemptStartTime; + } +} diff --git a/src/main/java/com/google/api/gax/retrying/TimedRetryAlgorithm.java b/src/main/java/com/google/api/gax/retrying/TimedRetryAlgorithm.java new file mode 100644 index 000000000..612e0e30d --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/TimedRetryAlgorithm.java @@ -0,0 +1,71 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.retrying; + +/** + * A timed retry algorithm is responsible for the following operations: + * + *

    + *
  1. Creating first attempt {@link TimedAttemptSettings}. + *
  2. Accepting or rejecting a task for retry depending on the previous attempt settings and + * current time. + *
  3. Creating {@link TimedAttemptSettings} for each subsequent retry attempt. + *
+ * + * Implementations of this interface must be be thread-save. + */ +public interface TimedRetryAlgorithm { + + /** + * Creates a first attempt {@link TimedAttemptSettings}. + * + * @return first attempt settings + */ + TimedAttemptSettings createFirstAttempt(); + + /** + * Creates a next attempt {@link TimedAttemptSettings}, which defines properties of the next + * attempt. + * + * @param prevSettings previous attempt settings + * @return next attempt settings or {@code null} if the implementing algorithm does not provide + * specific settings for the next attempt + */ + TimedAttemptSettings createNextAttempt(TimedAttemptSettings prevSettings); + + /** + * Returns {@code true} if another attempt should be made, or {@code false} otherwise. + * + * @param nextAttemptSettings attempt settings, which will be used for the next attempt, if + * accepted + */ + boolean accept(TimedAttemptSettings nextAttemptSettings); +} diff --git a/src/test/java/com/google/api/gax/grpc/FakeNanoClock.java b/src/test/java/com/google/api/gax/core/FakeApiClock.java similarity index 86% rename from src/test/java/com/google/api/gax/grpc/FakeNanoClock.java rename to src/test/java/com/google/api/gax/core/FakeApiClock.java index 1fb7695cc..d46df30f2 100644 --- a/src/test/java/com/google/api/gax/grpc/FakeNanoClock.java +++ b/src/test/java/com/google/api/gax/core/FakeApiClock.java @@ -27,13 +27,14 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.grpc; +package com.google.api.gax.core; -class FakeNanoClock implements NanoClock { +import java.util.concurrent.TimeUnit; +public class FakeApiClock implements ApiClock { private volatile long currentNanoTime; - public FakeNanoClock(long initialNanoTime) { + public FakeApiClock(long initialNanoTime) { currentNanoTime = initialNanoTime; } @@ -42,6 +43,11 @@ public long nanoTime() { return currentNanoTime; } + @Override + public long millisTime() { + return TimeUnit.MILLISECONDS.convert(nanoTime(), TimeUnit.NANOSECONDS); + } + public void setCurrentNanoTime(long nanoTime) { currentNanoTime = nanoTime; } diff --git a/src/test/java/com/google/api/gax/grpc/CancellationHelpers.java b/src/test/java/com/google/api/gax/grpc/CancellationHelpers.java index c38eeb77b..f9891f7a1 100644 --- a/src/test/java/com/google/api/gax/grpc/CancellationHelpers.java +++ b/src/test/java/com/google/api/gax/grpc/CancellationHelpers.java @@ -47,10 +47,12 @@ public static void cancelInThreadAfterLatchCountDown( public void run() { try { latch.await(); + while (!resultFuture.cancel(true)) { + Thread.sleep(1L); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - resultFuture.cancel(true); } }); t.start(); diff --git a/src/test/java/com/google/api/gax/grpc/CancellationTest.java b/src/test/java/com/google/api/gax/grpc/CancellationTest.java index 7b4f92cff..5a898d2e3 100644 --- a/src/test/java/com/google/api/gax/grpc/CancellationTest.java +++ b/src/test/java/com/google/api/gax/grpc/CancellationTest.java @@ -29,11 +29,15 @@ */ package com.google.api.gax.grpc; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.when; + import com.google.api.gax.core.AbstractApiFuture; import com.google.api.gax.core.ApiFuture; +import com.google.api.gax.core.FakeApiClock; import com.google.api.gax.core.RetrySettings; import com.google.api.gax.core.SettableApiFuture; -import com.google.api.gax.grpc.UnaryCallable.Scheduler; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.truth.Truth; @@ -54,6 +58,8 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) public class CancellationTest { @@ -82,15 +88,15 @@ public class CancellationTest { .setTotalTimeout(Duration.millis(3000L)) .build(); - private FakeNanoClock fakeClock; + private FakeApiClock fakeClock; private RecordingScheduler executor; @Rule public ExpectedException thrown = ExpectedException.none(); @Before public void resetClock() { - fakeClock = new FakeNanoClock(System.nanoTime()); - executor = new RecordingScheduler(fakeClock); + fakeClock = new FakeApiClock(System.nanoTime()); + executor = RecordingScheduler.create(fakeClock); } @After @@ -104,9 +110,9 @@ public void cancellationBeforeGetOnRetryingCallable() throws Exception { Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn(SettableApiFuture.create()); - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); UnaryCallable callable = - UnaryCallable.create(callInt) + UnaryCallable.create(callInt) .retryableOn(retryable) .retrying(FAST_RETRY_SETTINGS, executor, fakeClock); @@ -115,24 +121,36 @@ public void cancellationBeforeGetOnRetryingCallable() throws Exception { resultFuture.get(); } - private static class LatchCountDownScheduler implements UnaryCallable.Scheduler { - private final ScheduledExecutorService executor; - private final CountDownLatch latch; - - LatchCountDownScheduler(CountDownLatch latch) { - this.executor = new ScheduledThreadPoolExecutor(1); - this.latch = latch; - } - - @Override - public ScheduledFuture schedule(Runnable runnable, long delay, TimeUnit unit) { - latch.countDown(); - return executor.schedule(runnable, delay, unit); - } - - @Override - public List shutdownNow() { - return executor.shutdownNow(); + private abstract static class LatchCountDownScheduler implements ScheduledExecutorService { + private static LatchCountDownScheduler get(final CountDownLatch latch) { + LatchCountDownScheduler mock = Mockito.mock(LatchCountDownScheduler.class); + + // mock class fields: + final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); + + // mock class methods: + // ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); + when(mock.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .then( + new Answer>() { + @Override + public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + latch.countDown(); + return executor.schedule((Runnable) args[0], (Long) args[1], (TimeUnit) args[2]); + } + }); + // List shutdownNow() + when(mock.shutdownNow()) + .then( + new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return executor.shutdownNow(); + } + }); + + return mock; } } @@ -184,16 +202,13 @@ public void cancellationDuringFirstCall() throws Exception { CancellationTrackingFuture innerFuture = CancellationTrackingFuture.create(); CountDownLatch callIssuedLatch = new CountDownLatch(1); FutureCallable innerCallable = - new LatchCountDownFutureCallable(callIssuedLatch, innerFuture); + new LatchCountDownFutureCallable<>(callIssuedLatch, innerFuture); - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); UnaryCallable callable = - UnaryCallable.create(innerCallable) + UnaryCallable.create(innerCallable) .retryableOn(retryable) - .retrying( - FAST_RETRY_SETTINGS, - new UnaryCallable.DelegatingScheduler(new ScheduledThreadPoolExecutor(1)), - fakeClock); + .retrying(FAST_RETRY_SETTINGS, new ScheduledThreadPoolExecutor(1), fakeClock); ApiFuture resultFuture = callable.futureCall(0); CancellationHelpers.cancelInThreadAfterLatchCountDown(resultFuture, callIssuedLatch); @@ -210,16 +225,16 @@ public void cancellationDuringFirstCall() throws Exception { @Test public void cancellationDuringRetryDelay() throws Exception { Throwable throwable = Status.UNAVAILABLE.asException(); - CancellationTrackingFuture innerFuture = CancellationTrackingFuture.create(); + CancellationTrackingFuture innerFuture = CancellationTrackingFuture.create(); Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) .thenReturn(innerFuture); CountDownLatch retryScheduledLatch = new CountDownLatch(1); - Scheduler scheduler = new LatchCountDownScheduler(retryScheduledLatch); - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + LatchCountDownScheduler scheduler = LatchCountDownScheduler.get(retryScheduledLatch); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); UnaryCallable callable = - UnaryCallable.create(callInt) + UnaryCallable.create(callInt) .retryableOn(retryable) .retrying(SLOW_RETRY_SETTINGS, scheduler, fakeClock); @@ -232,6 +247,8 @@ public void cancellationDuringRetryDelay() throws Exception { gotException = e; } Truth.assertThat(gotException).isNotNull(); + Truth.assertThat(resultFuture.isDone()).isTrue(); + Truth.assertThat(resultFuture.isCancelled()).isTrue(); Truth.assertThat(innerFuture.isCancelled()).isFalse(); } @@ -239,21 +256,18 @@ public void cancellationDuringRetryDelay() throws Exception { public void cancellationDuringSecondCall() throws Exception { Throwable throwable = Status.UNAVAILABLE.asException(); ApiFuture failingFuture = UnaryCallableTest.immediateFailedFuture(throwable); - CancellationTrackingFuture innerFuture = CancellationTrackingFuture.create(); + CancellationTrackingFuture innerFuture = CancellationTrackingFuture.create(); CountDownLatch callIssuedLatch = new CountDownLatch(2); @SuppressWarnings("unchecked") FutureCallable innerCallable = - new LatchCountDownFutureCallable( - callIssuedLatch, Lists.>newArrayList(failingFuture, innerFuture)); + new LatchCountDownFutureCallable<>( + callIssuedLatch, Lists.newArrayList(failingFuture, innerFuture)); - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); UnaryCallable callable = - UnaryCallable.create(innerCallable) + UnaryCallable.create(innerCallable) .retryableOn(retryable) - .retrying( - FAST_RETRY_SETTINGS, - new UnaryCallable.DelegatingScheduler(new ScheduledThreadPoolExecutor(1)), - fakeClock); + .retrying(FAST_RETRY_SETTINGS, new ScheduledThreadPoolExecutor(1), fakeClock); ApiFuture resultFuture = callable.futureCall(0); CancellationHelpers.cancelInThreadAfterLatchCountDown(resultFuture, callIssuedLatch); @@ -264,6 +278,8 @@ public void cancellationDuringSecondCall() throws Exception { gotException = e; } Truth.assertThat(gotException).isNotNull(); - Truth.assertThat(innerFuture.isCancelled()).isTrue(); + Truth.assertThat(resultFuture.isDone()).isTrue(); + Truth.assertThat(resultFuture.isCancelled()).isTrue(); + Truth.assertThat(innerFuture.isDone()).isTrue(); } } diff --git a/src/test/java/com/google/api/gax/grpc/RecordingScheduler.java b/src/test/java/com/google/api/gax/grpc/RecordingScheduler.java index 66f9c4c11..e82a80fb1 100644 --- a/src/test/java/com/google/api/gax/grpc/RecordingScheduler.java +++ b/src/test/java/com/google/api/gax/grpc/RecordingScheduler.java @@ -29,6 +29,11 @@ */ package com.google.api.gax.grpc; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.when; + +import com.google.api.gax.core.FakeApiClock; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -36,34 +41,52 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.joda.time.Duration; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -class RecordingScheduler implements UnaryCallable.Scheduler { - private final ScheduledExecutorService executor; - private final List sleepDurations = new ArrayList<>(); - private final FakeNanoClock clock; +abstract class RecordingScheduler implements ScheduledExecutorService { - public RecordingScheduler(FakeNanoClock clock) { - this.executor = new ScheduledThreadPoolExecutor(1); - this.clock = clock; - } + abstract List getSleepDurations(); - @Override - public ScheduledFuture schedule(Runnable runnable, long delay, TimeUnit unit) { - sleepDurations.add(new Duration(TimeUnit.MILLISECONDS.convert(delay, unit))); - clock.setCurrentNanoTime(clock.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit)); - return executor.schedule(runnable, 0, TimeUnit.NANOSECONDS); - } + static RecordingScheduler create(final FakeApiClock clock) { + RecordingScheduler mock = Mockito.mock(RecordingScheduler.class); - @Override - public List shutdownNow() { - return executor.shutdownNow(); - } + // mock class fields: + final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); + final List sleepDurations = new ArrayList<>(); - public List getSleepDurations() { - return sleepDurations; - } + // mock class methods: + // ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); + when(mock.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .then( + new Answer>() { + @Override + public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + Runnable runnable = (Runnable) args[0]; + Long delay = (Long) args[1]; + TimeUnit unit = (TimeUnit) args[2]; + sleepDurations.add(new Duration(TimeUnit.MILLISECONDS.convert(delay, unit))); + clock.setCurrentNanoTime( + clock.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit)); + return executor.schedule(runnable, 0, TimeUnit.NANOSECONDS); + } + }); + + // List shutdownNow() + when(mock.shutdownNow()) + .then( + new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return executor.shutdownNow(); + } + }); + + // List getSleepDurations() + when(mock.getSleepDurations()).thenReturn(sleepDurations); - public FakeNanoClock getClock() { - return clock; + return mock; } } diff --git a/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java b/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java index 05ea21c0d..2d92bb619 100644 --- a/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java +++ b/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java @@ -33,6 +33,7 @@ import com.google.api.gax.batching.RequestBuilder; import com.google.api.gax.core.ApiFuture; import com.google.api.gax.core.ApiFutures; +import com.google.api.gax.core.FakeApiClock; import com.google.api.gax.core.FixedSizeCollection; import com.google.api.gax.core.FlowControlSettings; import com.google.api.gax.core.FlowController.LimitExceededBehavior; @@ -91,14 +92,14 @@ static ApiFuture immediateFailedFuture(Throwable t) { return ApiFutures.immediateFailedFuture(t); } - private FakeNanoClock fakeClock; + private FakeApiClock fakeClock; private RecordingScheduler executor; private ScheduledExecutorService batchingExecutor; @Before public void resetClock() { - fakeClock = new FakeNanoClock(System.nanoTime()); - executor = new RecordingScheduler(fakeClock); + fakeClock = new FakeApiClock(System.nanoTime()); + executor = RecordingScheduler.create(fakeClock); } @Before @@ -276,6 +277,62 @@ public void retry() { Truth.assertThat(callable.call(1)).isEqualTo(2); } + @Test(expected = ApiException.class) + public void retryTotalTimeoutExceeded() { + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + Throwable throwable = Status.UNAVAILABLE.asException(); + Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) + .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) + .thenReturn(immediateFuture(2)); + + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.millis(Integer.MAX_VALUE)) + .setMaxRetryDelay(Duration.millis(Integer.MAX_VALUE)) + .build(); + UnaryCallable callable = + UnaryCallable.create(callInt) + .retryableOn(retryable) + .retrying(retrySettings, executor, fakeClock); + callable.call(1); + } + + @Test(expected = ApiException.class) + public void retryMaxAttemptsExeeded() { + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + Throwable throwable = Status.UNAVAILABLE.asException(); + Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) + .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) + .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) + .thenReturn(immediateFuture(2)); + + RetrySettings retrySettings = FAST_RETRY_SETTINGS.toBuilder().setMaxAttempts(2).build(); + UnaryCallable callable = + UnaryCallable.create(callInt) + .retryableOn(retryable) + .retrying(retrySettings, executor, fakeClock); + callable.call(1); + } + + @Test + public void retryWithinMaxAttempts() { + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + Throwable throwable = Status.UNAVAILABLE.asException(); + Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) + .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) + .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) + .thenReturn(immediateFuture(2)); + + RetrySettings retrySettings = FAST_RETRY_SETTINGS.toBuilder().setMaxAttempts(3).build(); + UnaryCallable callable = + UnaryCallable.create(callInt) + .retryableOn(retryable) + .retrying(retrySettings, executor, fakeClock); + callable.call(1); + Truth.assertThat(callable.call(1)).isEqualTo(2); + } + @Test public void retryOnStatusUnknown() { ImmutableSet retryable = ImmutableSet.of(Status.Code.UNKNOWN); diff --git a/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java b/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java new file mode 100644 index 000000000..3d11e99a5 --- /dev/null +++ b/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java @@ -0,0 +1,157 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import static com.google.api.gax.retrying.FailingCallable.FAST_RETRY_SETTINGS; +import static junit.framework.TestCase.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.retrying.FailingCallable.CustomException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public abstract class AbstractRetryingExecutorTest { + + protected abstract RetryingExecutor getRetryingExecutor(RetrySettings retrySettings); + + protected ExceptionRetryAlgorithm getNoOpExceptionRetryAlgorithm() { + return new ExceptionRetryAlgorithm() { + @Override + public TimedAttemptSettings createNextAttempt( + Throwable prevThrowable, TimedAttemptSettings prevSettings) { + return null; + } + + @Override + public boolean accept(Throwable prevThrowable) { + return true; + } + }; + } + + @Test + public void testNoFailures() throws ExecutionException, InterruptedException { + FailingCallable callable = new FailingCallable(0, "SUCCESS"); + RetryingExecutor executor = getRetryingExecutor(FAST_RETRY_SETTINGS); + RetryingFuture future = executor.createFuture(callable); + executor.submit(future); + + assertEquals("SUCCESS", future.get()); + assertTrue(future.isDone()); + assertFalse(future.isCancelled()); + assertEquals(0, future.getAttemptSettings().getAttemptCount()); + } + + @Test + public void testSuccessWithFailures() throws ExecutionException, InterruptedException { + FailingCallable callable = new FailingCallable(5, "SUCCESS"); + RetryingExecutor executor = getRetryingExecutor(FAST_RETRY_SETTINGS); + RetryingFuture future = executor.createFuture(callable); + executor.submit(future); + + assertEquals("SUCCESS", future.get()); + assertTrue(future.isDone()); + assertFalse(future.isCancelled()); + assertEquals(5, future.getAttemptSettings().getAttemptCount()); + } + + @Test + public void testMaxRetriesExcceeded() { + FailingCallable callable = new FailingCallable(6, "FAILURE"); + RetryingExecutor executor = getRetryingExecutor(FAST_RETRY_SETTINGS); + RetryingFuture future = executor.createFuture(callable); + executor.submit(future); + + CustomException exception = null; + try { + future.get(); + } catch (Exception e) { + exception = (CustomException) e.getCause(); + } + assertEquals(CustomException.class, exception.getClass()); + assertEquals(5, future.getAttemptSettings().getAttemptCount()); + assertTrue(future.isDone()); + assertFalse(future.isCancelled()); + } + + @Test + public void testTotalTimeoutExcceeded() throws Exception { + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.millis(Integer.MAX_VALUE)) + .setMaxRetryDelay(Duration.millis(Integer.MAX_VALUE)) + .build(); + RetryingExecutor executor = getRetryingExecutor(retrySettings); + FailingCallable callable = new FailingCallable(6, "FAILURE"); + RetryingFuture future = executor.createFuture(callable); + executor.submit(future); + + CustomException exception = null; + try { + future.get(); + } catch (Exception e) { + exception = (CustomException) e.getCause(); + } + assertEquals(CustomException.class, exception.getClass()); + assertTrue(future.getAttemptSettings().getAttemptCount() < 4); + assertTrue(future.isDone()); + assertFalse(future.isCancelled()); + } + + @Test(expected = CancellationException.class) + public void testCancelOuterFuture() throws ExecutionException, InterruptedException { + FailingCallable callable = new FailingCallable(4, "SUCCESS"); + + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.millis(1_000L)) + .setMaxRetryDelay(Duration.millis(1_000L)) + .setTotalTimeout(Duration.millis(10_0000L)) + .build(); + RetryingExecutor executor = getRetryingExecutor(retrySettings); + RetryingFuture future = executor.createFuture(callable); + future.cancel(false); + executor.submit(future); + + assertTrue(future.isDone()); + assertTrue(future.isCancelled()); + assertTrue(future.getAttemptSettings().getAttemptCount() < 4); + future.get(); + } +} diff --git a/src/test/java/com/google/api/gax/retrying/DirectRetryingExecutorTest.java b/src/test/java/com/google/api/gax/retrying/DirectRetryingExecutorTest.java new file mode 100644 index 000000000..03009d948 --- /dev/null +++ b/src/test/java/com/google/api/gax/retrying/DirectRetryingExecutorTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.CurrentMillisClock; +import com.google.api.gax.core.RetrySettings; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class DirectRetryingExecutorTest extends AbstractRetryingExecutorTest { + + @Override + protected RetryingExecutor getRetryingExecutor(RetrySettings retrySettings) { + RetryAlgorithm retryAlgorithm = + new RetryAlgorithm( + getNoOpExceptionRetryAlgorithm(), + new ExponentialRetryAlgorithm(retrySettings, CurrentMillisClock.getDefaultClock())); + + return new DirectRetryingExecutor<>(retryAlgorithm); + } +} diff --git a/src/test/java/com/google/api/gax/retrying/FailingCallable.java b/src/test/java/com/google/api/gax/retrying/FailingCallable.java new file mode 100644 index 000000000..c411a24a7 --- /dev/null +++ b/src/test/java/com/google/api/gax/retrying/FailingCallable.java @@ -0,0 +1,71 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.RetrySettings; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.joda.time.Duration; + +class FailingCallable implements Callable { + protected static final RetrySettings FAST_RETRY_SETTINGS = + RetrySettings.newBuilder() + .setMaxAttempts(6) + .setInitialRetryDelay(Duration.millis(2L)) + .setRetryDelayMultiplier(1) + .setMaxRetryDelay(Duration.millis(2L)) + .setInitialRpcTimeout(Duration.millis(2L)) + .setRpcTimeoutMultiplier(1) + .setMaxRpcTimeout(Duration.millis(2L)) + .setTotalTimeout(Duration.millis(100L)) + .build(); + + private AtomicInteger attemptsCount = new AtomicInteger(0); + private final int expectedFailuresCount; + private final String result; + + protected FailingCallable(int expectedFailuresCount, String result) { + this.expectedFailuresCount = expectedFailuresCount; + this.result = result; + } + + @Override + public String call() throws Exception { + if (attemptsCount.getAndIncrement() < expectedFailuresCount) { + throw new CustomException(); + } + return result; + } + + protected static class CustomException extends RuntimeException { + + private static final long serialVersionUID = -1543459008653697004L; + } +} diff --git a/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java b/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java new file mode 100644 index 000000000..436e93ee2 --- /dev/null +++ b/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java @@ -0,0 +1,121 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import static com.google.api.gax.retrying.FailingCallable.FAST_RETRY_SETTINGS; +import static org.junit.Assert.assertTrue; + +import com.google.api.gax.core.NanoClock; +import com.google.api.gax.core.RetrySettings; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.joda.time.Duration; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ScheduledRetryingExecutorTest extends AbstractRetryingExecutorTest { + private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + @After + public void after() { + executorService.shutdownNow(); + } + + @Override + protected RetryingExecutor getRetryingExecutor(RetrySettings retrySettings) { + RetryAlgorithm retryAlgorithm = + new RetryAlgorithm( + getNoOpExceptionRetryAlgorithm(), + new ExponentialRetryAlgorithm(retrySettings, NanoClock.getDefaultClock())); + + return new ScheduledRetryingExecutor<>(retryAlgorithm, executorService); + } + + @Test(expected = CancellationException.class) + public void testCancelOuterFutureAfterStart() throws ExecutionException, InterruptedException { + FailingCallable callable = new FailingCallable(4, "SUCCESS"); + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.millis(1_000L)) + .setMaxRetryDelay(Duration.millis(1_000L)) + .setTotalTimeout(Duration.millis(10_0000L)) + .build(); + RetryingExecutor executor = getRetryingExecutor(retrySettings); + RetryingFuture future = executor.createFuture(callable); + executor.submit(future); + + future.cancel(false); + assertTrue(future.isDone()); + assertTrue(future.isCancelled()); + assertTrue(future.getAttemptSettings().getAttemptCount() < 4); + future.get(); + } + + @Test(expected = CancellationException.class) + public void testCancelProxiedFutureAfterStart() throws ExecutionException, InterruptedException { + FailingCallable callable = new FailingCallable(5, "SUCCESS"); + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.millis(1_000L)) + .setMaxRetryDelay(Duration.millis(1_000L)) + .setTotalTimeout(Duration.millis(10_0000L)) + .build(); + RetryingExecutor executor = getRetryingExecutor(retrySettings); + RetryingFuture future = executor.createFuture(callable); + executor.submit(future); + + Thread.sleep(50L); + RetryingExecutor handler = getRetryingExecutor(retrySettings); + + //Note that shutdownNow() will not cancel internal FutureTasks automatically, which + //may potentially cause another thread handing on RetryingFuture#get() call forever. + //Canceling the tasks returned by shutdownNow() also does not help, because of missing feature + //in guava's ListenableScheduledFuture, which does not cancel itself, when its delegate is canceled. + //So only the graceful shutdown() is supported properly. + executorService.shutdown(); + + try { + future.get(); + } catch (CancellationException e) { + //Used to wait for cancellation to propagate, so isDone() is guaranteed to return true. + } + assertTrue(future.isDone()); + assertTrue(future.isCancelled()); + assertTrue(future.getAttemptSettings().getAttemptCount() < 4); + future.get(); + } +}